summaryrefslogtreecommitdiffstats
path: root/src/daemon/dbserver.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/daemon/dbserver.cpp')
-rw-r--r--src/daemon/dbserver.cpp587
1 files changed, 431 insertions, 156 deletions
diff --git a/src/daemon/dbserver.cpp b/src/daemon/dbserver.cpp
index 29b050c0..845100a0 100644
--- a/src/daemon/dbserver.cpp
+++ b/src/daemon/dbserver.cpp
@@ -39,14 +39,12 @@
**
****************************************************************************/
-#include <QtCore>
#include <QtNetwork>
+#include <QDir>
#include <QElapsedTimer>
-#include <json.h>
-
-#include "jsondb-strings.h"
-#include "jsondb-error.h"
+#include "jsondbstrings.h"
+#include "jsondberrors.h"
#include "jsondbephemeralpartition.h"
#include "jsondbindexquery.h"
@@ -61,7 +59,7 @@
#include <errno.h>
#endif
-QT_BEGIN_NAMESPACE_JSONDB
+QT_USE_NAMESPACE_JSONDB_PARTITION
static const int gReadBufferSize = 65536;
@@ -92,45 +90,58 @@ static void sendError( JsonStream *stream, JsonDbError::ErrorCode code,
stream->send(map);
}
-DBServer::DBServer(const QString &filePath, const QString &baseName, QObject *parent)
- : QObject(parent),
- mDefaultPartition(0),
- mEphemeralPartition(0),
- mTcpServerPort(0),
- mServer(0),
- mTcpServer(0),
- mOwner(new JsonDbOwner(this)),
- mFilePath(filePath),
- mBaseName(baseName)
+DBServer::DBServer(const QString &searchPath, QObject *parent) :
+ QObject(parent)
+ , mDefaultPartition(0)
+ , mEphemeralPartition(0)
+ , mTcpServerPort(0)
+ , mServer(0)
+ , mTcpServer(0)
+ , mOwner(new JsonDbOwner(this))
{
// for queued connection handling
qRegisterMetaType<JsonDbPartition*>("JsonDbPartition*");
qRegisterMetaType<QSet<QString> >("QSet<QString>");
+ qRegisterMetaType<JsonDbUpdateList>("JsonDbUpdateList");
- QFileInfo info(filePath);
+ // make the user-specified path (or PWD) the first in the search path, then and
+ // the /etc one the last
+ QStringList searchPaths = jsondbSettings->configSearchPath();
+ if (searchPath.isEmpty())
+ searchPaths.prepend(QDir::currentPath());
+ else if (!searchPaths.contains(searchPath))
+ searchPaths.prepend(searchPath);
- if (QString::compare(info.suffix(), QLatin1String("db"), Qt::CaseInsensitive) == 0) {
- mFilePath = info.absolutePath();
- if (mBaseName.isEmpty())
- mBaseName = info.baseName();
- }
+ if (!searchPaths.contains(QLatin1String("/etc/jsondb")))
+ searchPaths.append(QLatin1String("/etc/jsondb"));
+ jsondbSettings->setConfigSearchPath(searchPaths);
- if (mFilePath.isEmpty())
- mFilePath = QDir::currentPath();
- if (mBaseName.isEmpty())
- mBaseName = QLatin1String("default.System");
- if (!mBaseName.endsWith(QLatin1String(".System")))
- mBaseName += QLatin1String(".System");
+ mOwner->setAllowAll(true);
+}
- QDir(mFilePath).mkpath(QString("."));
+DBServer::~DBServer()
+{
+ close();
+}
- mOwner->setAllowAll(true);
+void DBServer::clearNotifications()
+{
+ QMapIterator<QString,JsonDbNotification*> mi(mNotificationMap);
+ while (mi.hasNext()) {
+ delete mi.value();
+ mi.next();
+ }
+ delete mi.value();
+ mNotificationMap.clear();
+ mNotifications.clear();
+ mKeyedNotifications.clear();
}
void DBServer::sigHUP()
{
if (jsondbSettings->debug())
qDebug() << "SIGHUP received";
+ loadPartitions();
reduceMemoryUsage();
}
@@ -220,7 +231,7 @@ void DBServer::close()
partition->compact();
partition->close();
}
-
+ clearNotifications();
QCoreApplication::exit();
}
@@ -232,54 +243,87 @@ bool DBServer::loadPartitions()
this, SLOT(objectsUpdated(JsonDbUpdateList)));
}
- QHash<QString, JsonDbPartition*> oldPartitions = mPartitions;
- oldPartitions.remove(mBaseName);
-
- if (!mDefaultPartition) {
- mDefaultPartition = new JsonDbPartition(QDir(mFilePath).absoluteFilePath(mBaseName + QLatin1String(".db")),
- mBaseName, mOwner, this);
- connect(mDefaultPartition, SIGNAL(objectsUpdated(JsonDbUpdateList)),
- this, SLOT(objectsUpdated(JsonDbUpdateList)));
- connect(mDefaultPartition, SIGNAL(viewUpdated(QString)),
- this, SLOT(viewUpdated(QString)),
- Qt::QueuedConnection);
+ QHash<QString, JsonDbPartition*> partitions;
+ QList<QJsonObject> definitions = findPartitionDefinitions();
+ QString defaultPartitionName;
- if (!mDefaultPartition->open())
- return false;
+ foreach (const QJsonObject &definition, definitions) {
+ QString name = definition.value(JsonDbString::kNameStr).toString();
- mPartitions[mBaseName] = mDefaultPartition;
- }
+ if (definition.value(JsonDbString::kDefaultStr).toBool() && defaultPartitionName.isEmpty())
+ defaultPartitionName = name;
- JsonDbQueryResult partitions = mDefaultPartition->queryObjects(mOwner, JsonDbQuery::parse(QLatin1String("[?_type=\"Partition\"]")));
+ if (mPartitions.contains(name)) {
+ partitions[name] = mPartitions.take(name);
+ } else {
- foreach (const JsonDbObject &partition, partitions.data) {
- if (partition.contains(JsonDbString::kNameStr)) {
- QString name = partition.value(JsonDbString::kNameStr).toString();
+ if (partitions.contains(name)) {
+ qWarning() << "Duplicate partition name:" << name;
+ continue;
+ }
- if (!mPartitions.contains(name)) {
- QString filename = partition.contains(QLatin1String("file")) ?
- partition.value(QLatin1String("file")).toString() :
- QDir(mFilePath).absoluteFilePath(name + QLatin1String(".db"));
- JsonDbPartition *p = new JsonDbPartition(filename, name, mOwner, this);
- connect(p, SIGNAL(objectsUpdated(JsonDbUpdateList)),
- this, SLOT(objectsUpdated(JsonDbUpdateList)));
- connect(p, SIGNAL(viewUpdated(QString)),
- this, SLOT(viewUpdated(QString)),
- Qt::QueuedConnection);
+ QString path = definition.value(JsonDbString::kPathStr).toString();
+ QDir pathDir(path);
+ pathDir.mkpath(QLatin1String("."));
- if (!p->open())
- return false;
+ JsonDbPartition *partition = new JsonDbPartition(pathDir.absoluteFilePath(name), name, mOwner, this);
+ partitions[name] = partition;
+ connect(partition, SIGNAL(objectsUpdated(JsonDbUpdateList)), this, SLOT(objectsUpdated(JsonDbUpdateList)));
- mPartitions[name] = p;
+ // TODO: for removable partitions, this shouldn't cause a total failure
+ if (!partition->open()) {
+ close();
+ return false;
}
- oldPartitions.remove(name);
+ // create an object in the Ephemeral partition to reflect this partition
+ JsonDbObject partitionRecord(definition);
+ partitionRecord.insert(JsonDbString::kUuidStr, JsonDbObject::createUuidFromString(name).toString());
+ partitionRecord.insert(JsonDbString::kTypeStr, JsonDbString::kPartitionTypeStr);
+ mEphemeralPartition->updateObjects(mOwner, JsonDbObjectList() << partitionRecord, JsonDbPartition::ForcedWrite);
}
}
// close any partitions that were declared previously but are no longer present
- foreach (JsonDbPartition *p, oldPartitions.values())
- p->close();
+ foreach (JsonDbPartition *partition, mPartitions.values()) {
+
+ if (mDefaultPartition == partition)
+ mDefaultPartition = 0;
+
+ QList<JsonDbObject> toRemove;
+
+ // remove the ephemeral object representing this partition
+ JsonDbObject partitionRecord;
+ partitionRecord.insert(JsonDbString::kUuidStr, JsonDbObject::createUuidFromString(partition->name()).toString());
+ partitionRecord.insert(JsonDbString::kTypeStr, JsonDbString::kPartitionTypeStr);
+ partitionRecord.markDeleted();
+ toRemove.append(partitionRecord);
+
+ // remove any notifications for the partition being closed
+ QJsonObject bindings;
+ bindings.insert(QLatin1String("notification"), JsonDbString::kNotificationTypeStr);
+ bindings.insert(QLatin1String("partition"), partition->name());
+
+ QScopedPointer<JsonDbQuery> query(JsonDbQuery::parse(QLatin1String("[?_type=%notification][?partition=%partition]"),
+ bindings));
+ JsonDbQueryResult results = mEphemeralPartition->queryObjects(mOwner, query.data());
+ foreach (const JsonDbObject &result, results.data) {
+ JsonDbObject notification = result;
+ notification.markDeleted();
+ toRemove.append(notification);
+ }
+
+ mEphemeralPartition->updateObjects(mOwner, toRemove, JsonDbPartition::ForcedWrite);
+
+ disconnect(partition, SIGNAL(objectsUpdated(JsonDbUpdateList)), this, SLOT(objectsUpdated(JsonDbUpdateList)));
+ partition->close();
+ delete partition;
+ }
+
+ mPartitions = partitions;
+
+ if (!mDefaultPartition)
+ mDefaultPartition = mPartitions[defaultPartitionName];
return true;
}
@@ -305,7 +349,8 @@ void DBServer::handleConnection()
if (jsondbSettings->debug())
qDebug() << "client connected to jsondb server" << connection;
connect(connection, SIGNAL(disconnected()), this, SLOT(removeConnection()));
- JsonStream *stream = new JsonStream(connection, this);
+ JsonStream *stream = new JsonStream(this);
+ stream->setDevice(connection);
connect(stream, SIGNAL(receive(QJsonObject)), this,
SLOT(receiveMessage(QJsonObject)));
mConnections.insert(connection, stream);
@@ -318,7 +363,8 @@ void DBServer::handleTcpConnection()
if (jsondbSettings->debug())
qDebug() << "remote client connected to jsondb server" << connection;
connect(connection, SIGNAL(disconnected()), this, SLOT(removeConnection()));
- JsonStream *stream = new JsonStream(connection, this);
+ JsonStream *stream = new JsonStream(this);
+ stream->setDevice(connection);
connect(stream, SIGNAL(receive(QJsonObject)), this,
SLOT(receiveMessage(QJsonObject)));
mConnections.insert(connection, stream);
@@ -431,6 +477,9 @@ void DBServer::objectsUpdated(const QList<JsonDbUpdate> &objects)
{
QString partitionName;
JsonDbPartition *partition = 0;
+ QList<JsonDbUpdate> updatesToEagerViews;
+ QSet<QString> eagerViewTypes;
+ bool isViewUpdate = false;
if (sender() == mEphemeralPartition) {
partitionName = mEphemeralPartition->name();
@@ -441,8 +490,14 @@ void DBServer::objectsUpdated(const QList<JsonDbUpdate> &objects)
else
return;
}
+ quint32 partitionStateNumber = 0;
+ if (partition)
+ partition->mainObjectTable()->stateNumber();
+ else if (mDefaultPartition)
+ partitionStateNumber = mDefaultPartition->mainObjectTable()->stateNumber();
- QSet<QString> eagerViewUpdates;
+ if (jsondbSettings->debug())
+ qDebug() << "objectsUpdated" << partitionName << partitionStateNumber;
// FIXME: pretty good place to batch notifications
foreach (const JsonDbUpdate &updated, objects) {
@@ -455,12 +510,13 @@ void DBServer::objectsUpdated(const QList<JsonDbUpdate> &objects)
if (object.type() == JsonDbString::kNotificationTypeStr)
continue;
+ QString oldObjectType = oldObject.value(JsonDbString::kTypeStr).toString();
QString objectType = object.value(JsonDbString::kTypeStr).toString();
quint32 stateNumber;
if (partition) {
JsonDbObjectTable *objectTable = partition->findObjectTable(objectType);
stateNumber = objectTable->stateNumber();
- } else
+ } else if (partitionName != mEphemeralPartition->name())
stateNumber = mDefaultPartition->mainObjectTable()->stateNumber();
QStringList notificationKeys;
@@ -468,9 +524,37 @@ void DBServer::objectsUpdated(const QList<JsonDbUpdate> &objects)
notificationKeys << objectType;
// eagerly update views if this object that was created isn't a view type itself
- if (mEagerViewSourceTypes.contains(objectType) && partition
- && !partition->findView(objectType))
- eagerViewUpdates.insert(objectType);
+ if (partition) {
+ WeightedSourceViewGraph &sourceViewGraph = mEagerViewSourceGraph[partitionName];
+ if (jsondbSettings->verbose()) qDebug() << "objectType" << objectType << sourceViewGraph.contains(oldObjectType) << sourceViewGraph.contains(objectType);
+ if (partition->findView(objectType)) {
+ if (jsondbSettings->verbose()) qDebug() << "isViewUpdate" << objectType;
+ isViewUpdate = true;
+ } else if ((sourceViewGraph.contains(oldObjectType))
+ || (sourceViewGraph.contains(objectType))) {
+ JsonDbUpdateList updateList;
+ if (oldObjectType == objectType) {
+ updateList.append(updated);
+ } else {
+ JsonDbObject tombstone(oldObject);
+ tombstone.insert(QLatin1String("_deleted"), true);
+ updateList.append(JsonDbUpdate(oldObject, tombstone, JsonDbNotification::Delete));
+ updateList.append(JsonDbUpdate(JsonDbObject(), object, JsonDbNotification::Create));
+ }
+ foreach (const JsonDbUpdate &splitUpdate, updateList) {
+ const QString updatedObjectType = splitUpdate.newObject.type();
+ const ViewEdgeWeights &edgeWeights = sourceViewGraph[updatedObjectType];
+ for (ViewEdgeWeights::const_iterator it = edgeWeights.begin(); it != edgeWeights.end(); ++it) {
+ if (jsondbSettings->verbose()) qDebug() << "edge weight" << updatedObjectType << it.key() << it.value().count;
+ if (it.value() > 0) {
+ eagerViewTypes.insert(it.key());
+ if (sourceViewGraph.contains(updatedObjectType))
+ updatesToEagerViews.append(splitUpdate);
+ }
+ }
+ }
+ }
+ }
}
if (object.contains(JsonDbString::kUuidStr))
@@ -491,31 +575,119 @@ void DBServer::objectsUpdated(const QList<JsonDbUpdate> &objects)
}
}
- if (!eagerViewUpdates.isEmpty())
+ if (isViewUpdate)
+ return;
+ if (updatesToEagerViews.isEmpty()) {
+ updateEagerViewStateNumbers(partition ? partition : mDefaultPartition, partitionStateNumber);
+ emitStateChanged(partition);
+ } else {
QMetaObject::invokeMethod(this, "updateEagerViews", Qt::QueuedConnection,
Q_ARG(JsonDbPartition*, partition),
- Q_ARG(QSet<QString>, eagerViewUpdates));
+ Q_ARG(QSet<QString>, eagerViewTypes),
+ Q_ARG(JsonDbUpdateList, updatesToEagerViews));
+ }
}
-void DBServer::viewUpdated(const QString &type)
+// Updates the in-memory state numbers on each view so that we know it
+// has seen all relevant updates from this transaction
+void DBServer::updateEagerViewStateNumbers(JsonDbPartition *partition, quint32 partitionStateNumber)
{
- JsonDbPartition *partition = qobject_cast<JsonDbPartition*>(sender());
if (!partition)
return;
+ if (jsondbSettings->verbose())
+ qDebug() << "updateEagerViewStateNumbers" << (partition ? partition->name() : "no partition") << partitionStateNumber << "{";
+ const QString &partitionName = partition->name();
+ WeightedSourceViewGraph &sourceViewGraph = mEagerViewSourceGraph[partitionName];
+ foreach (const ViewEdgeWeights &edgeWeights, sourceViewGraph) {
+ for (ViewEdgeWeights::const_iterator jt = edgeWeights.begin(); jt != edgeWeights.end(); ++jt) {
+ const QString &viewType = jt.key();
+ if (jt.value() == 0)
+ continue;
+ JsonDbView *view = partition->findView(viewType);
+ if (view)
+ view->updateViewStateNumber(partitionStateNumber);
+ else
+ if (jsondbSettings->debug())
+ qCritical() << "no view for" << viewType << partition->name();
+ }
+ }
+ if (jsondbSettings->verbose())
+ qDebug() << "updateEagerViewStateNumbers" << (partition ? partition->name() : "no partition") << partitionStateNumber << "}";
+}
- if (mEagerViewSourceTypes.contains(type))
- updateEagerViews(partition, QSet<QString>() << type);
+void DBServer::emitStateChanged(JsonDbPartition *partition)
+{
+ if (!partition)
+ return;
+ quint32 lastStateNumber = partition->mainObjectTable()->stateNumber();
+ QJsonObject stateChange;
+ stateChange.insert("_state", static_cast<int>(lastStateNumber));
+ foreach (const JsonDbNotification *n, mNotificationMap) {
+ if (n->lastStateNumber() == lastStateNumber
+ && n->partition() == partition->name())
+ emit notified(n->uuid(), lastStateNumber, stateChange, "stateChange");
+ }
}
-void DBServer::updateEagerViews(JsonDbPartition *partition, const QSet<QString> &viewTypes)
+void DBServer::updateEagerViews(JsonDbPartition *partition, QSet<QString> viewTypes, QList<JsonDbUpdate> changeList)
{
- foreach (const QString &type, viewTypes) {
- const QSet<QString> &targetTypes = mEagerViewSourceTypes[type];
- for (QSet<QString>::const_iterator it = targetTypes.begin(); it != targetTypes.end(); ++it) {
- if (partition)
- partition->updateView(*it);
+ QSet<QString> viewsUpdated;
+
+ if (jsondbSettings->verbose())
+ qDebug() << "updateEagerViews {" << partition->mainObjectTable()->stateNumber() << viewTypes;
+ quint32 partitionStateNumber = partition->mainObjectTable()->stateNumber();
+ const QString &partitionName = partition->name();
+ WeightedSourceViewGraph &sourceViewGraph = mEagerViewSourceGraph[partitionName];
+
+ while (!viewTypes.isEmpty()) {
+ bool madeProgress = false;
+ foreach (const QString &targetType, viewTypes) {
+ JsonDbView *view = partition->findView(targetType);
+ if (!view) {
+ if (jsondbSettings->verbose())
+ qWarning() << "non-view viewType?" << targetType << "eager views to update" << viewTypes;
+ viewTypes.remove(targetType);
+ madeProgress = true;
+ continue;
+ }
+ QSet<QString> typesNeeded(view->sourceTypeSet());
+ typesNeeded.intersect(viewTypes);
+ if (!typesNeeded.isEmpty())
+ continue;
+ viewTypes.remove(targetType);
+ QList<JsonDbUpdate> additionalChanges;
+ view->updateEagerView(changeList, &additionalChanges);
+ if (jsondbSettings->verbose())
+ qDebug() << "updated view" << targetType << additionalChanges.size() << additionalChanges;
+ changeList.append(additionalChanges);
+ viewsUpdated.insert(targetType);
+ // if this triggers other eager types, we need to update that also
+ if (sourceViewGraph.contains(targetType)) {
+ const ViewEdgeWeights &edgeWeights = sourceViewGraph[targetType];
+ for (ViewEdgeWeights::const_iterator it = edgeWeights.begin(); it != edgeWeights.end(); ++it) {
+ if (it.value() == 0)
+ continue;
+ const QString &viewType = it.key();
+ if (viewsUpdated.contains(viewType))
+ qWarning() << "View update cycle detected" << targetType << viewType << viewsUpdated;
+ else
+ viewTypes.insert(viewType);
+ }
+ }
+
+ madeProgress = true;
+ }
+ if (!madeProgress) {
+ qCritical() << "Failed to update any views" << viewTypes;
+ break;
}
}
+
+ updateEagerViewStateNumbers(partition, partitionStateNumber);
+ emitStateChanged(partition);
+
+ if (jsondbSettings->verbose())
+ qDebug() << "updateEagerViews }" << partition->mainObjectTable()->stateNumber() << viewsUpdated;
}
void DBServer::objectUpdated(const QString &partitionName, quint32 stateNumber, JsonDbNotification *n,
@@ -549,10 +721,23 @@ void DBServer::objectUpdated(const QString &partitionName, quint32 stateNumber,
r = object;
}
if (!r.isEmpty()&& (n->actions() & effectiveAction)) {
+ JsonDbPartition *partition = findPartition(partitionName);
+ if (partition && !n->parsedQuery()->orderTerms.isEmpty()) {
+ const QString &indexName = n->parsedQuery()->orderTerms[0].propertyName;
+ QString objectType = r.type();
+ JsonDbObjectTable *objectTable = partition->findObjectTable(objectType);
+ IndexSpec *indexSpec = objectTable->indexSpec(indexName);
+ if (indexSpec) {
+ QList<QJsonValue> indexValues = indexSpec->index->indexValues(r);
+ if (!indexValues.isEmpty())
+ r.insert(JsonDbString::kIndexValueStr, indexValues[0]);
+ }
+ }
QString actionStr = (effectiveAction == JsonDbNotification::Create ? JsonDbString::kCreateStr :
(effectiveAction == JsonDbNotification::Update ? JsonDbString::kUpdateStr :
JsonDbString::kRemoveStr));
notified(n->uuid(), stateNumber, r, actionStr);
+ n->setLastStateNumber(stateNumber);
}
}
}
@@ -587,10 +772,6 @@ void DBServer::processWrite(JsonStream *stream, JsonDbOwner *owner, const JsonDb
removeNotification(object);
if (!object.isDeleted())
createNotification(object, stream);
-
- // handle partitions
- } else if (object.type() == JsonDbString::kPartitionTypeStr) {
- loadPartitions();
}
}
@@ -652,18 +833,11 @@ void DBServer::processRead(JsonStream *stream, JsonDbOwner *owner, const QJsonVa
response.insert(JsonDbString::kResultStr, QJsonValue());
} else {
QJsonObject result;
- if (queryResult.values.size()) {
- result.insert(JsonDbString::kDataStr, queryResult.values);
- result.insert(JsonDbString::kLengthStr, queryResult.values.size());
- } else {
- QJsonArray values;
- for (int i = 0; i < queryResult.data.size(); i++) {
- JsonDbObject d = queryResult.data.at(i);
- values.append(d);
- }
- result.insert(JsonDbString::kDataStr, values);
- result.insert(JsonDbString::kLengthStr, values.size());
- }
+ QJsonArray data;
+ for (int i = 0; i < queryResult.data.size(); i++)
+ data.append(queryResult.data.at(i));
+ result.insert(JsonDbString::kDataStr, data);
+ result.insert(JsonDbString::kLengthStr, data.size());
result.insert(JsonDbString::kOffsetStr, queryResult.offset);
result.insert(JsonDbString::kExplanationStr, queryResult.explanation);
result.insert("sortKeys", queryResult.sortKeys);
@@ -678,6 +852,7 @@ void DBServer::processRead(JsonStream *stream, JsonDbOwner *owner, const QJsonVa
void DBServer::processChangesSince(JsonStream *stream, JsonDbOwner *owner, const QJsonValue &object, const QString &partitionName, int id)
{
+ Q_UNUSED(owner);
QJsonObject result;
if (object.type() == QJsonValue::Object) {
@@ -703,6 +878,7 @@ void DBServer::processChangesSince(JsonStream *stream, JsonDbOwner *owner, const
void DBServer::processFlush(JsonStream *stream, JsonDbOwner *owner, const QString &partitionName, int id)
{
+ Q_UNUSED(owner);
JsonDbPartition *partition = mPartitions.value(partitionName, mDefaultPartition);
QJsonObject result = partition->flush();
result.insert(JsonDbString::kIdStr, id);
@@ -715,14 +891,15 @@ void DBServer::debugQuery(JsonDbQuery *query, int limit, int offset, const JsonD
for (int i = 0; i < orQueryTerms.size(); i++) {
const OrQueryTerm &orQueryTerm = orQueryTerms[i];
foreach (const QueryTerm &queryTerm, orQueryTerm.terms()) {
- if (jsondbSettings->verbose())
+ if (jsondbSettings->verbose()) {
qDebug() << __FILE__ << __LINE__
- << QString(" %1%2%3 %4 %5 ")
- .arg(queryTerm.propertyName())
- .arg(queryTerm.joinField().size() ? "->" : "")
- .arg(queryTerm.joinField())
- .arg(queryTerm.op())
- .arg(JsonWriter().toString(queryTerm.value().toVariant()));
+ << QString(" %1%2%3 %4")
+ .arg(queryTerm.propertyName())
+ .arg(queryTerm.joinField().size() ? "->" : "")
+ .arg(queryTerm.joinField())
+ .arg(queryTerm.op())
+ << queryTerm.value();
+ }
}
}
@@ -797,18 +974,20 @@ void DBServer::createNotification(const JsonDbObject &object, JsonStream *stream
QStringList actions = QVariant(object.value(JsonDbString::kActionsStr).toArray().toVariantList()).toStringList();
QString query = object.value(JsonDbString::kQueryStr).toString();
QJsonObject bindings = object.value("bindings").toObject();
- QString partition = object.value(JsonDbString::kPartitionStr).toString();
-
- bool ok;
- quint32 stateNumber = object.value("initialStateNumber").toVariant().toInt(&ok);
- if (!ok)
- stateNumber = 0;
+ QString partitionName = object.value(JsonDbString::kPartitionStr).toString();
+ quint32 stateNumber = 0;
- if (partition.isEmpty())
- partition = mDefaultPartition->name();
+ if (partitionName.isEmpty())
+ partitionName = mDefaultPartition->name();
+ JsonDbPartition *partition = findPartition(partitionName);
- JsonDbNotification *n = new JsonDbNotification(getOwner(stream), uuid, query, actions, partition);
+ JsonDbNotification *n = new JsonDbNotification(getOwner(stream), uuid, query, actions, partitionName);
+ if (object.contains("initialStateNumber") && object.value("initialStateNumber").isDouble())
+ stateNumber = static_cast<quint32>(object.value("initialStateNumber").toDouble());
+ else if (partition)
+ stateNumber = partition->mainObjectTable()->stateNumber();
n->setInitialStateNumber(stateNumber);
+
JsonDbQuery *parsedQuery = JsonDbQuery::parse(query, bindings);
n->setCompiledQuery(parsedQuery);
const QList<OrQueryTerm> &orQueryTerms = parsedQuery->queryTerms;
@@ -840,11 +1019,10 @@ void DBServer::createNotification(const JsonDbObject &object, JsonStream *stream
mNotifications[uuid] = stream;
foreach (const QString &objectType, parsedQuery->matchedTypes())
- updateEagerViewTypes(objectType, mPartitions.value(partition, mDefaultPartition), stateNumber);
+ updateEagerViewTypes(objectType, mPartitions.value(partitionName, mDefaultPartition), stateNumber, 1);
- if (stateNumber)
+ if (partition)
notifyHistoricalChanges(n);
-
}
void DBServer::removeNotification(const JsonDbObject &object)
@@ -873,6 +1051,10 @@ void DBServer::removeNotification(const JsonDbObject &object)
mKeyedNotifications.remove("__generic_notification__", n);
+ const QString &partitionName = n->partition();
+ foreach (const QString &objectType, parsedQuery->matchedTypes())
+ updateEagerViewTypes(objectType, mPartitions.value(partitionName, mDefaultPartition), 0, -1);
+
delete n;
}
}
@@ -885,7 +1067,7 @@ void DBServer::notifyHistoricalChanges(JsonDbNotification *n)
JsonDbQuery *parsedQuery = n->parsedQuery();
QSet<QString> matchedTypes = parsedQuery->matchedTypes();
bool matchAnyType = matchedTypes.isEmpty();
- if (stateNumber == static_cast<quint32>(-1)) {
+ if (stateNumber == 0) {
QString indexName = JsonDbString::kTypeStr;
if (matchAnyType) {
matchedTypes.insert(QString());
@@ -894,62 +1076,96 @@ void DBServer::notifyHistoricalChanges(JsonDbNotification *n)
}
foreach (const QString matchedType, matchedTypes) {
JsonDbObjectTable *objectTable = partition->findObjectTable(matchedType);
+ lastStateNumber = objectTable->stateNumber();
+ if (lastStateNumber == stateNumber)
+ continue;
// views dont have a _type index
if (partition->findView(matchedType))
indexName = JsonDbString::kUuidStr;
lastStateNumber = objectTable->stateNumber();
- JsonDbIndexQuery *indexQuery = JsonDbIndexQuery::indexQuery(partition, objectTable,
+ QScopedPointer<JsonDbIndexQuery> indexQuery(JsonDbIndexQuery::indexQuery(partition, objectTable,
indexName, QString("string"),
- n->owner());
+ n->owner()));
if (!matchAnyType) {
- indexQuery->setMin(matchedType);
- indexQuery->setMax(matchedType);
+ indexQuery.data()->setMin(matchedType);
+ indexQuery.data()->setMax(matchedType);
}
JsonDbObject oldObject;
- for (JsonDbObject o = indexQuery->first(); !o.isEmpty(); o = indexQuery->next()) {
+ int c = 0;
+ for (JsonDbObject o = indexQuery.data()->first(); !o.isEmpty(); o = indexQuery.data()->next()) {
JsonDbNotification::Action action = JsonDbNotification::Create;
- objectUpdated(partition->name(), stateNumber, n, action, oldObject, o);
+ objectUpdated(partition->name(), lastStateNumber, n, action, oldObject, o);
+ c++;
}
}
} else {
- QJsonObject changesSince = partition->changesSince(stateNumber, matchedTypes);
- QJsonObject changes(changesSince.value("result").toObject());
- lastStateNumber = changes.value("currentStateNumber").toDouble();
- QJsonArray changeList(changes.value("changes").toArray());
- quint32 count = changeList.size();
- for (quint32 i = 0; i < count; i++) {
- QJsonObject change = changeList.at(i).toObject();
- QJsonObject before = change.value("before").toObject();
- QJsonObject after = change.value("after").toObject();
-
- JsonDbNotification::Action action = JsonDbNotification::Update;
- if (before.isEmpty())
- action = JsonDbNotification::Create;
- else if (after.contains(JsonDbString::kDeletedStr))
- action = JsonDbNotification::Delete;
- objectUpdated(partition->name(), stateNumber, n, action, before, after);
+ foreach (const QString matchedType, matchedTypes) {
+ JsonDbObjectTable *objectTable = partition->findObjectTable(matchedType);
+ if (objectTable->stateNumber() == stateNumber)
+ continue;
+ QList<JsonDbUpdate> updateList;
+ lastStateNumber = objectTable->changesSince(stateNumber, matchedTypes, &updateList);
+ foreach (const JsonDbUpdate &update, updateList) {
+ QJsonObject before = update.oldObject;
+ QJsonObject after = update.newObject;
+
+ JsonDbNotification::Action action = JsonDbNotification::Update;
+ if (before.isEmpty())
+ action = JsonDbNotification::Create;
+ else if (after.contains(JsonDbString::kDeletedStr))
+ action = JsonDbNotification::Delete;
+ objectUpdated(partition->name(), lastStateNumber, n, action, before, after);
+ }
}
}
QJsonObject stateChange;
stateChange.insert("_state", static_cast<int>(lastStateNumber));
- emit notified(n->uuid(), stateNumber, stateChange, "stateChange");
+ emit notified(n->uuid(), lastStateNumber, stateChange, "stateChange");
}
-void DBServer::updateEagerViewTypes(const QString &objectType, JsonDbPartition *partition, quint32 stateNumber)
+/*!
+ Updates the per-partition information on eager views.
+
+ For each partition, we maintain a graph with weighted edges from
+ source types to \a viewType.
+
+ Adds \a increment weight to the edge from each of the source types
+ of the view to the target type. Call with \a increment of 1 when
+ adding an eager view type, and -1 when removing an eager view type.
+
+ It recursively updates the graph for each source type that is also a
+ view.
+
+ If \a stateNumber is non-zero, do a full update of each the views so
+ that they will be ready for eager updates.
+ */
+void DBServer::updateEagerViewTypes(const QString &viewType, JsonDbPartition *partition, quint32 stateNumber, int increment)
{
- // FIXME: eager view types should be broken down by partition
- JsonDbView *view = partition->findView(objectType);
+ JsonDbView *view = partition->findView(viewType);
if (!view)
return;
+ QString partitionName = partition->name();
+ WeightedSourceViewGraph &sourceViewGraph = mEagerViewSourceGraph[partitionName];
+
+ // An update of the Map/Reduce definition also causes the view to
+ // need to be updated, so we count it as a view source.
+
+ // this is a bit conservative, since we add both Map and Reduce as
+ // sources, but it shortens the code
+ sourceViewGraph[JsonDbString::kMapTypeStr][viewType] += increment;
+ sourceViewGraph[JsonDbString::kReduceTypeStr][viewType] += increment;
foreach (const QString sourceType, view->sourceTypes()) {
- mEagerViewSourceTypes[sourceType].insert(objectType);
+ sourceViewGraph[sourceType][viewType] += increment;
+ if (jsondbSettings->verbose())
+ qDebug() << "SourceView" << sourceType << viewType << sourceViewGraph[sourceType][viewType].count;
// now recurse until we get to a non-view sourceType
- updateEagerViewTypes(sourceType, partition, stateNumber);
+ updateEagerViewTypes(sourceType, partition, stateNumber, increment);
}
- partition->updateView(objectType, stateNumber);
+ if (stateNumber)
+ partition->updateView(viewType, stateNumber);
}
JsonDbPartition *DBServer::findPartition(const QString &partitionName)
@@ -965,6 +1181,67 @@ JsonDbPartition *DBServer::findPartition(const QString &partitionName)
return partition;
}
+QList<QJsonObject> DBServer::findPartitionDefinitions() const
+{
+ QList<QJsonObject> partitions;
+
+ bool defaultSpecified = false;
+
+ foreach (const QString &path, jsondbSettings->configSearchPath()) {
+ QDir searchPath(path);
+ if (!searchPath.exists())
+ continue;
+
+ if (jsondbSettings->debug())
+ qDebug() << QString("Searching %1 for partition definition files").arg(path);
+
+ QStringList files = searchPath.entryList(QStringList() << "partitions*.json",
+ QDir::CaseSensitive | QDir::Files | QDir::Readable);
+ foreach (const QString file, files) {
+ if (jsondbSettings->debug())
+ qDebug() << QString("Loading partition definitions from %1").arg(file);
+
+ QFile partitionFile(searchPath.absoluteFilePath(file));
+ partitionFile.open(QFile::ReadOnly);
+
+ QJsonArray partitionList = QJsonDocument::fromJson(partitionFile.readAll()).array();
+ if (partitionList.isEmpty())
+ continue;
+
+ for (int i = 0; i < partitionList.count(); i++) {
+ QJsonObject def = partitionList[i].toObject();
+ if (def.contains(JsonDbString::kNameStr)) {
+ if (!def.contains(JsonDbString::kPathStr))
+ def.insert(JsonDbString::kPathStr, QDir::currentPath());
+ if (def.contains(JsonDbString::kDefaultStr))
+ defaultSpecified = true;
+ partitions.append(def);
+ }
+ }
+ }
+ }
+
+ // if no partitions are specified just make a partition in the current working
+ // directory and call it "default"
+ if (partitions.isEmpty()) {
+ QJsonObject defaultPartition;
+ defaultPartition.insert(JsonDbString::kNameStr, QLatin1String("default"));
+ defaultPartition.insert(JsonDbString::kPathStr, QDir::currentPath());
+ defaultPartition.insert(JsonDbString::kDefaultStr, true);
+ partitions.append(defaultPartition);
+ defaultSpecified = true;
+ }
+
+ // ensure that at least one partition is marked as default
+ if (!defaultSpecified) {
+ QJsonObject defaultPartition = partitions.takeFirst();
+ defaultPartition.insert(JsonDbString::kDefaultStr, true);
+ partitions.append(defaultPartition);
+ }
+
+ return partitions;
+}
+
void DBServer::receiveMessage(const QJsonObject &message)
{
JsonStream *stream = qobject_cast<JsonStream *>(sender());
@@ -975,7 +1252,7 @@ void DBServer::receiveMessage(const QJsonObject &message)
JsonDbPartition *partition = findPartition(partitionName);
if (!(partitionName.isEmpty() || partition || partitionName == mEphemeralPartition->name())) {
- sendError(stream, JsonDbError::InvalidRequest,
+ sendError(stream, JsonDbError::InvalidPartition,
QString("Invalid partition '%1'").arg(partitionName), id);
return;
}
@@ -1017,12 +1294,12 @@ void DBServer::receiveMessage(const QJsonObject &message)
// TODO: remove at the same time that clientcompat is dropped
if (action == JsonDbString::kRemoveStr && object.toObject().contains(JsonDbString::kQueryStr)) {
- JsonDbQuery *query = JsonDbQuery::parse(object.toObject().value(JsonDbString::kQueryStr).toString());
+ QScopedPointer<JsonDbQuery> parsedQuery(JsonDbQuery::parse(object.toObject().value(JsonDbString::kQueryStr).toString()));
JsonDbQueryResult res;
if (partition)
- res = partition->queryObjects(owner, query);
+ res = partition->queryObjects(owner, parsedQuery.data());
else
- res = mEphemeralPartition->queryObjects(owner, query);
+ res = mEphemeralPartition->queryObjects(owner, parsedQuery.data());
QJsonArray toRemove;
foreach (const QJsonValue &value, res.data)
@@ -1134,5 +1411,3 @@ void DBServer::removeConnection()
}
#include "moc_dbserver.cpp"
-
-QT_END_NAMESPACE_JSONDB