From aea684506945a12312fc05fb3bb4f549da93f7f5 Mon Sep 17 00:00:00 2001 From: Matt Newell Date: Thu, 22 Mar 2012 10:42:56 -0700 Subject: Fix postgres notification support in the QPSQLDriver. This patch fixes a critical bug in the qsqlpsql driver where notifications aren't delivered when received. Any blocking libpq function(specifically PQexec) will read all the incoming data from the socket, including any pending notifications. This would cause the socket notifier to never be fired for incoming notifications that are already queued inside libpq. The qsqldriver test case was skipping the postgres notification test because of this bug, now its enabled and passing. In order to fix this bug I made a wrapper function for PQexec in QPSQLDriverPrivate that calls _q_handleNotification via QMetaObject::callMethod QueuedConnection in order to deliver pending notifications when control returns to the event loop. I also added a flag to ensure only one call is made each time the event loop is entered. Change-Id: I19f5297094ae7ae46bfb0717e4fca744d69f7b92 Reviewed-by: Honglei Zhang Reviewed-by: Mark Brand --- src/sql/drivers/psql/qsql_psql.cpp | 94 +++++++++++++++++++++----------------- 1 file changed, 53 insertions(+), 41 deletions(-) (limited to 'src/sql/drivers/psql/qsql_psql.cpp') diff --git a/src/sql/drivers/psql/qsql_psql.cpp b/src/sql/drivers/psql/qsql_psql.cpp index ec31d54f0f..10374bb51e 100644 --- a/src/sql/drivers/psql/qsql_psql.cpp +++ b/src/sql/drivers/psql/qsql_psql.cpp @@ -123,14 +123,21 @@ inline void qPQfreemem(void *buffer) class QPSQLDriverPrivate { public: - QPSQLDriverPrivate() : connection(0), isUtf8(false), pro(QPSQLDriver::Version6), sn(0) {} + QPSQLDriverPrivate(QPSQLDriver *qq) : q(qq), connection(0), isUtf8(false), pro(QPSQLDriver::Version6), sn(0), pendingNotifyCheck(false) {} + QPSQLDriver *q; PGconn *connection; bool isUtf8; QPSQLDriver::Protocol pro; QSocketNotifier *sn; QStringList seid; + mutable bool pendingNotifyCheck; void appendTables(QStringList &tl, QSqlQuery &t, QChar type); + PGresult * exec(const char * stmt) const; + PGresult * exec(const QString & stmt) const; + QPSQLDriver::Protocol getPSQLVersion(); + bool setEncodingUtf8(); + void setDatestyle(); }; void QPSQLDriverPrivate::appendTables(QStringList &tl, QSqlQuery &t, QChar type) @@ -157,6 +164,21 @@ void QPSQLDriverPrivate::appendTables(QStringList &tl, QSqlQuery &t, QChar type) } } +PGresult * QPSQLDriverPrivate::exec(const char * stmt) const +{ + PGresult *result = PQexec(connection, stmt); + if (seid.size() && !pendingNotifyCheck) { + pendingNotifyCheck = true; + QMetaObject::invokeMethod(q, "_q_handleNotification", Qt::QueuedConnection, Q_ARG(int,0)); + } + return result; +} + +PGresult * QPSQLDriverPrivate::exec(const QString & stmt) const +{ + return exec(isUtf8 ? stmt.toUtf8().constData() : stmt.toLocal8Bit().constData()); +} + class QPSQLResultPrivate { public: @@ -251,9 +273,7 @@ static QVariant::Type qDecodePSQLType(int t) static void qDeallocatePreparedStmt(QPSQLResultPrivate *d) { const QString stmt = QLatin1String("DEALLOCATE ") + d->preparedStmtId; - PGresult *result = PQexec(d->driver->connection, - d->driver->isUtf8 ? stmt.toUtf8().constData() - : stmt.toLocal8Bit().constData()); + PGresult *result = d->driver->exec(stmt); if (PQresultStatus(result) != PGRES_COMMAND_OK) qWarning("Unable to free statement: %s", PQerrorMessage(d->driver->connection)); @@ -431,9 +451,7 @@ bool QPSQLResult::reset (const QString& query) return false; if (!driver()->isOpen() || driver()->isOpenError()) return false; - d->result = PQexec(d->driver->connection, - d->driver->isUtf8 ? query.toUtf8().constData() - : query.toLocal8Bit().constData()); + d->result = d->driver->exec(query); return d->processResults(); } @@ -564,9 +582,7 @@ bool QPSQLResult::prepare(const QString &query) const QString stmtId = qMakePreparedStmtId(); const QString stmt = QString::fromLatin1("PREPARE %1 AS ").arg(stmtId).append(qReplacePlaceholderMarkers(query)); - PGresult *result = PQexec(d->driver->connection, - d->driver->isUtf8 ? stmt.toUtf8().constData() - : stmt.toLocal8Bit().constData()); + PGresult *result = d->driver->exec(stmt); if (PQresultStatus(result) != PGRES_COMMAND_OK) { setLastError(qMakeError(QCoreApplication::translate("QPSQLResult", @@ -595,26 +611,24 @@ bool QPSQLResult::exec() else stmt = QString::fromLatin1("EXECUTE %1 (%2)").arg(d->preparedStmtId).arg(params); - d->result = PQexec(d->driver->connection, - d->driver->isUtf8 ? stmt.toUtf8().constData() - : stmt.toLocal8Bit().constData()); + d->result = d->driver->exec(stmt); return d->processResults(); } /////////////////////////////////////////////////////////////////// -static bool setEncodingUtf8(PGconn* connection) +bool QPSQLDriverPrivate::setEncodingUtf8() { - PGresult* result = PQexec(connection, "SET CLIENT_ENCODING TO 'UNICODE'"); + PGresult* result = exec("SET CLIENT_ENCODING TO 'UNICODE'"); int status = PQresultStatus(result); PQclear(result); return status == PGRES_COMMAND_OK; } -static void setDatestyle(PGconn* connection) +void QPSQLDriverPrivate::setDatestyle() { - PGresult* result = PQexec(connection, "SET DATESTYLE TO 'ISO'"); + PGresult* result = exec("SET DATESTYLE TO 'ISO'"); int status = PQresultStatus(result); if (status != PGRES_COMMAND_OK) qWarning("%s", PQerrorMessage(connection)); @@ -665,10 +679,10 @@ static QPSQLDriver::Protocol qMakePSQLVersion(int vMaj, int vMin) return QPSQLDriver::VersionUnknown; } -static QPSQLDriver::Protocol getPSQLVersion(PGconn* connection) +QPSQLDriver::Protocol QPSQLDriverPrivate::getPSQLVersion() { QPSQLDriver::Protocol serverVersion = QPSQLDriver::Version6; - PGresult* result = PQexec(connection, "select version()"); + PGresult* result = exec("select version()"); int status = PQresultStatus(result); if (status == PGRES_COMMAND_OK || status == PGRES_TUPLES_OK) { QString val = QString::fromAscii(PQgetvalue(result, 0, 0)); @@ -691,7 +705,7 @@ static QPSQLDriver::Protocol getPSQLVersion(PGconn* connection) //Client version before QPSQLDriver::Version9 only supports escape mode for bytea type, //but bytea format is set to hex by default in PSQL 9 and above. So need to force the //server use the old escape mode when connects to the new server with old client library. - result = PQexec(connection, "SET bytea_output=escape; "); + result = exec("SET bytea_output=escape; "); status = PQresultStatus(result); } else if (serverVersion == QPSQLDriver::VersionUnknown) { serverVersion = clientVersion; @@ -726,7 +740,7 @@ QPSQLDriver::QPSQLDriver(PGconn *conn, QObject *parent) init(); d->connection = conn; if (conn) { - d->pro = getPSQLVersion(d->connection); + d->pro = d->getPSQLVersion(); setOpen(true); setOpenError(false); } @@ -734,7 +748,7 @@ QPSQLDriver::QPSQLDriver(PGconn *conn, QObject *parent) void QPSQLDriver::init() { - d = new QPSQLDriverPrivate(); + d = new QPSQLDriverPrivate(this); } QPSQLDriver::~QPSQLDriver() @@ -826,9 +840,9 @@ bool QPSQLDriver::open(const QString & db, return false; } - d->pro = getPSQLVersion(d->connection); - d->isUtf8 = setEncodingUtf8(d->connection); - setDatestyle(d->connection); + d->pro = d->getPSQLVersion(); + d->isUtf8 = d->setEncodingUtf8(); + d->setDatestyle(); setOpen(true); setOpenError(false); @@ -865,7 +879,7 @@ bool QPSQLDriver::beginTransaction() qWarning("QPSQLDriver::beginTransaction: Database not open"); return false; } - PGresult* res = PQexec(d->connection, "BEGIN"); + PGresult* res = d->exec("BEGIN"); if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { PQclear(res); setLastError(qMakeError(tr("Could not begin transaction"), @@ -882,7 +896,7 @@ bool QPSQLDriver::commitTransaction() qWarning("QPSQLDriver::commitTransaction: Database not open"); return false; } - PGresult* res = PQexec(d->connection, "COMMIT"); + PGresult* res = d->exec("COMMIT"); bool transaction_failed = false; @@ -915,7 +929,7 @@ bool QPSQLDriver::rollbackTransaction() qWarning("QPSQLDriver::rollbackTransaction: Database not open"); return false; } - PGresult* res = PQexec(d->connection, "ROLLBACK"); + PGresult* res = d->exec("ROLLBACK"); if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { setLastError(qMakeError(tr("Could not rollback transaction"), QSqlError::TransactionError, d)); @@ -1298,11 +1312,11 @@ bool QPSQLDriver::subscribeToNotificationImplementation(const QString &name) int socket = PQsocket(d->connection); if (socket) { + // Add the name to the list of subscriptions here so that QSQLDriverPrivate::exec knows + // to check for notifications immediately after executing the LISTEN + d->seid << name; QString query = QLatin1String("LISTEN ") + escapeIdentifier(name, QSqlDriver::TableName); - if (PQresultStatus(PQexec(d->connection, - d->isUtf8 ? query.toUtf8().constData() - : query.toLocal8Bit().constData()) - ) != PGRES_COMMAND_OK) { + if (PQresultStatus(d->exec(query)) != PGRES_COMMAND_OK) { setLastError(qMakeError(tr("Unable to subscribe"), QSqlError::StatementError, d)); return false; } @@ -1311,9 +1325,11 @@ bool QPSQLDriver::subscribeToNotificationImplementation(const QString &name) d->sn = new QSocketNotifier(socket, QSocketNotifier::Read); connect(d->sn, SIGNAL(activated(int)), this, SLOT(_q_handleNotification(int))); } + } else { + qWarning("QPSQLDriver::subscribeToNotificationImplementation: PQsocket didn't return a valid socket to listen on"); + return false; } - d->seid << name; return true; } @@ -1331,10 +1347,7 @@ bool QPSQLDriver::unsubscribeFromNotificationImplementation(const QString &name) } QString query = QLatin1String("UNLISTEN ") + escapeIdentifier(name, QSqlDriver::TableName); - if (PQresultStatus(PQexec(d->connection, - d->isUtf8 ? query.toUtf8().constData() - : query.toLocal8Bit().constData()) - ) != PGRES_COMMAND_OK) { + if (PQresultStatus(d->exec(query)) != PGRES_COMMAND_OK) { setLastError(qMakeError(tr("Unable to unsubscribe"), QSqlError::StatementError, d)); return false; } @@ -1357,6 +1370,7 @@ QStringList QPSQLDriver::subscribedToNotificationsImplementation() const void QPSQLDriver::_q_handleNotification(int) { + d->pendingNotifyCheck = false; PQconsumeInput(d->connection); PGnotify *notify = 0; @@ -1364,10 +1378,8 @@ void QPSQLDriver::_q_handleNotification(int) QString name(QLatin1String(notify->relname)); if (d->seid.contains(name)) { emit notification(name); - if (notify->be_pid == PQbackendPID(d->connection)) - emit notification(name, QSqlDriver::SelfSource); - else - emit notification(name, QSqlDriver::OtherSource); + QSqlDriver::NotificationSource source = (notify->be_pid == PQbackendPID(d->connection)) ? QSqlDriver::SelfSource : QSqlDriver::OtherSource; + emit notification(name, source); } else qWarning("QPSQLDriver: received notification for '%s' which isn't subscribed to.", -- cgit v1.2.3 From d5e9616e399e68838f99ae4c123930b330bc4221 Mon Sep 17 00:00:00 2001 From: Matt Newell Date: Thu, 22 Mar 2012 10:42:56 -0700 Subject: Add payload to QSqlDriver notification with PSQL implementation. Postgres async notifications can contain a payload parameter that is currently discarded. This patch provides the QSqlDriver api change necessary to deliver a payload with each emitted notification by adding a QVariant parameter to the notification signal. It also provides the implementation for the qsqlpsql driver. The qsql_ibase driver has been updated to reflect the change to the notification signal signature. The eventNotificationPSQL test in the qsqldatabase test has been expanded to test proper payload sending and receiving. All tests/auto/sql/kernel tests have been run with sqllite and postgres with no regressions. Task-number: QTBUG-13500 Change-Id: I9137f6acc8cfca93f45791ca930e0287d93d5d0d Reviewed-by: Mark Brand --- src/sql/drivers/psql/qsql_psql.cpp | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) (limited to 'src/sql/drivers/psql/qsql_psql.cpp') diff --git a/src/sql/drivers/psql/qsql_psql.cpp b/src/sql/drivers/psql/qsql_psql.cpp index 10374bb51e..de7aa5ea78 100644 --- a/src/sql/drivers/psql/qsql_psql.cpp +++ b/src/sql/drivers/psql/qsql_psql.cpp @@ -1377,9 +1377,14 @@ void QPSQLDriver::_q_handleNotification(int) while((notify = PQnotifies(d->connection)) != 0) { QString name(QLatin1String(notify->relname)); if (d->seid.contains(name)) { + QString payload; +#if defined PG_VERSION_NUM && PG_VERSION_NUM-0 >= 70400 + if (notify->extra) + payload = d->isUtf8 ? QString::fromUtf8(notify->extra) : QString::fromAscii(notify->extra); +#endif emit notification(name); QSqlDriver::NotificationSource source = (notify->be_pid == PQbackendPID(d->connection)) ? QSqlDriver::SelfSource : QSqlDriver::OtherSource; - emit notification(name, source); + emit notification(name, source, payload); } else qWarning("QPSQLDriver: received notification for '%s' which isn't subscribed to.", -- cgit v1.2.3