diff options
author | Ali Akhtarzada <ali.akhtarzada@nokia.com> | 2012-03-21 16:11:20 +0100 |
---|---|---|
committer | Qt by Nokia <qt-info@nokia.com> | 2012-03-22 12:15:21 +0100 |
commit | 826f0ca0dd3ade126549d458533fdc485d6f1765 (patch) | |
tree | 46a902e2b85edca5c1315007ee5b7fb0a608b731 | |
parent | 263a7df1c0fd228fe0455387dcd80ac66f1f859b (diff) |
Hbtree bug fixes
Fixed rebalancing, cursors, caching and marker pages
Change-Id: I2cdc1b8b9666efdc92a33fd9897a5fddfef850d4
Reviewed-by: Denis Dzyubenko <denis.dzyubenko@nokia.com>
-rw-r--r-- | src/hbtree/hbtree.cpp | 413 | ||||
-rw-r--r-- | src/hbtree/hbtree_p.h | 19 | ||||
-rw-r--r-- | src/hbtree/hbtreecursor.cpp | 29 | ||||
-rw-r--r-- | src/hbtree/hbtreecursor.h | 3 | ||||
-rw-r--r-- | tests/auto/hbtree/main.cpp | 266 |
5 files changed, 450 insertions, 280 deletions
diff --git a/src/hbtree/hbtree.cpp b/src/hbtree/hbtree.cpp index 1123dbe..9c02f67 100644 --- a/src/hbtree/hbtree.cpp +++ b/src/hbtree/hbtree.cpp @@ -359,6 +359,7 @@ HBtreePrivate::Page *HBtreePrivate::newDeserializePage(const QByteArray &buffer) deletePage(page); return 0; } + return page; } @@ -429,20 +430,20 @@ bool HBtreePrivate::writeMarker(HBtreePrivate::MarkerPage *page) mp.info.lowerOffset = 0; mp.info.upperOffset = mp.residueHistory.size() * sizeof(quint32); + if (mp.info.number != 1) + mp.residueHistory.clear(); + bool useOverflow = mp.info.upperOffset > capacity(&mp); if (useOverflow) mp.meta.flags |= MarkerPage::DataOnOverflow; QByteArray buffer = qInitializedByteArray(); - memcpy(buffer.data(), &mp.info, sizeof(PageInfo)); - memcpy(buffer.data() + sizeof(PageInfo), &mp.meta, sizeof(MarkerPage::Meta)); - if (mp.info.hasPayload()) { QByteArray extra; char *ptr = buffer.data() + sizeof(PageInfo) + sizeof(MarkerPage::Meta); - if (useOverflow) { + if (useOverflow && mp.info.number == 1) { extra.resize(mp.info.upperOffset); ptr = extra.data(); } @@ -455,7 +456,8 @@ bool HBtreePrivate::writeMarker(HBtreePrivate::MarkerPage *page) if (useOverflow) { Q_ASSERT(dirtyPages_.isEmpty()); NodeHeader node; - node.context.overflowPage = putDataOnOverflow(extra); + // Sync marker 2 does not need to rewrite overflow pages if sync 1 did it. + node.context.overflowPage = mp.info.number == 1 ? putDataOnOverflow(extra) : mp.overflowPage; memcpy(buffer.data() + sizeof(PageInfo) + sizeof(MarkerPage::Meta), &node, sizeof(NodeHeader)); PageMap::const_iterator it = dirtyPages_.constBegin(); while (it != dirtyPages_.constEnd()) { @@ -469,12 +471,20 @@ bool HBtreePrivate::writeMarker(HBtreePrivate::MarkerPage *page) HBTREE_DEBUG("failed to write" << it.value()->info << "for" << mp.info); return false; } + ++it; } dirtyPages_.clear(); mp.overflowPage = node.context.overflowPage; } } + // If we set the size manually, trust it. + if (!mp.meta.size) + size_ = mp.meta.size = lseek(fd_, 0, SEEK_END); + + memcpy(buffer.data(), &mp.info, sizeof(PageInfo)); + memcpy(buffer.data() + sizeof(PageInfo), &mp.meta, sizeof(MarkerPage::Meta)); + if (!writePage(&buffer)) { HBTREE_DEBUG("failed to write" << mp.info); return false; @@ -756,7 +766,6 @@ bool HBtreePrivate::sync() MarkerPage synced1(2); copy(marker_, &synced0); - copy(marker_, &synced1); if (fsync(fd_) != 0) { HBTREE_ERROR("failed to sync data"); @@ -782,7 +791,6 @@ bool HBtreePrivate::sync() lastSyncedId_++; copy(synced0, &synced_); - copy(synced0, &marker_); // Collect residue pages collectiblePages_.unite(marker_.residueHistory); @@ -797,6 +805,8 @@ bool HBtreePrivate::sync() Q_Q(HBtree); q->stats_.numSyncs++; + copy(synced0, &synced1); + if (!writeMarker(&synced1)) { HBTREE_ERROR("failed to write sync marker 0"); return false; @@ -846,24 +856,11 @@ bool HBtreePrivate::commit(HBtreeTransaction *transaction, quint64 tag) PageMap::iterator it = dirtyPages_.begin(); while (it != dirtyPages_.constEnd()) { - switch (it.value()->info.type) { - case PageInfo::Branch: - case PageInfo::Leaf: { -// NodePage *np = static_cast<NodePage *>(it.value()); -// np->meta.historySize -= collectHistory(np); - break; - } - case PageInfo::Overflow: - break; - default: - Q_ASSERT(0); - } - Q_ASSERT(verifyIntegrity(it.value())); QByteArray ba = serializePage(*it.value()); if (!writePage(&ba)) return false; - it.value()->commited = true; + it.value()->dirty = false; if (it.value()->info.type == PageInfo::Overflow) cacheDelete(it.value()->info.number); @@ -879,7 +876,7 @@ bool HBtreePrivate::commit(HBtreeTransaction *transaction, quint64 tag) mp.meta.syncId = lastSyncedId_ + 1; mp.meta.root = transaction->rootPage_; mp.meta.tag = tag; - mp.meta.size = size_; + mp.meta.size = 0; mp.residueHistory = residueHistory_; mp.info.upperOffset = residueHistory_.size() * sizeof(quint32); @@ -1069,6 +1066,11 @@ bool HBtreePrivate::put(HBtreeTransaction *transaction, const QByteArray &keyDat return false; } + if (keyData.size() > (int)spec_.overflowThreshold) { + HBTREE_ERROR("cannot insert keys larger than overflow threshold. Key size:" << keyData.size() << "Threshold:" << spec_.overflowThreshold); + return false; + } + NodeKey nkey(compareFunction_, keyData); NodeValue nval(valueData); @@ -1180,13 +1182,12 @@ HBtreePrivate::Page *HBtreePrivate::newPage(HBtreePrivate::PageInfo::Type type) if (collectiblePages_.size()) { quint32 n = *collectiblePages_.constBegin(); - collectiblePages_.erase(collectiblePages_.begin()); // TODO: remove from here after page is commited. Or use a different queue + collectiblePages_.erase(collectiblePages_.begin()); pageNumber = n; } else { pageNumber = lastPage_++; } - Page *page = cacheRemove(pageNumber); if (page) { @@ -1237,10 +1238,11 @@ HBtreePrivate::Page *HBtreePrivate::newPage(HBtreePrivate::PageInfo::Type type) HBtreePrivate::NodePage *HBtreePrivate::touchNodePage(HBtreePrivate::NodePage *page) { + Q_ASSERT(page); Q_ASSERT(page->info.type == PageInfo::Branch || page->info.type == PageInfo::Leaf); - if (page->commited == false) { - HBTREE_DEBUG(page->info << "not commited yet, no need to touch"); + if (page->dirty) { + HBTREE_DEBUG(page->info << "is dirty, no need to touch"); return page; } @@ -1250,7 +1252,7 @@ HBtreePrivate::NodePage *HBtreePrivate::touchNodePage(HBtreePrivate::NodePage *p if (page->meta.syncId > lastSyncedId_) { HBTREE_DEBUG(page->info << "not synced, reusing"); - page->commited = false; + page->dirty = true; dirtyPages_.insert(page->info.number, page); return page; } @@ -1270,7 +1272,19 @@ HBtreePrivate::NodePage *HBtreePrivate::touchNodePage(HBtreePrivate::NodePage *p val.overflowPage = touched->info.number; } - touched->commited = false; + if (touched->rightPageNumber != PageInfo::INVALID_PAGE) { + NodePage *right = static_cast<NodePage *>(cacheFind(touched->rightPageNumber)); + if (right) + right->leftPageNumber = touched->info.number; + } + + if (touched->leftPageNumber != PageInfo::INVALID_PAGE) { + NodePage *left = static_cast<NodePage *>(cacheFind(touched->leftPageNumber)); + if (left) + left->rightPageNumber = touched->info.number; + } + + touched->dirty = true; addHistoryNode(touched, HistoryNode(page)); @@ -1304,7 +1318,7 @@ quint32 HBtreePrivate::putDataOnOverflow(const QByteArray &value) return overflowPageNumber; } -QByteArray HBtreePrivate::getDataFromNode(const HBtreePrivate::NodeValue &nval) const +QByteArray HBtreePrivate::getDataFromNode(const HBtreePrivate::NodeValue &nval) { if (nval.flags & NodeHeader::Overflow) { QByteArray data; @@ -1315,19 +1329,28 @@ QByteArray HBtreePrivate::getDataFromNode(const HBtreePrivate::NodeValue &nval) } } -bool HBtreePrivate::getOverflowData(quint32 startPage, QByteArray *data) const +bool HBtreePrivate::walkOverflowPages(quint32 startPage, QByteArray *data, QList<quint32> *pages) { - Q_ASSERT(data); - data->clear(); + Q_ASSERT(data || pages); + + if (data) + data->clear(); + if (pages) + pages->clear(); while (startPage != PageInfo::INVALID_PAGE) { - OverflowPage *page = static_cast<OverflowPage *>(const_cast<HBtreePrivate *>(this)->getPage(startPage, false)); + OverflowPage *page = static_cast<OverflowPage *>(getPage(startPage)); if (page) { - data->append(page->data); + if (data) + data->append(page->data); + if (pages) + pages->append(startPage); startPage = page->nextPage; - deletePage(page); } else { - data->clear(); + if (data) + data->clear(); + if (pages) + pages->clear(); return false; } } @@ -1335,23 +1358,14 @@ bool HBtreePrivate::getOverflowData(quint32 startPage, QByteArray *data) const return true; } -bool HBtreePrivate::getOverflowPageNumbers(quint32 startPage, QList<quint32> *pages) const +bool HBtreePrivate::getOverflowData(quint32 startPage, QByteArray *data) { - Q_ASSERT(pages); - pages->clear(); + return walkOverflowPages(startPage, data, 0); +} - while (startPage != PageInfo::INVALID_PAGE) { - OverflowPage *page = static_cast<OverflowPage *>(const_cast<HBtreePrivate *>(this)->getPage(startPage, false)); - if (page) { - pages->append(startPage); - startPage = page->nextPage; - deletePage(page); - } else { - pages->clear(); - return false; - } - } - return true; +bool HBtreePrivate::getOverflowPageNumbers(quint32 startPage, QList<quint32> *pages) +{ + return walkOverflowPages(startPage, 0, pages); } quint16 HBtreePrivate::collectHistory(NodePage *page) @@ -1366,7 +1380,7 @@ quint16 HBtreePrivate::collectHistory(NodePage *page) collectiblePages_.insert(it->pageNumber); Page *cached = cacheFind(it->pageNumber); if (cached) { - Q_ASSERT(cached->commited); + Q_ASSERT(!cached->dirty); cacheDelete(it->pageNumber); } numRemoved++; @@ -1435,7 +1449,7 @@ void HBtreePrivate::cachePrune() if (lru_.size() <= (int)cacheSize_) break; Page *page = *it; - if (page->commited) { + if (!page->dirty) { cache_.remove(page->info.number); Q_ASSERT(page); it = lru_.erase(it); @@ -1486,7 +1500,7 @@ bool HBtreePrivate::searchPage(HBtreeCursor *cursor, HBtreeTransaction *transact NodePage *page = static_cast<NodePage *>(getPage(root)); - if (page && modify && page->commited) { + if (page && modify) { page = touchNodePage(page); transaction->rootPage_ = page->info.number; } @@ -1507,6 +1521,8 @@ bool HBtreePrivate::searchPageRoot(HBtreeCursor *cursor, HBtreePrivate::NodePage Node parentIter; while (child->info.type == PageInfo::Branch) { + Q_ASSERT(child->nodes.size() > 1); + if (searchType == SearchLast) { parentIter = (child->nodes.constEnd() - 1); } else if (searchType == SearchFirst) { @@ -1532,6 +1548,8 @@ bool HBtreePrivate::searchPageRoot(HBtreeCursor *cursor, HBtreePrivate::NodePage parent = child; child = static_cast<NodePage *>(getPage(parentIter.value().overflowPage)); + Q_ASSERT(child); + if (child->info.type == PageInfo::Branch) { child->leftPageNumber = PageInfo::INVALID_PAGE; child->rightPageNumber = PageInfo::INVALID_PAGE; @@ -1540,12 +1558,10 @@ bool HBtreePrivate::searchPageRoot(HBtreeCursor *cursor, HBtreePrivate::NodePage child->parent = parent; child->parentKey = parentIter.key(); - if (modify && child->commited && (child = touchNodePage(child)) == NULL) { + if (modify && (child = touchNodePage(child)) == NULL) { HBTREE_ERROR("failed to touch page" << child->info); return false; } - - Q_ASSERT(verifyIntegrity(parent)); } Q_ASSERT(child->info.type == PageInfo::Leaf); @@ -1554,7 +1570,6 @@ bool HBtreePrivate::searchPageRoot(HBtreeCursor *cursor, HBtreePrivate::NodePage child->rightPageNumber = getRightSibling(rightQ); child->leftPageNumber = getLeftSibling(leftQ); cursor->lastLeaf_ = child->info.number; - cursor->initialized_ = true; HBTREE_DEBUG("set right sibling of" << child->info << "to" << child->rightPageNumber); HBTREE_DEBUG("set left sibling of" << child->info << "to" << child->leftPageNumber); @@ -1620,21 +1635,18 @@ quint32 HBtreePrivate::getLeftSibling(QStack<quint32> leftQ) return pageNumber; } -HBtreePrivate::Page *HBtreePrivate::getPage(quint32 pageNumber, bool insertInCache) +HBtreePrivate::Page *HBtreePrivate::getPage(quint32 pageNumber) { HBTREE_DEBUG("getting page #" << pageNumber); - // Only check cache if we want to insert in cache. Otherwise everything is the caller's responsibility - if (insertInCache) { - Page *page = cacheFind(pageNumber); - if (page) { - Q_Q(HBtree); - q->stats_.hits++; - HBTREE_DEBUG("got" << page->info << "from cache"); - lru_.removeOne(page); - lru_.append(page); - return page; - } + Page *page = cacheFind(pageNumber); + if (page) { + Q_Q(HBtree); + q->stats_.hits++; + HBTREE_DEBUG("got" << page->info << "from cache"); + lru_.removeOne(page); + lru_.append(page); + return page; } QByteArray buffer; @@ -1645,17 +1657,15 @@ HBtreePrivate::Page *HBtreePrivate::getPage(quint32 pageNumber, bool insertInCac return 0; } - Page *page = newDeserializePage(buffer); + page = newDeserializePage(buffer); if (!page) { HBTREE_ERROR("failed to deserialize page" << pageNumber); return 0; } - page->commited = true; - Q_ASSERT(verifyIntegrity(page)); + page->dirty = false; + cacheInsert(pageNumber, page); - if (insertInCache) - cacheInsert(pageNumber, page); return page; } @@ -1747,7 +1757,7 @@ quint16 HBtreePrivate::capacity(const Page *page) const return spec_.pageSize - headerSize(page); } -quint32 HBtreePrivate::pageFullEnough(HBtreePrivate::NodePage *page) const +bool HBtreePrivate::pageFullEnough(HBtreePrivate::NodePage *page) const { double pageFill = 1.0 - (float)spaceLeft(page) / (float)capacity(page); pageFill *= 100.0f; @@ -1767,7 +1777,7 @@ bool HBtreePrivate::insertNode(NodePage *page, const NodeKey &key, const NodeVal HBTREE_DEBUG("inserting" << key << "in" << page->info); Q_ASSERT(page); NodeValue valueCopy; - Q_ASSERT(page->commited == false); + Q_ASSERT(page->dirty); // Careful here. value could be coming in from put (in which case it won't have the overflow flag // set but it will have overflow data in value.data @@ -1814,7 +1824,7 @@ bool HBtreePrivate::insertNode(NodePage *page, const NodeKey &key, const NodeVal bool HBtreePrivate::removeNode(HBtreePrivate::NodePage *page, const HBtreePrivate::NodeKey &key) { - Q_ASSERT(page->commited == false); + Q_ASSERT(page->dirty); HBTREE_DEBUG("removing" << key << "from" << page->info); if (!page->nodes.contains(key)) { @@ -1957,20 +1967,20 @@ bool HBtreePrivate::split(HBtreePrivate::NodePage *page, const NodeKey &key, con while (node != copy.nodes.constEnd()) { if (index++ < splitIndex) { - // There's a corner case where a 3-way split becomes necessary, when the new node - // is too big for the left page. If this is true then key should be <= than node.key - // (since it may have been already inserted) and value should not be an overflow. - if (spaceNeededForNode(node.key().data, node.value().data) > spaceLeft(left)) { - Q_ASSERT(left->info.type != PageInfo::Branch); // This should never happen with a branch page - Q_ASSERT(key <= node.key()); - Q_ASSERT(willCauseOverflow(key.data, value.data) == false); - if (!split(left, node.key(), node.value(), &left)) { - HBTREE_ERROR("3-way split fail"); - return false; - } - ++node; - continue; - } +// // There's a corner case where a 3-way split becomes necessary, when the new node +// // is too big for the left page. If this is true then key should be <= than node.key +// // (since it may have been already inserted) and value should not be an overflow. +// if (spaceNeededForNode(node.key().data, node.value().data) > spaceLeft(left)) { +// Q_ASSERT(left->info.type != PageInfo::Branch); // This should never happen with a branch page +// Q_ASSERT(key <= node.key()); +// Q_ASSERT(willCauseOverflow(key.data, value.data) == false); +// if (!split(left, node.key(), node.value(), &left)) { +// HBTREE_ERROR("3-way split fail"); +// return false; +// } +// ++node; +// continue; +// } Q_ASSERT(spaceNeededForNode(node.key().data, node.value().data) <= spaceLeft(left)); @@ -1994,8 +2004,21 @@ bool HBtreePrivate::split(HBtreePrivate::NodePage *page, const NodeKey &key, con HBTREE_DEBUG("left:" << *left); HBTREE_DEBUG("right:" << *right); - HBTREE_DEBUG("left parent:" << *left->parent); - HBTREE_DEBUG("right parent:" << *right->parent); + HBTREE_DEBUG("parent:" << *left->parent); + + // adjust right/left siblings + Q_ASSERT(left->info.type == right->info.type); + if (left->info.type == PageInfo::Leaf) { + // since we have a new right, the left's right sibling has a new left. + if (left->rightPageNumber != PageInfo::INVALID_PAGE) { + NodePage *outerRight = static_cast<NodePage *>(cacheFind(left->rightPageNumber)); + if (outerRight) + outerRight->leftPageNumber = right->info.number; + } + + left->rightPageNumber = right->info.number; + right->leftPageNumber = left->info.number; + } Q_ASSERT(verifyIntegrity(right)); if (rightOut) @@ -2048,7 +2071,7 @@ bool HBtreePrivate::rebalance(HBtreePrivate::NodePage *page) neighbourBranchNode = pageBranchNode + 1; neighbour = static_cast<NodePage *>(getPage(neighbourBranchNode.value().overflowPage)); sourceNode = neighbour->nodes.constBegin(); - destNode = page->nodes.constEnd(); + destNode = page->nodes.size() ? page->nodes.constEnd() - 1 : page->nodes.constBegin(); } else { // take left neighbour HBTREE_DEBUG("taking left neighbour"); neighbourBranchNode = pageBranchNode - 1; @@ -2134,7 +2157,6 @@ bool HBtreePrivate::moveNode(HBtreePrivate::NodePage *src, HBtreePrivate::NodePa NodeKey nkey = node.key(); insertNode(dst, node.key(), node.value()); - removeNode(src, node.key()); if (dst->parentKey > nkey) { // must change destination parent key @@ -2145,6 +2167,8 @@ bool HBtreePrivate::moveNode(HBtreePrivate::NodePage *src, HBtreePrivate::NodePa insertNode(dst->parent, dst->parentKey, NodeValue(dst->info.number)); } + removeNode(src, node.key()); + if (src->parentKey <= nkey && decending) { Q_ASSERT(!src->parentKey.data.isEmpty()); // must change source parent key @@ -2225,6 +2249,15 @@ bool HBtreePrivate::mergePages(HBtreePrivate::NodePage *page, HBtreePrivate::Nod // with smaller key values. Just delete the key to // greater page form parent removeNode(dst->parent, page->parentKey); + + // right page becomes insignificant, change left/right siblings + if (page->rightPageNumber != PageInfo::INVALID_PAGE) { + NodePage *right = static_cast<NodePage *>(cacheFind(page->rightPageNumber)); + if (right) { + right->leftPageNumber = dst->info.number; + dst->rightPageNumber = right->info.number; + } + } } else { // Merging page with smaller keys in to page // with bigger keys. Change dst parent key. @@ -2235,6 +2268,16 @@ bool HBtreePrivate::mergePages(HBtreePrivate::NodePage *page, HBtreePrivate::Nod nval.overflowPage = dst->info.number; insertNode(dst->parent, nkey, nval); dst->parentKey = page->parentKey; + + + // left page becomes insignificant, change left/right siblings + if (page->leftPageNumber != PageInfo::INVALID_PAGE) { + NodePage *left = static_cast<NodePage *>(cacheFind(page->leftPageNumber)); + if (left) { + left->rightPageNumber = dst->info.number; + dst->leftPageNumber = left->info.number; + } + } } // Overflow pages may have been transfered over @@ -2270,11 +2313,26 @@ bool HBtreePrivate::addHistoryNode(HBtreePrivate::NodePage *src, const HBtreePri void HBtreePrivate::dump() { - qDebug() << "dumping tree from marker" << marker_; + qDebug() << "Dumping tree from marker" << marker_; + if (marker_.meta.root == PageInfo::INVALID_PAGE) { + qDebug() << "This be empty laddy"; + return; + } NodePage *root = static_cast<NodePage *>(getPage(marker_.meta.root)); dumpPage(root, 0); } +void HBtreePrivate::dump(HBtreeTransaction *transaction) +{ + qDebug() << "Dumping tree from transaction @" << transaction; + if (transaction->rootPage_ == PageInfo::INVALID_PAGE) { + qDebug() << "This be empty laddy"; + return; + } + NodePage *root = static_cast<NodePage *>(getPage(transaction->rootPage_)); + dumpPage(root, 0); +} + void HBtreePrivate::dumpPage(HBtreePrivate::NodePage *page, int depth) { QByteArray tabs(depth, '\t'); @@ -2307,13 +2365,12 @@ bool HBtreePrivate::cursorLast(HBtreeCursor *cursor, QByteArray *keyOut, QByteAr { NodePage *page = 0; searchPage(cursor, cursor->transaction_, NodeKey(), SearchLast, false, &page); - if (page && page->nodes.size()) { + if (page) { Node node = page->nodes.constEnd() - 1; if (keyOut) *keyOut = node.key().data; if (valueOut) *valueOut = getDataFromNode(node.value()); - cursor->eof_ = true; return true; } return false; @@ -2324,13 +2381,12 @@ bool HBtreePrivate::cursorFirst(HBtreeCursor *cursor, QByteArray *keyOut, QByteA { NodePage *page = 0; searchPage(cursor, cursor->transaction_, NodeKey(), SearchFirst, false, &page); - if (page && page->nodes.size()) { + if (page) { Node node = page->nodes.constBegin(); if (keyOut) *keyOut = node.key().data; if (valueOut) *valueOut = getDataFromNode(node.value()); - cursor->eof_ = false; return true; } return false; @@ -2338,19 +2394,24 @@ bool HBtreePrivate::cursorFirst(HBtreeCursor *cursor, QByteArray *keyOut, QByteA bool HBtreePrivate::cursorNext(HBtreeCursor *cursor, QByteArray *keyOut, QByteArray *valueOut) { - Q_ASSERT(cursor->initialized_); - if (cursor->eof_) - return false; + if (!cursor->valid_) + return cursorFirst(cursor, keyOut, valueOut); + NodeKey nkey(compareFunction_, cursor->key_); + NodePage *page = 0; - searchPage(cursor, cursor->transaction_, nkey, SearchKey, false, &page); - Node node = page->nodes.find(nkey); + if (!searchPage(cursor, cursor->transaction_, nkey, SearchKey, false, &page)) + return false; + + Node node = page->nodes.lowerBound(nkey); + + bool checkRight = node == page->nodes.constEnd() || node.key() == nkey; // Could've been deleted so check if node == end. - if (node == page->nodes.constEnd() || ++node == page->nodes.constEnd()) { + if (checkRight && (node == page->nodes.constEnd() || ++node == page->nodes.constEnd())) { if (page->rightPageNumber != PageInfo::INVALID_PAGE) { NodePage *right = static_cast<NodePage *>(getPage(page->rightPageNumber)); node = right->nodes.constBegin(); - if (node != right->nodes.constEnd()) { // can a right page be empty and point to another right? + if (node != right->nodes.constEnd()) { if (keyOut) *keyOut = node.key().data; if (valueOut) @@ -2368,16 +2429,21 @@ bool HBtreePrivate::cursorNext(HBtreeCursor *cursor, QByteArray *keyOut, QByteAr *valueOut = getDataFromNode(node.value()); return true; } - cursor->eof_ = true; + return false; } bool HBtreePrivate::cursorPrev(HBtreeCursor *cursor, QByteArray *keyOut, QByteArray *valueOut) { - Q_ASSERT(cursor->initialized_); + if (!cursor->valid_) + return cursorLast(cursor, keyOut, valueOut); + NodeKey nkey(compareFunction_, cursor->key_); + NodePage *page = 0; - searchPage(cursor, cursor->transaction_, nkey, SearchKey, false, &page); + if (!searchPage(cursor, cursor->transaction_, nkey, SearchKey, false, &page)) + return false; + Node node = page->nodes.find(nkey); if (node == page->nodes.constBegin()) { if (page->leftPageNumber != PageInfo::INVALID_PAGE) { @@ -2386,7 +2452,7 @@ bool HBtreePrivate::cursorPrev(HBtreeCursor *cursor, QByteArray *keyOut, QByteAr node = left->nodes.constEnd() - 1; else node = left->nodes.constBegin(); - if (node != left->nodes.constEnd()) { // can a left page be empty and point to another left?? + if (node != left->nodes.constEnd()) { if (keyOut) *keyOut = node.key().data; if (valueOut) @@ -2405,21 +2471,22 @@ bool HBtreePrivate::cursorPrev(HBtreeCursor *cursor, QByteArray *keyOut, QByteAr *valueOut = getDataFromNode(node.value()); return true; } - cursor->eof_ = true; + return false; } bool HBtreePrivate::cursorSet(HBtreeCursor *cursor, QByteArray *keyOut, QByteArray *valueOut, const QByteArray &matchKey, bool exact) { HBTREE_DEBUG("searching for" << (exact? "exactly" : "") << matchKey); + QByteArray keyData, valueData; NodeKey nkey(compareFunction_, matchKey); + NodePage *page = 0; if (!searchPage(cursor, cursor->transaction_, nkey, SearchKey, false, &page)) return false; bool ok = false; - if (page->nodes.contains(nkey)) { keyData = nkey.data; valueData = getDataFromNode(page->nodes.value(nkey)); @@ -2431,7 +2498,7 @@ bool HBtreePrivate::cursorSet(HBtreeCursor *cursor, QByteArray *keyOut, QByteArr valueData = getDataFromNode(node.value()); ok = true; } else { // check sibling - HBTREE_DEBUG("Checking sibling of" << page->meta); + HBTREE_DEBUG("Checking sibling of" << page->info); if (page->rightPageNumber != PageInfo::INVALID_PAGE) { NodePage *right = static_cast<NodePage *>(getPage(page->rightPageNumber)); node = right->nodes.lowerBound(nkey); @@ -2444,13 +2511,53 @@ bool HBtreePrivate::cursorSet(HBtreeCursor *cursor, QByteArray *keyOut, QByteArr } } - if (ok) { - if (keyOut) - *keyOut = keyData; - if (valueOut) - *valueOut = valueData; + if (keyOut) + *keyOut = keyData; + if (valueOut) + *valueOut = valueData; + + return ok; +} + +bool HBtreePrivate::doCursorOp(HBtreeCursor *cursor, HBtreeCursor::Op op, const QByteArray &key) +{ + bool ok = false; + QByteArray keyOut, valueOut; + switch (op) { + case HBtreeCursor::ExactMatch: + ok = cursorSet(cursor, &keyOut, &valueOut, key, true); + break; + case HBtreeCursor::FuzzyMatch: + ok = cursorSet(cursor, &keyOut, &valueOut, key, false); + break; + case HBtreeCursor::Next: + ok = cursorNext(cursor, &keyOut, &valueOut); + break; + case HBtreeCursor::Previous: + ok = cursorPrev(cursor, &keyOut, &valueOut); + break; + case HBtreeCursor::First: + ok = cursorFirst(cursor, &keyOut, &valueOut); + break; + case HBtreeCursor::Last: + ok = cursorLast(cursor, &keyOut, &valueOut); + break; + default: + Q_ASSERT(!"Not a valid cursor op"); + ok = false; } + cursor->valid_ = ok; + + if (!ok) { + cursor->key_ = QByteArray(); + cursor->value_ = QByteArray(); + } else { + cursor->key_ = keyOut; + cursor->value_ = valueOut; + } + + cachePrune(); return ok; } @@ -2491,10 +2598,13 @@ bool HBtreePrivate::verifyIntegrity(const HBtreePrivate::Page *page) const HBTREE_DEBUG("verifying" << page->info); CHECK_TRUE(page->info.isTypeValid()); CHECK_TRUE(page->info.isValid()); - CHECK_TRUE(capacity(page) >= (page->info.upperOffset + page->info.lowerOffset)); - CHECK_TRUE(spaceLeft(page) <= capacity(page)); - CHECK_TRUE(spaceUsed(page) <= capacity(page)); - CHECK_TRUE((spaceUsed(page) + spaceLeft(page)) == capacity(page)); + if (page->info.type != PageInfo::Marker || marker_.meta.syncId == lastSyncedId_) { + // These checks are only valid for a marker on sync. + CHECK_TRUE(capacity(page) >= (page->info.upperOffset + page->info.lowerOffset)); + CHECK_TRUE(spaceLeft(page) <= capacity(page)); + CHECK_TRUE(spaceUsed(page) <= capacity(page)); + CHECK_TRUE((spaceUsed(page) + spaceLeft(page)) == capacity(page)); + } if (page->info.type == PageInfo::Marker) { const MarkerPage *mp = static_cast<const MarkerPage *>(page); CHECK_TRUE(mp->info.number == marker_.info.number); // only verify current marker @@ -2512,10 +2622,10 @@ bool HBtreePrivate::verifyIntegrity(const HBtreePrivate::Page *page) const const NodePage *np = static_cast<const NodePage *>(page); CHECK_TRUE(np->history.size() == np->meta.historySize); CHECK_TRUE((np->info.lowerOffset / 2) == np->nodes.size()); - if (np->commited) { - CHECK_TRUE(np->meta.syncId <= marker_.meta.syncId); - } else { + if (np->dirty) { CHECK_TRUE(np->meta.syncId == (lastSyncedId_ + 1)); + } else { + CHECK_TRUE(np->meta.syncId <= marker_.meta.syncId); } Node it = np->nodes.constBegin(); @@ -2717,54 +2827,7 @@ bool HBtree::del(HBtreeTransaction *transaction, const QByteArray &key) bool HBtree::doCursorOp(HBtreeCursor *cursor, HBtreeCursor::Op op, const QByteArray &key) { Q_D(HBtree); - bool ok = false; - QByteArray keyOut, valueOut; - switch (op) { - case HBtreeCursor::ExactMatch: - case HBtreeCursor::FuzzyMatch: - cursor->initialized_ = false; - if (op == HBtreeCursor::ExactMatch) - ok = d->cursorSet(cursor, &keyOut, &valueOut, key, true); - else - ok = d->cursorSet(cursor, &keyOut, &valueOut, key, false); - break; - case HBtreeCursor::Next: - if (!cursor->initialized_) - ok = d->cursorFirst(cursor, &keyOut, &valueOut); - else - ok = d->cursorNext(cursor, &keyOut, &valueOut); - break; - case HBtreeCursor::Previous: - if (!cursor->initialized_) - ok = d->cursorLast(cursor, &keyOut, &valueOut); - else - ok = d->cursorPrev(cursor, &keyOut, &valueOut); - break; - case HBtreeCursor::First: - cursor->initialized_ = false; - ok = d->cursorFirst(cursor, &keyOut, &valueOut); - break; - case HBtreeCursor::Last: - cursor->initialized_ = false; - ok = d->cursorLast(cursor, &keyOut, &valueOut); - break; - default: - HBTREE_ERROR("cursor op unknown"); - ok = false; - } - - cursor->valid_ = ok; - - if (!ok) { - cursor->key_ = QByteArray(); - cursor->value_ = QByteArray(); - cursor->initialized_ = false; - } else { - cursor->key_ = keyOut; - cursor->value_ = valueOut; - } - - return ok; + return d->doCursorOp(cursor, op, key); } // ###################################################################### diff --git a/src/hbtree/hbtree_p.h b/src/hbtree/hbtree_p.h index f637c4c..e84bf4e 100644 --- a/src/hbtree/hbtree_p.h +++ b/src/hbtree/hbtree_p.h @@ -171,13 +171,13 @@ public: // form just the page header. struct Page { PageInfo info; - bool commited; // runtime information on whether a page has been commited or not ( + bool dirty; protected: Page(PageInfo::Type type) - : info(type), commited(false) + : info(type), dirty(true) {} Page(PageInfo::Type type, quint32 pageNumber) - : info(type, pageNumber), commited(false) + : info(type, pageNumber), dirty(true) {} }; @@ -317,14 +317,15 @@ public: bool rollback(); Page *newPage(PageInfo::Type type); - Page *getPage(quint32 pageNumber, bool insertInCache = true); + Page *getPage(quint32 pageNumber); void deletePage(Page *page) const; void destructPage(Page *page) const; NodePage *touchNodePage(NodePage *page); quint32 putDataOnOverflow(const QByteArray &value); - QByteArray getDataFromNode(const NodeValue &nval) const; - bool getOverflowData(quint32 startPage, QByteArray *data) const; - bool getOverflowPageNumbers(quint32 startPage, QList<quint32> *pages) const; + QByteArray getDataFromNode(const NodeValue &nval); + bool walkOverflowPages(quint32 startPage, QByteArray *data, QList<quint32> *pages); + bool getOverflowData(quint32 startPage, QByteArray *data); + bool getOverflowPageNumbers(quint32 startPage, QList<quint32> *pages); quint16 collectHistory(NodePage *page); Page *cacheFind(quint32 pgno) const; Page *cacheRemove(quint32 pgno); @@ -351,7 +352,7 @@ public: quint16 spaceLeft(const Page *page) const; quint16 spaceUsed(const Page *page) const; quint16 capacity(const Page *page) const; - quint32 pageFullEnough(NodePage *page) const; + bool pageFullEnough(NodePage *page) const; bool hasSpaceFor(NodePage *page, const NodeKey &key, const NodeValue &value) const; bool insertNode(NodePage *page, const NodeKey &key, const NodeValue &value); @@ -363,6 +364,7 @@ public: bool addHistoryNode(NodePage *src, const HistoryNode &hn); void dump(); + void dump(HBtreeTransaction *transaction); void dumpPage(NodePage *page, int depth); bool cursorLast(HBtreeCursor *cursor, QByteArray *keyOut, QByteArray *valueOut); @@ -370,6 +372,7 @@ public: bool cursorNext(HBtreeCursor *cursor, QByteArray *keyOut, QByteArray *valueOut); bool cursorPrev(HBtreeCursor *cursor, QByteArray *keyOut, QByteArray *valueOut); bool cursorSet(HBtreeCursor *cursor, QByteArray *keyOut, QByteArray *valueOut, const QByteArray &matchKey, bool exact); + bool doCursorOp(HBtreeCursor *cursor, HBtreeCursor::Op op, const QByteArray &key = QByteArray()); void copy(const Page &src, Page *dst); diff --git a/src/hbtree/hbtreecursor.cpp b/src/hbtree/hbtreecursor.cpp index 2066b3d..c76679f 100644 --- a/src/hbtree/hbtreecursor.cpp +++ b/src/hbtree/hbtreecursor.cpp @@ -45,18 +45,18 @@ HBtreeCursor::HBtreeCursor() - : transaction_(0), btree_(0), initialized_(false), eof_(false), valid_(false) + : transaction_(0), btree_(0) { } HBtreeCursor::HBtreeCursor(HBtreeTransaction *transaction) - : transaction_(transaction), initialized_(false), eof_(false), valid_(false) + : transaction_(transaction), lastLeaf_(0), valid_(false) { btree_ = transaction->btree_; } HBtreeCursor::HBtreeCursor(HBtree *btree, bool commited) - : transaction_(0), btree_(btree), initialized_(false), eof_(false), valid_(false) + : transaction_(0), btree_(btree), lastLeaf_(0), valid_(false) { if (!commited) transaction_ = btree_->writeTransaction(); @@ -94,35 +94,36 @@ bool HBtreeCursor::current(QByteArray *pKey, QByteArray *pValue) bool HBtreeCursor::last() { - return doOp(Last); + Q_ASSERT(btree_); + return btree_->doCursorOp(this, Last); } bool HBtreeCursor::first() { - return doOp(First); + Q_ASSERT(btree_); + return btree_->doCursorOp(this, First); } bool HBtreeCursor::next() { - return doOp(Next); + Q_ASSERT(btree_); + return btree_->doCursorOp(this, Next); } bool HBtreeCursor::previous() { - return doOp(Previous); + Q_ASSERT(btree_); + return btree_->doCursorOp(this, Previous); } bool HBtreeCursor::seek(const QByteArray &key) { - return doOp(ExactMatch, key); + Q_ASSERT(btree_); + return btree_->doCursorOp(this, ExactMatch, key); } bool HBtreeCursor::seekRange(const QByteArray &key) { - return doOp(FuzzyMatch, key); -} - -bool HBtreeCursor::doOp(HBtreeCursor::Op op, const QByteArray &key) -{ - return btree_->doCursorOp(this, op, key); + Q_ASSERT(btree_); + return btree_->doCursorOp(this, FuzzyMatch, key); } diff --git a/src/hbtree/hbtreecursor.h b/src/hbtree/hbtreecursor.h index 72beb25..d797f61 100644 --- a/src/hbtree/hbtreecursor.h +++ b/src/hbtree/hbtreecursor.h @@ -86,12 +86,9 @@ private: QByteArray value_; HBtreeTransaction *transaction_; HBtree *btree_; - bool initialized_; - bool eof_; quint32 lastLeaf_; bool valid_; - bool doOp(Op op, const QByteArray &key = QByteArray()); }; diff --git a/tests/auto/hbtree/main.cpp b/tests/auto/hbtree/main.cpp index 7037b1b..9aab15c 100644 --- a/tests/auto/hbtree/main.cpp +++ b/tests/auto/hbtree/main.cpp @@ -97,7 +97,6 @@ private slots: void multipleRollbacks(); void createWithCmp(); void variableSizeKeysAndData(); - void transactionTag(); void compareSequenceOfVarLengthKeys(); void asciiAsSortedNumbers(); void deleteReinsertVerify_data(); @@ -113,6 +112,9 @@ private slots: void corruptSyncMarker1(); void corruptBothSyncMarkers_data(); void corruptBothSyncMarkers(); + void cursorWhileDelete_data(); + void cursorWhileDelete(); + void getDataFromLastSync(); private: void corruptSinglePage(int psize, int pgno = -1, qint32 type = -1); @@ -187,6 +189,65 @@ void TestHBtree::cleanup() QFile::remove(dbname); } +void TestHBtree::orderedList() +{ + OrderedList<HBtreePrivate::NodeKey, HBtreePrivate::NodeValue> list; + + typedef HBtreePrivate::NodeKey Key; + typedef HBtreePrivate::NodeValue Value; + + Key key; + + key = Key(0, QByteArray("B")); + list.insert(key, Value("_B_")); + + key = Key(0, QByteArray("A")); + list.insert(key, Value("_A_")); + + key = Key(0, QByteArray("C")); + list.insert(key, Value("_C_")); + + QCOMPARE(list.size(), 3); + QVERIFY((list.constBegin() + 0).key().data == QByteArray("A")); + QVERIFY((list.constBegin() + 1).key().data == QByteArray("B")); + QVERIFY((list.constBegin() + 2).key().data == QByteArray("C")); + + QVERIFY(list.contains(Key(0, "A"))); + QVERIFY(!list.contains(Key(0, "AA"))); + QVERIFY(!list.contains(Key(0, "D"))); + + QVERIFY(list.lowerBound(Key(0, "A")) == list.constBegin()); + QVERIFY(list.lowerBound(Key(0, "AA")) == list.constBegin()+1); + QVERIFY(list.lowerBound(Key(0, "B")) == list.constBegin()+1); + QVERIFY(list.lowerBound(Key(0, "D")) == list.constEnd()); + + QVERIFY(list.upperBound(Key(0, "A")) == list.constBegin()+1); + QVERIFY(list.upperBound(Key(0, "AA")) == list.constBegin()+1); + QVERIFY(list.upperBound(Key(0, "B")) == list.constBegin()+2); + QVERIFY(list.upperBound(Key(0, "D")) == list.constEnd()); + + QCOMPARE(list.size(), 3); + QVERIFY(list[Key(0, "C")].data == QByteArray("_C_")); + QCOMPARE(list.size(), 3); + list[Key(0, "C")].data = QByteArray("_C2_"); + QVERIFY(list[Key(0, "C")].data == QByteArray("_C2_")); + QCOMPARE(list.size(), 3); + QVERIFY(list[Key(0, "AA")].data.isEmpty()); + QVERIFY(list.contains(Key(0, "AA"))); + QCOMPARE(list.size(), 4); + + QVERIFY(list.find(Key(0, "D")) == list.constEnd()); + + QCOMPARE(list.size(), 4); + list.insert(Key(0, "B"), Value("_B2_")); + QCOMPARE(list.size(), 4); + + QCOMPARE(list.value(Key(0, "A")).data, QByteArray("_A_")); + QCOMPARE(list.value(Key(0, "B")).data, QByteArray("_B2_")); + QCOMPARE(list.value(Key(0, "C")).data, QByteArray("_C2_")); + QCOMPARE(list.value(Key(0, "AA")).data, QByteArray("")); +} + void TestHBtree::openClose() { // init/cleanup does open and close; @@ -960,9 +1021,9 @@ void TestHBtree::prev2() void TestHBtree::multiBranchSplits() { - const int numItems = 40; // must cause multiple branch splits - const int valueSize = 1; - const int keySize = 1500; + const int numItems = 1000; // must cause multiple branch splits + const int valueSize = 1000; + const int keySize = 1000; QMap<QByteArray, QByteArray> keyValues; for (int i = 0; i < numItems; ++i) { @@ -1185,39 +1246,6 @@ void TestHBtree::variableSizeKeysAndData() printOutCollectibles_ = false; } -void TestHBtree::transactionTag() -{ - QSKIP("beginTransaction doesn't check for updated marker page. This won't work yet."); - HBtreeTransaction *txn = db->beginTransaction(HBtreeTransaction::ReadWrite); - QVERIFY(txn); - QVERIFY(txn->put(QByteArray("foo"), QByteArray("bar"))); - QVERIFY(txn->put(QByteArray("bla"), QByteArray("bla"))); - QVERIFY(txn->commit(1)); - QCOMPARE(db->tag(), int(1)); - - HBtree rdb; - rdb.setFileName(dbname); - rdb.setOpenMode(HBtree::ReadOnly); - QVERIFY(rdb.open()); - QCOMPARE(rdb.tag(), (int)1); - HBtreeTransaction *rdbtxn = rdb.beginTransaction(HBtreeTransaction::ReadOnly); - QCOMPARE(rdb.tag(), (int)1); - QCOMPARE(rdbtxn->tag(), (quint64)1); - - txn = db->beginTransaction(HBtreeTransaction::ReadWrite); - QVERIFY(txn); - QVERIFY(txn->put(QByteArray("foo"), QByteArray("bar"))); - QVERIFY(txn->commit(2)); - QCOMPARE(db->tag(), (int)2); - - QCOMPARE(rdb.tag(), (int)1); - rdbtxn->abort(); - - rdbtxn = rdb.beginTransaction(HBtreeTransaction::ReadOnly); - QCOMPARE(rdbtxn->tag(), (quint64)2); - rdbtxn->abort(); -} - int findLongestSequenceOf(const char *a, size_t size, char x) { int result = 0; @@ -1788,63 +1816,141 @@ void TestHBtree::corruptBothSyncMarkers() } } -void TestHBtree::orderedList() +void TestHBtree::cursorWhileDelete_data() { - OrderedList<HBtreePrivate::NodeKey, HBtreePrivate::NodeValue> list; + const int lessItems = 100; + const int smallKeys = 100; + const int smallData = 100; + const int moreItems = 1000; + const int bigKeys = 1000; + const int bigData = 1000; + const int overflowData = 2000; + const int multiOverflowData = 5000; + + QTest::addColumn<int>("numItems"); + QTest::addColumn<int>("keySize"); + QTest::addColumn<int>("valueSize"); + + QTest::newRow("Less items, small keys, small data") << lessItems << smallKeys << smallData; + QTest::newRow("more items, small keys, small data") << moreItems << smallKeys << smallData; + QTest::newRow("Less items, big keys, small data") << lessItems << bigKeys << smallData; + QTest::newRow("more items, big keys, small data") << moreItems << bigKeys << smallData; + QTest::newRow("Less items, small keys, big data") << lessItems << smallKeys << bigData; + QTest::newRow("more items, small keys, big data") << moreItems << smallKeys << bigData; + QTest::newRow("Less items, big keys, big data") << lessItems << bigKeys << bigData; + QTest::newRow("more items, big keys, big data") << moreItems << bigKeys << bigData; + + QTest::newRow("Less items, small keys, overflow data") << lessItems << smallKeys << overflowData; + QTest::newRow("more items, small keys, overflow data") << moreItems << smallKeys << overflowData; + QTest::newRow("Less items, big keys, overflow data") << lessItems << bigKeys << overflowData; + QTest::newRow("more items, big keys, overflow data") << moreItems << bigKeys << overflowData; + + QTest::newRow("Less items, small keys, multi-overflow data") << lessItems << smallKeys << multiOverflowData; + QTest::newRow("more items, small keys, multi-overflow data") << moreItems << smallKeys << multiOverflowData; + QTest::newRow("Less items, big keys, multi-overflow data") << lessItems << bigKeys << multiOverflowData; + QTest::newRow("more items, big keys, multi-overflow data") << moreItems << bigKeys << multiOverflowData; +} - typedef HBtreePrivate::NodeKey Key; - typedef HBtreePrivate::NodeValue Value; +void TestHBtree::cursorWhileDelete() +{ + QFETCH(int, numItems); + QFETCH(int, keySize); + QFETCH(int, valueSize); - Key key; + // Insert data + QMap<QByteArray, QByteArray> keyValues; + for (int i = 0; i < numItems; ++i) { + QByteArray key = QString::number(i).toAscii(); + if (key.size() < keySize) + key += QByteArray(keySize - key.size(), '-'); + QByteArray value(valueSize, 'a' + (i % ('z' - 'a'))); + HBtreeTransaction *transaction = db->beginTransaction(HBtreeTransaction::ReadWrite); + QVERIFY(transaction); + QVERIFY(transaction->put(key, value)); + keyValues.insert(key, value); + QVERIFY(transaction->commit(i)); + } - key = Key(0, QByteArray("B")); - list.insert(key, Value("_B_")); + // Delete all + HBtreeTransaction *txn = db->beginTransaction(HBtreeTransaction::ReadWrite); + QVERIFY(txn); + HBtreeCursor cursor1(txn); + while (cursor1.next()) { + QVERIFY(txn->del(cursor1.key())); + } + txn->commit(0); - key = Key(0, QByteArray("A")); - list.insert(key, Value("_A_")); + // Verify + txn = db->beginTransaction(HBtreeTransaction::ReadOnly); + QVERIFY(txn); + QMap<QByteArray, QByteArray>::iterator it = keyValues.begin(); + while (it != keyValues.end()) { + QCOMPARE(txn->get(it.key()), QByteArray()); + ++it; + } + txn->abort(); - key = Key(0, QByteArray("C")); - list.insert(key, Value("_C_")); + // Reinsert + txn = db->beginTransaction(HBtreeTransaction::ReadWrite); + QVERIFY(txn); + it = keyValues.begin(); + while (it != keyValues.end()) { + QVERIFY(txn->put(it.key(), it.value())); + ++it; + } + txn->commit(0); - QCOMPARE(list.size(), 3); - QVERIFY((list.constBegin() + 0).key().data == QByteArray("A")); - QVERIFY((list.constBegin() + 1).key().data == QByteArray("B")); - QVERIFY((list.constBegin() + 2).key().data == QByteArray("C")); + // Verify + txn = db->beginTransaction(HBtreeTransaction::ReadOnly); + QVERIFY(txn); + HBtreeCursor cursor2(txn); + it = keyValues.begin(); + while (cursor2.next()) { + QCOMPARE(it.key(), cursor2.key()); + QCOMPARE(it.value(), cursor2.value()); + ++it; + } + txn->abort(); +} - QVERIFY(list.contains(Key(0, "A"))); - QVERIFY(!list.contains(Key(0, "AA"))); - QVERIFY(!list.contains(Key(0, "D"))); +void TestHBtree::getDataFromLastSync() +{ + db->setAutoSyncRate(0); - QVERIFY(list.lowerBound(Key(0, "A")) == list.constBegin()); - QVERIFY(list.lowerBound(Key(0, "AA")) == list.constBegin()+1); - QVERIFY(list.lowerBound(Key(0, "B")) == list.constBegin()+1); - QVERIFY(list.lowerBound(Key(0, "D")) == list.constEnd()); + int numCommits = 1000; + for (int i = 0; i < numCommits / 2; ++i) { + HBtreeTransaction *txn = db->beginTransaction(HBtreeTransaction::ReadWrite); + QVERIFY(txn); + QVERIFY(txn->put(QByteArray::number(i), QByteArray::number(i))); + QVERIFY(txn->commit(i)); + } + db->sync(); - QVERIFY(list.upperBound(Key(0, "A")) == list.constBegin()+1); - QVERIFY(list.upperBound(Key(0, "AA")) == list.constBegin()+1); - QVERIFY(list.upperBound(Key(0, "B")) == list.constBegin()+2); - QVERIFY(list.upperBound(Key(0, "D")) == list.constEnd()); + for (int i = numCommits / 2; i < numCommits; ++i) { + HBtreeTransaction *txn = db->beginTransaction(HBtreeTransaction::ReadWrite); + QVERIFY(txn); + QVERIFY(txn->put(QByteArray::number(i), QByteArray::number(i))); + QVERIFY(txn->commit(i)); + } - QCOMPARE(list.size(), 3); - QVERIFY(list[Key(0, "C")].data == QByteArray("_C_")); - QCOMPARE(list.size(), 3); - list[Key(0, "C")].data = QByteArray("_C2_"); - QVERIFY(list[Key(0, "C")].data == QByteArray("_C2_")); - QCOMPARE(list.size(), 3); - QVERIFY(list[Key(0, "AA")].data.isEmpty()); - QVERIFY(list.contains(Key(0, "AA"))); - QCOMPARE(list.size(), 4); + d->close(false); - QVERIFY(list.find(Key(0, "D")) == list.constEnd()); + QVERIFY(db->open()); + QCOMPARE((int)db->tag(), numCommits / 2 - 1); - QCOMPARE(list.size(), 4); - list.insert(Key(0, "B"), Value("_B2_")); - QCOMPARE(list.size(), 4); + for (int i = 0; i < numCommits / 2; ++i) { + HBtreeTransaction *txn = db->beginTransaction(HBtreeTransaction::ReadOnly); + QVERIFY(txn); + QCOMPARE(txn->get(QByteArray::number(i)), QByteArray::number(i)); + txn->abort(); + } - QCOMPARE(list.value(Key(0, "A")).data, QByteArray("_A_")); - QCOMPARE(list.value(Key(0, "B")).data, QByteArray("_B2_")); - QCOMPARE(list.value(Key(0, "C")).data, QByteArray("_C2_")); - QCOMPARE(list.value(Key(0, "AA")).data, QByteArray("")); + for (int i = numCommits / 2; i < numCommits; ++i) { + HBtreeTransaction *txn = db->beginTransaction(HBtreeTransaction::ReadWrite); + QVERIFY(txn); + QVERIFY(txn->get(QByteArray::number(i)).isEmpty()); + txn->abort(); + } } QTEST_MAIN(TestHBtree) |