@@ -777,6 +777,93 @@ class MyWALDumper final : public rocksdb::WriteBatch::Handler, public WalAccessC
777
777
#endif
778
778
};
779
779
780
+ // / @brief helper function to print WAL contents. this is only used for
781
+ // / debugging
782
+ #ifdef ARANGODB_ENABLE_MAINTAINER_MODE
783
+ void RocksDBWalAccess::printWal (Filter const & filter, size_t chunkSize,
784
+ MarkerCallback const & func) const {
785
+ rocksdb::TransactionDB* db = _engine.db ();
786
+
787
+ if (chunkSize < 16384 ) { // we need to have some sensible minimum
788
+ chunkSize = 16384 ;
789
+ }
790
+
791
+ // pre 3.4 breaking up write batches is not supported
792
+ size_t maxTrxChunkSize = filter.tickLastScanned > 0 ? chunkSize : SIZE_MAX;
793
+
794
+ MyWALDumper dumper (_engine, filter, func, maxTrxChunkSize);
795
+ const uint64_t since = dumper.safeBeginTick ();
796
+ TRI_ASSERT (since <= filter.tickStart );
797
+ TRI_ASSERT (since <= filter.tickEnd );
798
+
799
+ uint64_t firstTick = UINT64_MAX; // first tick to actually print (exclusive)
800
+ uint64_t lastScannedTick = since; // last (begin) tick of batch we looked at
801
+ uint64_t latestTick = db->GetLatestSequenceNumber ();
802
+
803
+ std::unique_ptr<rocksdb::TransactionLogIterator> iterator;
804
+ rocksdb::TransactionLogIterator::ReadOptions ro (false );
805
+ rocksdb::Status s = db->GetUpdatesSince (since, &iterator, ro);
806
+ TRI_ASSERT (s.ok ());
807
+
808
+ // we need to check if the builder is bigger than the chunksize,
809
+ // only after we printed a full WriteBatch. Otherwise a client might
810
+ // never read the full writebatch
811
+ LOG_TOPIC (" caefa" , WARN, Logger::ENGINES)
812
+ << " WAL tailing call. Scan since: " << since << " , tick start: " << filter.tickStart
813
+ << " , tick end: " << filter.tickEnd << " , chunk size: " << chunkSize << " , latesttick: " << latestTick;
814
+ while (iterator->Valid () && lastScannedTick <= filter.tickEnd ) {
815
+ rocksdb::BatchResult batch = iterator->GetBatch ();
816
+ // record the first tick we are actually considering
817
+ if (firstTick == UINT64_MAX) {
818
+ firstTick = batch.sequence ;
819
+ }
820
+
821
+ if (batch.sequence > filter.tickEnd ) {
822
+ break ; // cancel out
823
+ }
824
+
825
+ LOG_TOPIC (" a9d9c" , WARN, Logger::REPLICATION)
826
+ << " found batch-seq: " << batch.sequence << " , count: "
827
+ << batch.writeBatchPtr ->Count ()
828
+ << " , last scanned: " << lastScannedTick;
829
+ lastScannedTick = batch.sequence ; // start of the batch
830
+ TRI_ASSERT (lastScannedTick <= db->GetLatestSequenceNumber ());
831
+
832
+ if (batch.sequence < since) {
833
+ iterator->Next (); // skip
834
+ continue ;
835
+ }
836
+
837
+ dumper.startNewBatch (batch.sequence );
838
+ s = batch.writeBatchPtr ->Iterate (&dumper);
839
+ if (batch.writeBatchPtr ->Count () == 0 ) {
840
+ // there can be completely empty write batches. in case we encounter
841
+ // some, we cannot assume the tick gets increased next time
842
+ dumper.disableTickCheck ();
843
+ }
844
+ TRI_ASSERT (s.ok ());
845
+
846
+ uint64_t batchEndSeq = dumper.endBatch (); // end tick of the batch
847
+ TRI_ASSERT (batchEndSeq >= lastScannedTick);
848
+
849
+ if (dumper.responseSize () >= chunkSize) { // break if response gets big
850
+ LOG_TOPIC (" 205d3" , WARN, Logger::REPLICATION) << " reached maximum result size. finishing tailing" ;
851
+ break ;
852
+ }
853
+ // we need to set this here again, to avoid re-scanning WriteBatches
854
+ lastScannedTick = batchEndSeq; // do not remove, tailing take forever
855
+ TRI_ASSERT (lastScannedTick <= db->GetLatestSequenceNumber ());
856
+
857
+ iterator->Next ();
858
+ }
859
+
860
+ latestTick = db->GetLatestSequenceNumber ();
861
+
862
+ LOG_TOPIC (" 5b5a1" , WARN, Logger::REPLICATION)
863
+ << " lastScannedTick: " << lastScannedTick << " , latestTick: " << latestTick;
864
+ }
865
+ #endif
866
+
780
867
// iterates over WAL starting at 'from' and returns up to 'chunkSize' documents
781
868
// from the corresponding database
782
869
WalAccessResult RocksDBWalAccess::tail (Filter const & filter, size_t chunkSize,
@@ -805,19 +892,19 @@ WalAccessResult RocksDBWalAccess::tail(Filter const& filter, size_t chunkSize,
805
892
// prevent purging of WAL files while we are in here
806
893
RocksDBFilePurgePreventer purgePreventer (_engine.disallowPurging ());
807
894
808
- std::unique_ptr<rocksdb::TransactionLogIterator> iterator; // reader();
895
+ std::unique_ptr<rocksdb::TransactionLogIterator> iterator;
809
896
// no need verifying the WAL contents
810
897
rocksdb::TransactionLogIterator::ReadOptions ro (false );
811
898
rocksdb::Status s = db->GetUpdatesSince (since, &iterator, ro);
812
899
<
A3E2
span class="pl-k">if (!s.ok ()) {
813
900
Result r = convertStatus (s, rocksutils::StatusHint::wal);
814
- return WalAccessResult (r.errorNumber (), filter.tickStart == latestTick, 0 , 0 , latestTick);
901
+ return WalAccessResult (r.errorNumber (), filter.tickStart == latestTick, 0 , /* lastScannedTick */ 0 , latestTick);
815
902
}
816
903
817
904
// we need to check if the builder is bigger than the chunksize,
818
905
// only after we printed a full WriteBatch. Otherwise a client might
819
906
// never read the full writebatch
820
- LOG_TOPIC (" caefa " , DEBUG, Logger::ENGINES)
907
+ LOG_TOPIC (" caefb " , DEBUG, Logger::ENGINES)
821
908
<< " WAL tailing call. Scan since: " << since << " , tick start: " << filter.tickStart
822
909
<< " , tick end: " << filter.tickEnd << " , chunk size: " << chunkSize;
823
910
while (iterator->Valid () && lastScannedTick <= filter.tickEnd ) {
@@ -833,6 +920,14 @@ WalAccessResult RocksDBWalAccess::tail(Filter const& filter, size_t chunkSize,
833
920
834
921
// LOG_TOPIC("1eccb", TRACE, Logger::REPLICATION) << "found batch-seq: " << batch.sequence;
835
922
lastScannedTick = batch.sequence ; // start of the batch
923
+ #ifdef ARANGODB_ENABLE_MAINTAINER_MODE
924
+ if (lastScannedTick > db->GetLatestSequenceNumber ()) {
925
+ // this is an unexpected condition. in this case we print the WAL for
926
+ // debug purposes and error out
927
+ printWal (filter, chunkSize, func);
928
+ }
929
+ #endif
930
+ TRI_ASSERT (lastScannedTick <= db->GetLatestSequenceNumber ());
836
931
837
932
if (batch.sequence < since) {
838
933
// LOG_TOPIC("a5e56", TRACE, Logger::REPLICATION) << "skipping batch from " << batch.sequence << " to " << (batch.sequence + batch.writeBatchPtr->Count());
@@ -863,6 +958,14 @@ WalAccessResult RocksDBWalAccess::tail(Filter const& filter, size_t chunkSize,
863
958
}
864
959
// we need to set this here again, to avoid re-scanning WriteBatches
865
960
lastScannedTick = batchEndSeq; // do not remove, tailing take forever
961
+ #ifdef ARANGODB_ENABLE_MAINTAINER_MODE
962
+ if (lastScannedTick > db->GetLatestSequenceNumber ()) {
963
+ // this is an unexpected condition. in this case we print the WAL for
964
+ // debug purposes and error out
965
+ printWal (filter, chunkSize, func);
966
+ }
967
+ #endif
968
+ TRI_ASSERT (lastScannedTick <= db->GetLatestSequenceNumber ());
866
969
867
970
iterator->Next ();
868
971
}
@@ -871,6 +974,16 @@ WalAccessResult RocksDBWalAccess::tail(Filter const& filter, size_t chunkSize,
871
974
// while scanning the WAL
872
975
latestTick = db->GetLatestSequenceNumber ();
873
976
977
+ #ifdef ARANGODB_ENABLE_MAINTAINER_MODE
978
+ if (s.ok () && lastScannedTick > latestTick) {
979
+ // this is an unexpected condition. in this case we print the WAL for
980
+ // debug purposes and error out
981
+ printWal (filter, chunkSize, func);
982
+ }
983
+ #endif
984
+
985
+ TRI_ASSERT (!s.ok () || lastScannedTick <= latestTick);
986
+
874
987
WalAccessResult result (TRI_ERROR_NO_ERROR, firstTick <= filter.tickStart ,
875
988
lastWrittenTick, lastScannedTick, latestTick);
876
989
if (!s.ok ()) {
0 commit comments