@@ -170,9 +170,8 @@ arangodb::Result fetchRevisions(arangodb::transaction::Methods& trx,
170
170
" &batchId=" + std::to_string (config.batch .id );
171
171
auto headers = arangodb::replutils::createHeaders ();
172
172
173
- std::string msg = " fetching documents by revision for collection '" +
174
- collection.name () + " ' from " + url;
175
- config.progress .set (msg);
173
+ config.progress .set (" fetching documents by revision for collection '" +
174
+ collection.name () + " ' from " + url);
176
175
177
176
auto removeConflict = [&](auto const & conflictingKey) -> Result {
178
177
keyBuilder->clear ();
@@ -187,10 +186,18 @@ arangodb::Result fetchRevisions(arangodb::transaction::Methods& trx,
187
186
return res;
188
187
};
189
188
189
+ std::size_t numUniqueIndexes = [&]() {
190
+ std::size_t numUnique = 0 ;
191
+ for (auto const & idx : collection.getIndexes ()) {
192
+ numUnique += idx->unique () ? 1 : 0 ;
193
+ }
194
+ return numUnique;
195
+ }();
196
+
190
197
std::size_t current = 0 ;
191
198
auto guard = arangodb::scopeGuard (
192
199
[&current, &stats]() -> void { stats.numDocsRequested += current; });
193
- char ridBuffer[11 ];
200
+ char ridBuffer[arangodb::basics::maxUInt64StringSize ];
194
201
std::unique_ptr<arangodb::httpclient::SimpleHttpResult> response;
195
202
while (current < toFetch.size ()) {
196
203
arangodb::transaction::BuilderLeaser requestBuilder (&trx);
@@ -230,6 +237,8 @@ arangodb::Result fetchRevisions(arangodb::transaction::Methods& trx,
230
237
config.leader .endpoint , url,
231
238
" : response is not an array" ));
232
239
}
240
+
241
+ config.progress .set (" applying documents by revision for collection '" + collection.name () + " '" );
233
242
234
243
for (VPackSlice leaderDoc : VPackArrayIterator (docs)) {
235
244
if (!leaderDoc.isObject ()) {
@@ -254,41 +263,50 @@ arangodb::Result fetchRevisions(arangodb::transaction::Methods& trx,
254
263
" : document revision is invalid" );
255
264
}
256
265
257
- TRI_ASSERT (options.indexOperationMode == arangodb::IndexOperationMode::internal);
258
-
259
- Result res = physical->insert (&trx, leaderDoc, mdr, options);
260
-
261
- if (res.fail ()) {
262
- if (res.is (TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED) &&
263
- res.errorMessage () > keySlice.stringView ()) {
264
- arangodb::RevisionId rid = arangodb::RevisionId::fromSlice (leaderDoc);
265
- if (physical->readDocument (&trx, arangodb::LocalDocumentId (rid.id ()), mdr)) {
266
- // already have exactly this revision no need to insert
267
- continue ;
268
- }
269
- // remove conflict and retry
270
- // errorMessage() is this case contains the conflicting key
271
- auto inner = removeConflict (res.errorMessage ());
272
- if (inner.fail ()) {
273
- return res;
274
- }
266
+ options.indexOperationMode = arangodb::IndexOperationMode::internal;
267
+
268
+ // we need a retry loop here for unique indexes (we will always have at least
269
+ // one unique index, which is the primary index, but there can be more). as
270
+ // documents can be presented in any state on the follower, simply inserting
271
+ // them in leader order may trigger a unique constraint violation on the follower.
272
+ // in this case we may need to remove the conflicting document. this can
273
+ // happen multiple times if there are multiple unique indexes! we can only
274
+ // stop trying once we have tried often enough, or if inserting succeeds.
275
+ std::size_t tries = 1 + numUniqueIndexes;
276
+ while (tries-- > 0 ) {
277
+ if (tries == 0 ) {
275
278
options.indexOperationMode = arangodb::IndexOperationMode::normal;
276
- res = physical->insert (&trx, leaderDoc, mdr, options);
279
+ }
280
+
281
+ Result res = physical->insert (&trx, leaderDoc, mdr, options);
277
282
278
- options.indexOperationMode = arangodb::IndexOperationMode::internal;
279
- if (res.fail ()) {
280
- return res;
281
- }
282
- // fall-through
283
- } else {
283
+ options.indexOperationMode = arangodb::IndexOperationMode::internal;
284
+
285
+ if (res.ok ()) {
286
+ ++stats.numDocsInserted ;
287
+ break ;
288
+ }
289
+
290
+ if (!res.is (TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED)) {
284
291
auto errorNumber = res.errorNumber ();
285
292
res.reset (errorNumber, concatT (TRI_errno_string (errorNumber), " : " ,
286
293
res.errorMessage ()));
287
294
return res;
288
295
}
289
- }
290
296
291
- ++stats.numDocsInserted ;
297
+ arangodb::RevisionId rid = arangodb::RevisionId::fromSlice (leaderDoc);
298
+ if (physical->readDocument (&trx, arangodb::LocalDocumentId (rid.id ()), mdr)) {
299
+ // already have exactly this revision, no need to insert
300
+ break ;
301
+ }
302
+
303
+ // remove conflict and retry
304
+ // errorMessage() in this case contains the conflicting key
305
+ auto inner = removeConflict (res.errorMessage ());
306
+ if (inner.fail ()) {
307
+ return res;
308
+ }
309
+ }
292
310
}
293
311
current += docs.length ();
294
312
}
@@ -1145,8 +1163,7 @@ Result DatabaseInitialSyncer::fetchCollectionSyncByKeys(arangodb::LogicalCollect
1145
1163
1146
1164
if (!res.ok ()) {
1147
1165
return Result (res.errorNumber (),
1148
- concatT (" unable to start transaction (" , __FILE__, " :" ,
1149
- __LINE__, " ): " , res.errorMessage ()));
1166
+ concatT (" unable to start transaction: " , res.errorMessage ()));
1150
1167
}
1151
1168
1152
1169
OperationOptions options;
@@ -1260,8 +1277,7 @@ Result DatabaseInitialSyncer::fetchCollectionSyncByRevisions(arangodb::LogicalCo
1260
1277
1261
1278
if (!res.ok ()) {
1262
1279
return Result (res.errorNumber (),
1263
- concatT (" unable to start transaction (" , __FILE__, " :" ,
1264
- __LINE__, " ): " , res.errorMessage ()));
1280
+ concatT (" unable to start transaction: " , res.errorMessage ()));
1265
1281
}
1266
1282
1267
1283
OperationOptions options;
@@ -1315,7 +1331,10 @@ Result DatabaseInitialSyncer::fetchCollectionSyncByRevisions(arangodb::LogicalCo
1315
1331
}
1316
1332
return Result (ex.code ());
1317
1333
}
1318
- trx->addHint (Hints::Hint::NO_INDEXING);
1334
+
1335
+ // we must be able to read our own writes here - otherwise the end result
1336
+ // can be wrong. do not enable NO_INDEXING here!
1337
+
1319
1338
// turn on intermediate commits as the number of keys to delete can be huge
1320
1339
// here
1321
1340
trx->addHint (Hints::Hint::INTERMEDIATE_COMMITS);
@@ -1355,7 +1374,7 @@ Result DatabaseInitialSyncer::fetchCollectionSyncByRevisions(arangodb::LogicalCo
1355
1374
{
1356
1375
VPackBuilder requestBuilder;
1357
1376
{
1358
- char ridBuffer[11 ];
1377
+ char ridBuffer[arangodb::basics::maxUInt64StringSize ];
1359
1378
VPackArrayBuilder list (&requestBuilder);
1360
1379
for (auto & pair : ranges) {
1361
1380
VPackArrayBuilder range (&requestBuilder);
0 commit comments