diff options
Diffstat (limited to 'src/daemon/dbserver.cpp')
-rw-r--r-- | src/daemon/dbserver.cpp | 587 |
1 files changed, 431 insertions, 156 deletions
diff --git a/src/daemon/dbserver.cpp b/src/daemon/dbserver.cpp index 29b050c0..845100a0 100644 --- a/src/daemon/dbserver.cpp +++ b/src/daemon/dbserver.cpp @@ -39,14 +39,12 @@ ** ****************************************************************************/ -#include <QtCore> #include <QtNetwork> +#include <QDir> #include <QElapsedTimer> -#include <json.h> - -#include "jsondb-strings.h" -#include "jsondb-error.h" +#include "jsondbstrings.h" +#include "jsondberrors.h" #include "jsondbephemeralpartition.h" #include "jsondbindexquery.h" @@ -61,7 +59,7 @@ #include <errno.h> #endif -QT_BEGIN_NAMESPACE_JSONDB +QT_USE_NAMESPACE_JSONDB_PARTITION static const int gReadBufferSize = 65536; @@ -92,45 +90,58 @@ static void sendError( JsonStream *stream, JsonDbError::ErrorCode code, stream->send(map); } -DBServer::DBServer(const QString &filePath, const QString &baseName, QObject *parent) - : QObject(parent), - mDefaultPartition(0), - mEphemeralPartition(0), - mTcpServerPort(0), - mServer(0), - mTcpServer(0), - mOwner(new JsonDbOwner(this)), - mFilePath(filePath), - mBaseName(baseName) +DBServer::DBServer(const QString &searchPath, QObject *parent) : + QObject(parent) + , mDefaultPartition(0) + , mEphemeralPartition(0) + , mTcpServerPort(0) + , mServer(0) + , mTcpServer(0) + , mOwner(new JsonDbOwner(this)) { // for queued connection handling qRegisterMetaType<JsonDbPartition*>("JsonDbPartition*"); qRegisterMetaType<QSet<QString> >("QSet<QString>"); + qRegisterMetaType<JsonDbUpdateList>("JsonDbUpdateList"); - QFileInfo info(filePath); + // make the user-specified path (or PWD) the first in the search path, then and + // the /etc one the last + QStringList searchPaths = jsondbSettings->configSearchPath(); + if (searchPath.isEmpty()) + searchPaths.prepend(QDir::currentPath()); + else if (!searchPaths.contains(searchPath)) + searchPaths.prepend(searchPath); - if (QString::compare(info.suffix(), QLatin1String("db"), Qt::CaseInsensitive) == 0) { - mFilePath = info.absolutePath(); - if (mBaseName.isEmpty()) - mBaseName = info.baseName(); - } + if (!searchPaths.contains(QLatin1String("/etc/jsondb"))) + searchPaths.append(QLatin1String("/etc/jsondb")); + jsondbSettings->setConfigSearchPath(searchPaths); - if (mFilePath.isEmpty()) - mFilePath = QDir::currentPath(); - if (mBaseName.isEmpty()) - mBaseName = QLatin1String("default.System"); - if (!mBaseName.endsWith(QLatin1String(".System"))) - mBaseName += QLatin1String(".System"); + mOwner->setAllowAll(true); +} - QDir(mFilePath).mkpath(QString(".")); +DBServer::~DBServer() +{ + close(); +} - mOwner->setAllowAll(true); +void DBServer::clearNotifications() +{ + QMapIterator<QString,JsonDbNotification*> mi(mNotificationMap); + while (mi.hasNext()) { + delete mi.value(); + mi.next(); + } + delete mi.value(); + mNotificationMap.clear(); + mNotifications.clear(); + mKeyedNotifications.clear(); } void DBServer::sigHUP() { if (jsondbSettings->debug()) qDebug() << "SIGHUP received"; + loadPartitions(); reduceMemoryUsage(); } @@ -220,7 +231,7 @@ void DBServer::close() partition->compact(); partition->close(); } - + clearNotifications(); QCoreApplication::exit(); } @@ -232,54 +243,87 @@ bool DBServer::loadPartitions() this, SLOT(objectsUpdated(JsonDbUpdateList))); } - QHash<QString, JsonDbPartition*> oldPartitions = mPartitions; - oldPartitions.remove(mBaseName); - - if (!mDefaultPartition) { - mDefaultPartition = new JsonDbPartition(QDir(mFilePath).absoluteFilePath(mBaseName + QLatin1String(".db")), - mBaseName, mOwner, this); - connect(mDefaultPartition, SIGNAL(objectsUpdated(JsonDbUpdateList)), - this, SLOT(objectsUpdated(JsonDbUpdateList))); - connect(mDefaultPartition, SIGNAL(viewUpdated(QString)), - this, SLOT(viewUpdated(QString)), - Qt::QueuedConnection); + QHash<QString, JsonDbPartition*> partitions; + QList<QJsonObject> definitions = findPartitionDefinitions(); + QString defaultPartitionName; - if (!mDefaultPartition->open()) - return false; + foreach (const QJsonObject &definition, definitions) { + QString name = definition.value(JsonDbString::kNameStr).toString(); - mPartitions[mBaseName] = mDefaultPartition; - } + if (definition.value(JsonDbString::kDefaultStr).toBool() && defaultPartitionName.isEmpty()) + defaultPartitionName = name; - JsonDbQueryResult partitions = mDefaultPartition->queryObjects(mOwner, JsonDbQuery::parse(QLatin1String("[?_type=\"Partition\"]"))); + if (mPartitions.contains(name)) { + partitions[name] = mPartitions.take(name); + } else { - foreach (const JsonDbObject &partition, partitions.data) { - if (partition.contains(JsonDbString::kNameStr)) { - QString name = partition.value(JsonDbString::kNameStr).toString(); + if (partitions.contains(name)) { + qWarning() << "Duplicate partition name:" << name; + continue; + } - if (!mPartitions.contains(name)) { - QString filename = partition.contains(QLatin1String("file")) ? - partition.value(QLatin1String("file")).toString() : - QDir(mFilePath).absoluteFilePath(name + QLatin1String(".db")); - JsonDbPartition *p = new JsonDbPartition(filename, name, mOwner, this); - connect(p, SIGNAL(objectsUpdated(JsonDbUpdateList)), - this, SLOT(objectsUpdated(JsonDbUpdateList))); - connect(p, SIGNAL(viewUpdated(QString)), - this, SLOT(viewUpdated(QString)), - Qt::QueuedConnection); + QString path = definition.value(JsonDbString::kPathStr).toString(); + QDir pathDir(path); + pathDir.mkpath(QLatin1String(".")); - if (!p->open()) - return false; + JsonDbPartition *partition = new JsonDbPartition(pathDir.absoluteFilePath(name), name, mOwner, this); + partitions[name] = partition; + connect(partition, SIGNAL(objectsUpdated(JsonDbUpdateList)), this, SLOT(objectsUpdated(JsonDbUpdateList))); - mPartitions[name] = p; + // TODO: for removable partitions, this shouldn't cause a total failure + if (!partition->open()) { + close(); + return false; } - oldPartitions.remove(name); + // create an object in the Ephemeral partition to reflect this partition + JsonDbObject partitionRecord(definition); + partitionRecord.insert(JsonDbString::kUuidStr, JsonDbObject::createUuidFromString(name).toString()); + partitionRecord.insert(JsonDbString::kTypeStr, JsonDbString::kPartitionTypeStr); + mEphemeralPartition->updateObjects(mOwner, JsonDbObjectList() << partitionRecord, JsonDbPartition::ForcedWrite); } } // close any partitions that were declared previously but are no longer present - foreach (JsonDbPartition *p, oldPartitions.values()) - p->close(); + foreach (JsonDbPartition *partition, mPartitions.values()) { + + if (mDefaultPartition == partition) + mDefaultPartition = 0; + + QList<JsonDbObject> toRemove; + + // remove the ephemeral object representing this partition + JsonDbObject partitionRecord; + partitionRecord.insert(JsonDbString::kUuidStr, JsonDbObject::createUuidFromString(partition->name()).toString()); + partitionRecord.insert(JsonDbString::kTypeStr, JsonDbString::kPartitionTypeStr); + partitionRecord.markDeleted(); + toRemove.append(partitionRecord); + + // remove any notifications for the partition being closed + QJsonObject bindings; + bindings.insert(QLatin1String("notification"), JsonDbString::kNotificationTypeStr); + bindings.insert(QLatin1String("partition"), partition->name()); + + QScopedPointer<JsonDbQuery> query(JsonDbQuery::parse(QLatin1String("[?_type=%notification][?partition=%partition]"), + bindings)); + JsonDbQueryResult results = mEphemeralPartition->queryObjects(mOwner, query.data()); + foreach (const JsonDbObject &result, results.data) { + JsonDbObject notification = result; + notification.markDeleted(); + toRemove.append(notification); + } + + mEphemeralPartition->updateObjects(mOwner, toRemove, JsonDbPartition::ForcedWrite); + + disconnect(partition, SIGNAL(objectsUpdated(JsonDbUpdateList)), this, SLOT(objectsUpdated(JsonDbUpdateList))); + partition->close(); + delete partition; + } + + mPartitions = partitions; + + if (!mDefaultPartition) + mDefaultPartition = mPartitions[defaultPartitionName]; return true; } @@ -305,7 +349,8 @@ void DBServer::handleConnection() if (jsondbSettings->debug()) qDebug() << "client connected to jsondb server" << connection; connect(connection, SIGNAL(disconnected()), this, SLOT(removeConnection())); - JsonStream *stream = new JsonStream(connection, this); + JsonStream *stream = new JsonStream(this); + stream->setDevice(connection); connect(stream, SIGNAL(receive(QJsonObject)), this, SLOT(receiveMessage(QJsonObject))); mConnections.insert(connection, stream); @@ -318,7 +363,8 @@ void DBServer::handleTcpConnection() if (jsondbSettings->debug()) qDebug() << "remote client connected to jsondb server" << connection; connect(connection, SIGNAL(disconnected()), this, SLOT(removeConnection())); - JsonStream *stream = new JsonStream(connection, this); + JsonStream *stream = new JsonStream(this); + stream->setDevice(connection); connect(stream, SIGNAL(receive(QJsonObject)), this, SLOT(receiveMessage(QJsonObject))); mConnections.insert(connection, stream); @@ -431,6 +477,9 @@ void DBServer::objectsUpdated(const QList<JsonDbUpdate> &objects) { QString partitionName; JsonDbPartition *partition = 0; + QList<JsonDbUpdate> updatesToEagerViews; + QSet<QString> eagerViewTypes; + bool isViewUpdate = false; if (sender() == mEphemeralPartition) { partitionName = mEphemeralPartition->name(); @@ -441,8 +490,14 @@ void DBServer::objectsUpdated(const QList<JsonDbUpdate> &objects) else return; } + quint32 partitionStateNumber = 0; + if (partition) + partition->mainObjectTable()->stateNumber(); + else if (mDefaultPartition) + partitionStateNumber = mDefaultPartition->mainObjectTable()->stateNumber(); - QSet<QString> eagerViewUpdates; + if (jsondbSettings->debug()) + qDebug() << "objectsUpdated" << partitionName << partitionStateNumber; // FIXME: pretty good place to batch notifications foreach (const JsonDbUpdate &updated, objects) { @@ -455,12 +510,13 @@ void DBServer::objectsUpdated(const QList<JsonDbUpdate> &objects) if (object.type() == JsonDbString::kNotificationTypeStr) continue; + QString oldObjectType = oldObject.value(JsonDbString::kTypeStr).toString(); QString objectType = object.value(JsonDbString::kTypeStr).toString(); quint32 stateNumber; if (partition) { JsonDbObjectTable *objectTable = partition->findObjectTable(objectType); stateNumber = objectTable->stateNumber(); - } else + } else if (partitionName != mEphemeralPartition->name()) stateNumber = mDefaultPartition->mainObjectTable()->stateNumber(); QStringList notificationKeys; @@ -468,9 +524,37 @@ void DBServer::objectsUpdated(const QList<JsonDbUpdate> &objects) notificationKeys << objectType; // eagerly update views if this object that was created isn't a view type itself - if (mEagerViewSourceTypes.contains(objectType) && partition - && !partition->findView(objectType)) - eagerViewUpdates.insert(objectType); + if (partition) { + WeightedSourceViewGraph &sourceViewGraph = mEagerViewSourceGraph[partitionName]; + if (jsondbSettings->verbose()) qDebug() << "objectType" << objectType << sourceViewGraph.contains(oldObjectType) << sourceViewGraph.contains(objectType); + if (partition->findView(objectType)) { + if (jsondbSettings->verbose()) qDebug() << "isViewUpdate" << objectType; + isViewUpdate = true; + } else if ((sourceViewGraph.contains(oldObjectType)) + || (sourceViewGraph.contains(objectType))) { + JsonDbUpdateList updateList; + if (oldObjectType == objectType) { + updateList.append(updated); + } else { + JsonDbObject tombstone(oldObject); + tombstone.insert(QLatin1String("_deleted"), true); + updateList.append(JsonDbUpdate(oldObject, tombstone, JsonDbNotification::Delete)); + updateList.append(JsonDbUpdate(JsonDbObject(), object, JsonDbNotification::Create)); + } + foreach (const JsonDbUpdate &splitUpdate, updateList) { + const QString updatedObjectType = splitUpdate.newObject.type(); + const ViewEdgeWeights &edgeWeights = sourceViewGraph[updatedObjectType]; + for (ViewEdgeWeights::const_iterator it = edgeWeights.begin(); it != edgeWeights.end(); ++it) { + if (jsondbSettings->verbose()) qDebug() << "edge weight" << updatedObjectType << it.key() << it.value().count; + if (it.value() > 0) { + eagerViewTypes.insert(it.key()); + if (sourceViewGraph.contains(updatedObjectType)) + updatesToEagerViews.append(splitUpdate); + } + } + } + } + } } if (object.contains(JsonDbString::kUuidStr)) @@ -491,31 +575,119 @@ void DBServer::objectsUpdated(const QList<JsonDbUpdate> &objects) } } - if (!eagerViewUpdates.isEmpty()) + if (isViewUpdate) + return; + if (updatesToEagerViews.isEmpty()) { + updateEagerViewStateNumbers(partition ? partition : mDefaultPartition, partitionStateNumber); + emitStateChanged(partition); + } else { QMetaObject::invokeMethod(this, "updateEagerViews", Qt::QueuedConnection, Q_ARG(JsonDbPartition*, partition), - Q_ARG(QSet<QString>, eagerViewUpdates)); + Q_ARG(QSet<QString>, eagerViewTypes), + Q_ARG(JsonDbUpdateList, updatesToEagerViews)); + } } -void DBServer::viewUpdated(const QString &type) +// Updates the in-memory state numbers on each view so that we know it +// has seen all relevant updates from this transaction +void DBServer::updateEagerViewStateNumbers(JsonDbPartition *partition, quint32 partitionStateNumber) { - JsonDbPartition *partition = qobject_cast<JsonDbPartition*>(sender()); if (!partition) return; + if (jsondbSettings->verbose()) + qDebug() << "updateEagerViewStateNumbers" << (partition ? partition->name() : "no partition") << partitionStateNumber << "{"; + const QString &partitionName = partition->name(); + WeightedSourceViewGraph &sourceViewGraph = mEagerViewSourceGraph[partitionName]; + foreach (const ViewEdgeWeights &edgeWeights, sourceViewGraph) { + for (ViewEdgeWeights::const_iterator jt = edgeWeights.begin(); jt != edgeWeights.end(); ++jt) { + const QString &viewType = jt.key(); + if (jt.value() == 0) + continue; + JsonDbView *view = partition->findView(viewType); + if (view) + view->updateViewStateNumber(partitionStateNumber); + else + if (jsondbSettings->debug()) + qCritical() << "no view for" << viewType << partition->name(); + } + } + if (jsondbSettings->verbose()) + qDebug() << "updateEagerViewStateNumbers" << (partition ? partition->name() : "no partition") << partitionStateNumber << "}"; +} - if (mEagerViewSourceTypes.contains(type)) - updateEagerViews(partition, QSet<QString>() << type); +void DBServer::emitStateChanged(JsonDbPartition *partition) +{ + if (!partition) + return; + quint32 lastStateNumber = partition->mainObjectTable()->stateNumber(); + QJsonObject stateChange; + stateChange.insert("_state", static_cast<int>(lastStateNumber)); + foreach (const JsonDbNotification *n, mNotificationMap) { + if (n->lastStateNumber() == lastStateNumber + && n->partition() == partition->name()) + emit notified(n->uuid(), lastStateNumber, stateChange, "stateChange"); + } } -void DBServer::updateEagerViews(JsonDbPartition *partition, const QSet<QString> &viewTypes) +void DBServer::updateEagerViews(JsonDbPartition *partition, QSet<QString> viewTypes, QList<JsonDbUpdate> changeList) { - foreach (const QString &type, viewTypes) { - const QSet<QString> &targetTypes = mEagerViewSourceTypes[type]; - for (QSet<QString>::const_iterator it = targetTypes.begin(); it != targetTypes.end(); ++it) { - if (partition) - partition->updateView(*it); + QSet<QString> viewsUpdated; + + if (jsondbSettings->verbose()) + qDebug() << "updateEagerViews {" << partition->mainObjectTable()->stateNumber() << viewTypes; + quint32 partitionStateNumber = partition->mainObjectTable()->stateNumber(); + const QString &partitionName = partition->name(); + WeightedSourceViewGraph &sourceViewGraph = mEagerViewSourceGraph[partitionName]; + + while (!viewTypes.isEmpty()) { + bool madeProgress = false; + foreach (const QString &targetType, viewTypes) { + JsonDbView *view = partition->findView(targetType); + if (!view) { + if (jsondbSettings->verbose()) + qWarning() << "non-view viewType?" << targetType << "eager views to update" << viewTypes; + viewTypes.remove(targetType); + madeProgress = true; + continue; + } + QSet<QString> typesNeeded(view->sourceTypeSet()); + typesNeeded.intersect(viewTypes); + if (!typesNeeded.isEmpty()) + continue; + viewTypes.remove(targetType); + QList<JsonDbUpdate> additionalChanges; + view->updateEagerView(changeList, &additionalChanges); + if (jsondbSettings->verbose()) + qDebug() << "updated view" << targetType << additionalChanges.size() << additionalChanges; + changeList.append(additionalChanges); + viewsUpdated.insert(targetType); + // if this triggers other eager types, we need to update that also + if (sourceViewGraph.contains(targetType)) { + const ViewEdgeWeights &edgeWeights = sourceViewGraph[targetType]; + for (ViewEdgeWeights::const_iterator it = edgeWeights.begin(); it != edgeWeights.end(); ++it) { + if (it.value() == 0) + continue; + const QString &viewType = it.key(); + if (viewsUpdated.contains(viewType)) + qWarning() << "View update cycle detected" << targetType << viewType << viewsUpdated; + else + viewTypes.insert(viewType); + } + } + + madeProgress = true; + } + if (!madeProgress) { + qCritical() << "Failed to update any views" << viewTypes; + break; } } + + updateEagerViewStateNumbers(partition, partitionStateNumber); + emitStateChanged(partition); + + if (jsondbSettings->verbose()) + qDebug() << "updateEagerViews }" << partition->mainObjectTable()->stateNumber() << viewsUpdated; } void DBServer::objectUpdated(const QString &partitionName, quint32 stateNumber, JsonDbNotification *n, @@ -549,10 +721,23 @@ void DBServer::objectUpdated(const QString &partitionName, quint32 stateNumber, r = object; } if (!r.isEmpty()&& (n->actions() & effectiveAction)) { + JsonDbPartition *partition = findPartition(partitionName); + if (partition && !n->parsedQuery()->orderTerms.isEmpty()) { + const QString &indexName = n->parsedQuery()->orderTerms[0].propertyName; + QString objectType = r.type(); + JsonDbObjectTable *objectTable = partition->findObjectTable(objectType); + IndexSpec *indexSpec = objectTable->indexSpec(indexName); + if (indexSpec) { + QList<QJsonValue> indexValues = indexSpec->index->indexValues(r); + if (!indexValues.isEmpty()) + r.insert(JsonDbString::kIndexValueStr, indexValues[0]); + } + } QString actionStr = (effectiveAction == JsonDbNotification::Create ? JsonDbString::kCreateStr : (effectiveAction == JsonDbNotification::Update ? JsonDbString::kUpdateStr : JsonDbString::kRemoveStr)); notified(n->uuid(), stateNumber, r, actionStr); + n->setLastStateNumber(stateNumber); } } } @@ -587,10 +772,6 @@ void DBServer::processWrite(JsonStream *stream, JsonDbOwner *owner, const JsonDb removeNotification(object); if (!object.isDeleted()) createNotification(object, stream); - - // handle partitions - } else if (object.type() == JsonDbString::kPartitionTypeStr) { - loadPartitions(); } } @@ -652,18 +833,11 @@ void DBServer::processRead(JsonStream *stream, JsonDbOwner *owner, const QJsonVa response.insert(JsonDbString::kResultStr, QJsonValue()); } else { QJsonObject result; - if (queryResult.values.size()) { - result.insert(JsonDbString::kDataStr, queryResult.values); - result.insert(JsonDbString::kLengthStr, queryResult.values.size()); - } else { - QJsonArray values; - for (int i = 0; i < queryResult.data.size(); i++) { - JsonDbObject d = queryResult.data.at(i); - values.append(d); - } - result.insert(JsonDbString::kDataStr, values); - result.insert(JsonDbString::kLengthStr, values.size()); - } + QJsonArray data; + for (int i = 0; i < queryResult.data.size(); i++) + data.append(queryResult.data.at(i)); + result.insert(JsonDbString::kDataStr, data); + result.insert(JsonDbString::kLengthStr, data.size()); result.insert(JsonDbString::kOffsetStr, queryResult.offset); result.insert(JsonDbString::kExplanationStr, queryResult.explanation); result.insert("sortKeys", queryResult.sortKeys); @@ -678,6 +852,7 @@ void DBServer::processRead(JsonStream *stream, JsonDbOwner *owner, const QJsonVa void DBServer::processChangesSince(JsonStream *stream, JsonDbOwner *owner, const QJsonValue &object, const QString &partitionName, int id) { + Q_UNUSED(owner); QJsonObject result; if (object.type() == QJsonValue::Object) { @@ -703,6 +878,7 @@ void DBServer::processChangesSince(JsonStream *stream, JsonDbOwner *owner, const void DBServer::processFlush(JsonStream *stream, JsonDbOwner *owner, const QString &partitionName, int id) { + Q_UNUSED(owner); JsonDbPartition *partition = mPartitions.value(partitionName, mDefaultPartition); QJsonObject result = partition->flush(); result.insert(JsonDbString::kIdStr, id); @@ -715,14 +891,15 @@ void DBServer::debugQuery(JsonDbQuery *query, int limit, int offset, const JsonD for (int i = 0; i < orQueryTerms.size(); i++) { const OrQueryTerm &orQueryTerm = orQueryTerms[i]; foreach (const QueryTerm &queryTerm, orQueryTerm.terms()) { - if (jsondbSettings->verbose()) + if (jsondbSettings->verbose()) { qDebug() << __FILE__ << __LINE__ - << QString(" %1%2%3 %4 %5 ") - .arg(queryTerm.propertyName()) - .arg(queryTerm.joinField().size() ? "->" : "") - .arg(queryTerm.joinField()) - .arg(queryTerm.op()) - .arg(JsonWriter().toString(queryTerm.value().toVariant())); + << QString(" %1%2%3 %4") + .arg(queryTerm.propertyName()) + .arg(queryTerm.joinField().size() ? "->" : "") + .arg(queryTerm.joinField()) + .arg(queryTerm.op()) + << queryTerm.value(); + } } } @@ -797,18 +974,20 @@ void DBServer::createNotification(const JsonDbObject &object, JsonStream *stream QStringList actions = QVariant(object.value(JsonDbString::kActionsStr).toArray().toVariantList()).toStringList(); QString query = object.value(JsonDbString::kQueryStr).toString(); QJsonObject bindings = object.value("bindings").toObject(); - QString partition = object.value(JsonDbString::kPartitionStr).toString(); - - bool ok; - quint32 stateNumber = object.value("initialStateNumber").toVariant().toInt(&ok); - if (!ok) - stateNumber = 0; + QString partitionName = object.value(JsonDbString::kPartitionStr).toString(); + quint32 stateNumber = 0; - if (partition.isEmpty()) - partition = mDefaultPartition->name(); + if (partitionName.isEmpty()) + partitionName = mDefaultPartition->name(); + JsonDbPartition *partition = findPartition(partitionName); - JsonDbNotification *n = new JsonDbNotification(getOwner(stream), uuid, query, actions, partition); + JsonDbNotification *n = new JsonDbNotification(getOwner(stream), uuid, query, actions, partitionName); + if (object.contains("initialStateNumber") && object.value("initialStateNumber").isDouble()) + stateNumber = static_cast<quint32>(object.value("initialStateNumber").toDouble()); + else if (partition) + stateNumber = partition->mainObjectTable()->stateNumber(); n->setInitialStateNumber(stateNumber); + JsonDbQuery *parsedQuery = JsonDbQuery::parse(query, bindings); n->setCompiledQuery(parsedQuery); const QList<OrQueryTerm> &orQueryTerms = parsedQuery->queryTerms; @@ -840,11 +1019,10 @@ void DBServer::createNotification(const JsonDbObject &object, JsonStream *stream mNotifications[uuid] = stream; foreach (const QString &objectType, parsedQuery->matchedTypes()) - updateEagerViewTypes(objectType, mPartitions.value(partition, mDefaultPartition), stateNumber); + updateEagerViewTypes(objectType, mPartitions.value(partitionName, mDefaultPartition), stateNumber, 1); - if (stateNumber) + if (partition) notifyHistoricalChanges(n); - } void DBServer::removeNotification(const JsonDbObject &object) @@ -873,6 +1051,10 @@ void DBServer::removeNotification(const JsonDbObject &object) mKeyedNotifications.remove("__generic_notification__", n); + const QString &partitionName = n->partition(); + foreach (const QString &objectType, parsedQuery->matchedTypes()) + updateEagerViewTypes(objectType, mPartitions.value(partitionName, mDefaultPartition), 0, -1); + delete n; } } @@ -885,7 +1067,7 @@ void DBServer::notifyHistoricalChanges(JsonDbNotification *n) JsonDbQuery *parsedQuery = n->parsedQuery(); QSet<QString> matchedTypes = parsedQuery->matchedTypes(); bool matchAnyType = matchedTypes.isEmpty(); - if (stateNumber == static_cast<quint32>(-1)) { + if (stateNumber == 0) { QString indexName = JsonDbString::kTypeStr; if (matchAnyType) { matchedTypes.insert(QString()); @@ -894,62 +1076,96 @@ void DBServer::notifyHistoricalChanges(JsonDbNotification *n) } foreach (const QString matchedType, matchedTypes) { JsonDbObjectTable *objectTable = partition->findObjectTable(matchedType); + lastStateNumber = objectTable->stateNumber(); + if (lastStateNumber == stateNumber) + continue; // views dont have a _type index if (partition->findView(matchedType)) indexName = JsonDbString::kUuidStr; lastStateNumber = objectTable->stateNumber(); - JsonDbIndexQuery *indexQuery = JsonDbIndexQuery::indexQuery(partition, objectTable, + QScopedPointer<JsonDbIndexQuery> indexQuery(JsonDbIndexQuery::indexQuery(partition, objectTable, indexName, QString("string"), - n->owner()); + n->owner())); if (!matchAnyType) { - indexQuery->setMin(matchedType); - indexQuery->setMax(matchedType); + indexQuery.data()->setMin(matchedType); + indexQuery.data()->setMax(matchedType); } JsonDbObject oldObject; - for (JsonDbObject o = indexQuery->first(); !o.isEmpty(); o = indexQuery->next()) { + int c = 0; + for (JsonDbObject o = indexQuery.data()->first(); !o.isEmpty(); o = indexQuery.data()->next()) { JsonDbNotification::Action action = JsonDbNotification::Create; - objectUpdated(partition->name(), stateNumber, n, action, oldObject, o); + objectUpdated(partition->name(), lastStateNumber, n, action, oldObject, o); + c++; } } } else { - QJsonObject changesSince = partition->changesSince(stateNumber, matchedTypes); - QJsonObject changes(changesSince.value("result").toObject()); - lastStateNumber = changes.value("currentStateNumber").toDouble(); - QJsonArray changeList(changes.value("changes").toArray()); - quint32 count = changeList.size(); - for (quint32 i = 0; i < count; i++) { - QJsonObject change = changeList.at(i).toObject(); - QJsonObject before = change.value("before").toObject(); - QJsonObject after = change.value("after").toObject(); - - JsonDbNotification::Action action = JsonDbNotification::Update; - if (before.isEmpty()) - action = JsonDbNotification::Create; - else if (after.contains(JsonDbString::kDeletedStr)) - action = JsonDbNotification::Delete; - objectUpdated(partition->name(), stateNumber, n, action, before, after); + foreach (const QString matchedType, matchedTypes) { + JsonDbObjectTable *objectTable = partition->findObjectTable(matchedType); + if (objectTable->stateNumber() == stateNumber) + continue; + QList<JsonDbUpdate> updateList; + lastStateNumber = objectTable->changesSince(stateNumber, matchedTypes, &updateList); + foreach (const JsonDbUpdate &update, updateList) { + QJsonObject before = update.oldObject; + QJsonObject after = update.newObject; + + JsonDbNotification::Action action = JsonDbNotification::Update; + if (before.isEmpty()) + action = JsonDbNotification::Create; + else if (after.contains(JsonDbString::kDeletedStr)) + action = JsonDbNotification::Delete; + objectUpdated(partition->name(), lastStateNumber, n, action, before, after); + } } } QJsonObject stateChange; stateChange.insert("_state", static_cast<int>(lastStateNumber)); - emit notified(n->uuid(), stateNumber, stateChange, "stateChange"); + emit notified(n->uuid(), lastStateNumber, stateChange, "stateChange"); } -void DBServer::updateEagerViewTypes(const QString &objectType, JsonDbPartition *partition, quint32 stateNumber) +/*! + Updates the per-partition information on eager views. + + For each partition, we maintain a graph with weighted edges from + source types to \a viewType. + + Adds \a increment weight to the edge from each of the source types + of the view to the target type. Call with \a increment of 1 when + adding an eager view type, and -1 when removing an eager view type. + + It recursively updates the graph for each source type that is also a + view. + + If \a stateNumber is non-zero, do a full update of each the views so + that they will be ready for eager updates. + */ +void DBServer::updateEagerViewTypes(const QString &viewType, JsonDbPartition *partition, quint32 stateNumber, int increment) { - // FIXME: eager view types should be broken down by partition - JsonDbView *view = partition->findView(objectType); + JsonDbView *view = partition->findView(viewType); if (!view) return; + QString partitionName = partition->name(); + WeightedSourceViewGraph &sourceViewGraph = mEagerViewSourceGraph[partitionName]; + + // An update of the Map/Reduce definition also causes the view to + // need to be updated, so we count it as a view source. + + // this is a bit conservative, since we add both Map and Reduce as + // sources, but it shortens the code + sourceViewGraph[JsonDbString::kMapTypeStr][viewType] += increment; + sourceViewGraph[JsonDbString::kReduceTypeStr][viewType] += increment; foreach (const QString sourceType, view->sourceTypes()) { - mEagerViewSourceTypes[sourceType].insert(objectType); + sourceViewGraph[sourceType][viewType] += increment; + if (jsondbSettings->verbose()) + qDebug() << "SourceView" << sourceType << viewType << sourceViewGraph[sourceType][viewType].count; // now recurse until we get to a non-view sourceType - updateEagerViewTypes(sourceType, partition, stateNumber); + updateEagerViewTypes(sourceType, partition, stateNumber, increment); } - partition->updateView(objectType, stateNumber); + if (stateNumber) + partition->updateView(viewType, stateNumber); } JsonDbPartition *DBServer::findPartition(const QString &partitionName) @@ -965,6 +1181,67 @@ JsonDbPartition *DBServer::findPartition(const QString &partitionName) return partition; } +QList<QJsonObject> DBServer::findPartitionDefinitions() const +{ + QList<QJsonObject> partitions; + + bool defaultSpecified = false; + + foreach (const QString &path, jsondbSettings->configSearchPath()) { + QDir searchPath(path); + if (!searchPath.exists()) + continue; + + if (jsondbSettings->debug()) + qDebug() << QString("Searching %1 for partition definition files").arg(path); + + QStringList files = searchPath.entryList(QStringList() << "partitions*.json", + QDir::CaseSensitive | QDir::Files | QDir::Readable); + foreach (const QString file, files) { + if (jsondbSettings->debug()) + qDebug() << QString("Loading partition definitions from %1").arg(file); + + QFile partitionFile(searchPath.absoluteFilePath(file)); + partitionFile.open(QFile::ReadOnly); + + QJsonArray partitionList = QJsonDocument::fromJson(partitionFile.readAll()).array(); + if (partitionList.isEmpty()) + continue; + + for (int i = 0; i < partitionList.count(); i++) { + QJsonObject def = partitionList[i].toObject(); + if (def.contains(JsonDbString::kNameStr)) { + if (!def.contains(JsonDbString::kPathStr)) + def.insert(JsonDbString::kPathStr, QDir::currentPath()); + if (def.contains(JsonDbString::kDefaultStr)) + defaultSpecified = true; + partitions.append(def); + } + } + } + } + + // if no partitions are specified just make a partition in the current working + // directory and call it "default" + if (partitions.isEmpty()) { + QJsonObject defaultPartition; + defaultPartition.insert(JsonDbString::kNameStr, QLatin1String("default")); + defaultPartition.insert(JsonDbString::kPathStr, QDir::currentPath()); + defaultPartition.insert(JsonDbString::kDefaultStr, true); + partitions.append(defaultPartition); + defaultSpecified = true; + } + + // ensure that at least one partition is marked as default + if (!defaultSpecified) { + QJsonObject defaultPartition = partitions.takeFirst(); + defaultPartition.insert(JsonDbString::kDefaultStr, true); + partitions.append(defaultPartition); + } + + return partitions; +} + void DBServer::receiveMessage(const QJsonObject &message) { JsonStream *stream = qobject_cast<JsonStream *>(sender()); @@ -975,7 +1252,7 @@ void DBServer::receiveMessage(const QJsonObject &message) JsonDbPartition *partition = findPartition(partitionName); if (!(partitionName.isEmpty() || partition || partitionName == mEphemeralPartition->name())) { - sendError(stream, JsonDbError::InvalidRequest, + sendError(stream, JsonDbError::InvalidPartition, QString("Invalid partition '%1'").arg(partitionName), id); return; } @@ -1017,12 +1294,12 @@ void DBServer::receiveMessage(const QJsonObject &message) // TODO: remove at the same time that clientcompat is dropped if (action == JsonDbString::kRemoveStr && object.toObject().contains(JsonDbString::kQueryStr)) { - JsonDbQuery *query = JsonDbQuery::parse(object.toObject().value(JsonDbString::kQueryStr).toString()); + QScopedPointer<JsonDbQuery> parsedQuery(JsonDbQuery::parse(object.toObject().value(JsonDbString::kQueryStr).toString())); JsonDbQueryResult res; if (partition) - res = partition->queryObjects(owner, query); + res = partition->queryObjects(owner, parsedQuery.data()); else - res = mEphemeralPartition->queryObjects(owner, query); + res = mEphemeralPartition->queryObjects(owner, parsedQuery.data()); QJsonArray toRemove; foreach (const QJsonValue &value, res.data) @@ -1134,5 +1411,3 @@ void DBServer::removeConnection() } #include "moc_dbserver.cpp" - -QT_END_NAMESPACE_JSONDB |