8000 Fixing collection name collection handling in Syncer (#3710) · Kitter/arangodb@0e485f7 · GitHub
[go: up one dir, main page]

Skip to content

Commit 0e485f7

Browse files
graetzerjsteemann
authored andcommitted
Fixing collection name collection handling in Syncer (arangodb#3710)
1 parent 987daca commit 0e485f7

File tree

16 files changed

+193
-71
lines changed

16 files changed

+193
-71
lines changed

UnitTests/unittest.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,11 @@ function main (argv) {
203203
// create output directory
204204
fs.makeDirectoryRecursive(testOutputDirectory);
205205

206+
if (options.hasOwnProperty('cluster') && options.cluster) {
207+
// cluster beats resilient single server
208+
options.singleresilient = false;
209+
}
210+
206211
// run the test and store the result
207212
let r = {}; // result
208213
try {

arangod/Cluster/HeartbeatThread.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -549,8 +549,7 @@ void HeartbeatThread::runSingleServer() {
549549
VPackSlice const res = result.slice();
550550
TRI_ASSERT(res.length() == 1 && res[0].isObject());
551551
leaderSlice = res[0].get(AgencyCommManager::slicePath(leaderPath));
552-
LOG_TOPIC(INFO, Logger::HEARTBEAT) << "Did not become leader, "
553-
<< "following " << leaderSlice.copyString();
552+
LOG_TOPIC(INFO, Logger::HEARTBEAT) << "Did not become leader";
554553
TRI_ASSERT(leaderSlice.isString() && leaderSlice.compareString(_myId) != 0);
555554
// intentionally falls through to case 3
556555

@@ -611,6 +610,7 @@ void HeartbeatThread::runSingleServer() {
611610
applier->stopAndJoin();
612611
}
613612

613+
LOG_TOPIC(INFO, Logger::HEARTBEAT) << "Starting replication from " << endpoint;
614614
ReplicationApplierConfiguration config = applier-&g A3E2 t;configuration();
615615
config._endpoint = endpoint;
616616
config._autoResync = true;

arangod/Replication/Syncer.cpp

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,9 @@ TRI_vocbase_t* Syncer::resolveVocbase(VPackSlice const& slice) {
351351
arangodb::LogicalCollection* Syncer::resolveCollection(TRI_vocbase_t* vocbase,
352352
VPackSlice const& slice) {
353353
TRI_ASSERT(vocbase != nullptr);
354-
if (!simulate32Client()) {
354+
// extract "cid"
355+
TRI_voc_cid_t cid = getCid(slice);
356+
if (!simulate32Client() || cid == 0) {
355357
VPackSlice uuid;
356358
if ((uuid = slice.get("cuid")).isString()) {
357359
return vocbase->lookupCollectionByUuid(uuid.copyString());
@@ -360,8 +362,6 @@ arangodb::LogicalCollection* Syncer::resolveCollection(TRI_vocbase_t* vocbase,
360362
}
361363
}
362364

363-
// extract "cid"
364-
TRI_voc_cid_t cid = getCid(slice);
365365
if (cid == 0) {
366366
LOG_TOPIC(ERR, Logger::REPLICATION) <<
367367
TRI_errno_string(TRI_ERROR_REPLICATION_INVALID_RESPONSE);
@@ -487,11 +487,38 @@ Result Syncer::createCollection(TRI_vocbase_t* vocbase,
487487
TRI_col_type_e const type = static_cast<TRI_col_type_e>(VelocyPackHelper::getNumericValue<int>(
488488
slice, "type", TRI_COL_TYPE_DOCUMENT));
489489

490+
// resolve collection by uuid, name, cid (in that order of preference)
490491
arangodb::LogicalCollection* col = resolveCollection(vocbase, slice);
491492
if (col != nullptr && col->type() == type) {
492493
// collection already exists. TODO: compare attributes
493494
return Result();
494495
}
496+
// conflicting collections need to be dropped from 3.3 onwards
497+
col = vocbase->lookupCollection(name);
498+
if (col != nullptr) {
499+
TRI_ASSERT(!simulate32Client()); // < 3.3 should never get here
500+
if (col->isSystem()) {
501+
TRI_ASSERT(col->globallyUniqueId() == col->name());
502+
CollectionGuard guard(vocbase, col);
503+
if (guard.collection() == nullptr) {
504+
return Result(TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND);
505+
}
506+
SingleCollectionTransaction trx(transaction::StandaloneContext::Create(vocbase),
507+
guard.collection()->cid(), AccessMode::Type::WRITE);
508+
Result res = trx.begin();
509+
if (!res.ok()) {
510+
return res;
511+
}
512+
OperationOptions opts;
513+
OperationResult opRes = trx.truncate(col->name(), opts);
514+
if (opRes.fail()) {
515+
return opRes.result;
516+
}
517+
return trx.finish(opRes.result);
518+
} else {
519+
vocbase->dropCollection(col, false, -1.0);
520+
}
521+
}
495522

496523
VPackSlice uuid = slice.get("globallyUniqueId");
497524
// merge in "isSystem" attribute, doesn't matter if name does not start with '_'
@@ -571,7 +598,6 @@ Result Syncer::createIndex(VPackSlice const& slice) {
571598

572599
try {
573600
CollectionGuard guard(vocbase, col);
574-
575601
if (guard.collection() == nullptr) {
576602
return Result(TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND);
577603
}

arangod/Replication/TailingSyncer.cpp

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -560,11 +560,17 @@ Result TailingSyncer::renameCollection(VPackSlice const& slice) {
560560
arangodb::LogicalCollection* col = nullptr;
561561
if (slice.hasKey("cuid")) {
562562
col = resolveCollection(vocbase, slice);
563+
if (col == nullptr) {
564+
return Result(TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND, "unknown cuid");
565+
}
563566
} else if (collection.hasKey("oldName")) {
564567
col = vocbase->lookupCollection(collection.get("oldName").copyString());
568+
if (col == nullptr) {
569+
return Result(TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND, "unknown old collection name");
570+
}
565571
}
566-
if (col == nullptr) {
567-
return Result(TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND);
572+
if (col->isSystem()) {
573+
LOG_TOPIC(WARN, Logger::REPLICATION) << "Renaming system collection " << col->name();
568574
}
569575
return Result(vocbase->renameCollection(col, name, true));
570576
}

arangod/RestHandler/RestReplicationHandler.cpp

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -942,6 +942,16 @@ Result RestReplicationHandler::processRestoreCollection(
942942
if (res != TRI_ERROR_NO_ERROR) {
943943
return Result(res, std::string("unable to create collection: ") + TRI_errno_string(res));
944944
}
945+
946+
// might be also called on dbservers
947+
ExecContext const* exe = ExecContext::CURRENT;
948+
if (name[0] != '_' && exe != nullptr && !exe->isSuperuser() &&
949+
ServerState::instance()->isSingleServer()) {
950+
AuthenticationFeature *auth = AuthenticationFeature::INSTANCE;
951+
auth->authInfo()->updateUser(exe->user(), [&](AuthUserEntry& entry) {
952+
entry.grantCollection(_vocbase->name(), col->name(), AuthLevel::RW);
953+
});
954+
}
945955

946956
return Result();
947957
}
@@ -2452,15 +2462,6 @@ int RestReplicationHandler::createCollection(VPackSlice slice,
24522462
return TRI_ERROR_INTERNAL;
24532463
}
24542464

2455-
ExecContext const* exe = ExecContext::CURRENT;
2456-
if (name[0] != '_' && exe != nullptr && !exe->isSuperuser() &&
2457-
ServerState::instance()->isSingleServer()) {
2458-
AuthenticationFeature *auth = AuthenticationFeature::INSTANCE;
2459-
auth->authInfo()->updateUser(exe->user(), [&](AuthUserEntry& entry) {
2460-
entry.grantCollection(_vocbase->name(), col->name(), AuthLevel::RW);
2461-
});
2462-
}
2463-
24642465
/* Temporary ASSERTS to prove correctness of new constructor */
24652466
TRI_ASSERT(col->isSystem() == (name[0] == '_'));
24662467
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE

arangod/RocksDBEngine/RocksDBReplicationTailing.cpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -358,7 +358,13 @@ class WALParser : public rocksdb::WriteBatch::Handler {
358358
if (!shouldHandleKey(column_family_id, false, key) ||
359359
column_family_id != _documentsCF) {
360360
if (column_family_id == _documentsCF) {
361-
_removeDocumentKey.clear();
361+
if (_lastLogType == RocksDBLogType::SingleRemove) {
362+
TRI_ASSERT(!_seenBeginTransaction);
363+
resetTransientState(); // ignoring the entire op
364+
} else {
365+
TRI_ASSERT(!_singleOp);
366+
_removeDocumentKey.clear(); // just ignoring this key
367+
}
362368
}
363369
return rocksdb::Status();
364370
}
@@ -557,6 +563,7 @@ class WALParser : public rocksdb::WriteBatch::Handler {
557563
// these parameters are relevant to determine if we can print
558564
// a specific marker from the WAL
559565
TRI_vocbase_t* const _vocbase;
566+
560567
bool const _includeSystem;
561568
TRI_voc_cid_t const _onlyCollectionId;
562569
/// result builder

arangod/RocksDBEngine/RocksDBTransactionState.cpp

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -198,20 +198,35 @@ void RocksDBTransactionState::createTransaction() {
198198

199199
// set begin marker
200200
if (!hasHint(transaction::Hints::Hint::SINGLE_OPERATION)) {
201-
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
202-
TRI_ASSERT(_numLogdata == 0);
203-
#endif
204201
RocksDBLogValue header =
205202
RocksDBLogValue::BeginTransaction(_vocbase->id(), _id);
206203
_rocksTransaction->PutLogData(header.slice());
207204
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
205+
TRI_ASSERT(_numLogdata == 0);
208206
++_numLogdata;
209207
#endif
210208
}
211209
}
212210

213211
arangodb::Result RocksDBTransactionState::internalCommit() {
214212
TRI_ASSERT(_rocksTransaction != nullptr);
213+
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
214+
uint64_t x = _numInserts + _numRemoves + _numUpdates;
215+
if (hasHint(transaction::Hints::Hint::SINGLE_OPERATION)) {
216+
TRI_ASSERT(x <= 1);
217+
TRI_ASSERT(_numLogdata == x);
218+
} else {
219+
if (_numLogdata < 1 + (x > 0 ? 1 : 0) + _numRemoves) {
220+
LOG_TOPIC(ERR, Logger::FIXME)
221+
<< "_numInserts " << _numInserts << " "
222+
<< "_numRemoves " << _numRemoves << " "
223+
<< "_numUpdates " << _numUpdates << " "
224+
<< "_numLogdata " << _numLogdata;
225+
}
226+
// begin transaction + n DocumentOpsPrologue + m doc removes
227+
TRI_ASSERT(_numLogdata >= 1 + (x > 0 ? 1 : 0) + _numRemoves);
228+
}
229+
#endif
215230

216231
ExecContext const* exe = ExecContext::CURRENT;
217232
if (!isReadOnlyTransaction() && exe != nullptr) {
@@ -536,6 +551,7 @@ void RocksDBTransactionState::checkIntermediateCommit(uint64_t newSize) {
536551
_options.intermediateCommitSize <= newSize) {
537552
// LOG_TOPIC(ERR, Logger::FIXME) << "INTERMEDIATE COMMIT!";
538553
internalCommit();
554+
_lastUsedCollection = 0;
539555
_numInternal = 0;
540556
_numInserts = 0;
541557
_numUpdates = 0;

arangod/RocksDBEngine/RocksDBWalAccess.cpp

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ class MyWALParser : public rocksdb::WriteBatch::Handler,
150150
marker->add("cuid", VPackValuePair(uuid.data(), uuid.size(),
151151
VPackValueType::String));
152152
}
153-
_callback(loadVocbase(_currentDbId), _builder.slice());
153+
_callback(vocbase, _builder.slice());
154154
_builder.clear();
155155
}
156156
break;
@@ -161,18 +161,19 @@ class MyWALParser : public rocksdb::WriteBatch::Handler,
161161
_currentCid = RocksDBLogValue::collectionId(blob);
162162
// only print markers from this collection if it is set
163163
if (shouldHandleCollection(_currentDbId, _currentCid)) {
164+
TRI_vocbase_t* vocbase = loadVocbase(_currentDbId);
164165
LogicalCollection* col = loadCollection(_currentDbId, _currentCid);
165-
if (col != nullptr) {
166+
if (vocbase != nullptr && col != nullptr) {
166167
{
167168
uint64_t tick = _currentSequence + (_startOfBatch ? 0 : 1);
168169
VPackObjectBuilder marker(&_builder, true);
169170
marker->add("tick", VPackValue(std::to_string(tick)));
170171
marker->add("type", VPackValue(rocksutils::convertLogType(type)));
171-
marker->add("db", VPackValue(loadVocbase(_currentDbId)->name()));
172+
marker->add("db", VPackValue(vocbase->name()));
172173
marker->add("cuid", VPackValue(col->globallyUniqueId()));
173174
marker->add("data", RocksDBLogValue::indexSlice(blob));
174175
}
175-
_callback(loadVocbase(_currentDbId), _builder.slice());
176+
_callback(vocbase, _builder.slice());
176177
_builder.clear();
177178
}
178179
}
@@ -284,6 +285,11 @@ class MyWALParser : public rocksdb::WriteBatch::Handler,
284285
rocksdb::Slice const& value) override {
285286
tick();
286287
if (!shouldHandleMarker(column_family_id, true, key)) {
288+
if (column_family_id == _documentsCF && // ignoring collection
289+
_lastLogType == RocksDBLogType::SinglePut) {
290+
TRI_ASSERT(!_seenBeginTransaction);
291+
resetTransientState(); // ignoring the put
292+
}
287293
return rocksdb::Status();
288294
}
289295
//LOG_TOPIC(ERR, Logger::ROCKSDB) << "[PUT] cf: " << column_family_id
@@ -433,8 +439,15 @@ class MyWALParser : public rocksdb::WriteBatch::Handler,
433439
if (column_family_id != _documentsCF ||
434440
!shouldHandleMarker(column_family_id, false, key)) {
435441
if (column_family_id == _documentsCF) {
436-
_removeDocumentKey.clear();
442+
if (_lastLogType == RocksDBLogType::SingleRemove) {
443+
TRI_ASSERT(!_seenBeginTransaction);
444+
resetTransientState(); // ignoring the entire op
445+
} else {
446+
TRI_ASSERT(!_singleOp);
447+
_removeDocumentKey.clear(); // just ignoring this key
448+
}
437449
}
450+
438451
return rocksdb::Status();
439452
}
440453

@@ -501,15 +514,18 @@ class MyWALParser : public rocksdb::WriteBatch::Handler,
501514

502515
void writeCommitMarker() {
503516
TRI_ASSERT(_seenBeginTransaction && !_singleOp);
504-
_builder.openObject();
505-
_builder.add("tick", VPackValue(std::to_string(_currentSequence)));
506-
_builder.add("type", VPackValue(static_cast<uint64_t>(REPLICATION_TRANSACTION_COMMIT)));
507-
_builder.add("db", VPackValue(loadVocbase(_currentDbId)->name()));
508-
_builder.add("tid", VPackValue(std::to_string(_currentTrxId)));
509-
_builder.close();
510-
_callback(loadVocbase(_currentDbId), _builder.slice());
511-
_builder.clear();
512-
_seenBeginTransaction = false;
517+
TRI_vocbase_t* vocbase = loadVocbase(_currentDbId);
518+
if (vocbase != nullptr) { // we be in shutdown
519+
_builder.openObject();
520+
_builder.add("tick", VPackValue(std::to_string(_currentSequence)));
521+
_builder.add("type", VPackValue(static_cast<uint64_t>(REPLICATION_TRANSACTION_COMMIT)));
522+
_builder.add("db", VPackValue(vocbase->name()));
523+
_builder.add("tid", VPackValue(std::to_string(_currentTrxId)));
524+
_builder.close();
525+
_callback(vocbase, _builder.slice());
526+
_builder.clear();
527+
_seenBeginTransaction = false;
528+
}
513529
}
514530

515531
// should reset state flags which are only valid between
@@ -575,7 +591,6 @@ class MyWALParser : public rocksdb::WriteBatch::Handler,
575591
// happens when dropping a collection or log markers
576592
// are ignored for dbs and collections
577593
if (!(_seenBeginTransaction || _singleOp)) {
578-
TRI_ASSERT(dbId == 0 && cid == 0);
579594
return false;
580595
}
581596
} else {

arangod/StorageEngine/WalAccess.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,13 @@
2727

2828
using namespace arangodb;
2929

30-
bool WalAccessContext::shouldHandleDB(TRI_voc_tick_t dbid) {
30+
bool WalAccessContext::shouldHandleDB(TRI_voc_tick_t dbid) const {
3131
return _filter.vocbase == 0 || _filter.vocbase == dbid;
3232
}
3333

3434
/// @brief Check if collection is in filter
3535
bool WalAccessContext::shouldHandleCollection(TRI_voc_tick_t dbid,
36-
TRI_voc_cid_t cid) {
36+
TRI_voc_cid_t cid) const {
3737
return _filter.vocbase == 0 || (_filter.vocbase == dbid &&
3838
(_filter.collection == 0 || _filter.collection == cid));
3939
}

arangod/StorageEngine/WalAccess.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,10 +150,10 @@ struct WalAccessContext {
150150
~WalAccessContext() {}
151151

152152

153-
bool shouldHandleDB(TRI_voc_tick_t dbid);
153+
bool shouldHandleDB(TRI_voc_tick_t dbid) const;
154154

155155
/// @brief Check if collection is in filter
156-
bool shouldHandleCollection(TRI_voc_tick_t dbid, TRI_voc_cid_t cid);
156+
bool shouldHandleCollection(TRI_voc_tick_t dbid, TRI_voc_cid_t cid) const;
157157

158158
/// @brief try to get collection, may return null
159159
TRI_vocbase_t* loadVocbase(TRI_voc_tick_t dbid);

0 commit comments

Comments
 (0)
0