24
24
import com .orientechnologies .orient .core .db .ODatabaseRecordThreadLocal ;
25
25
import com .orientechnologies .orient .core .db .OScenarioThreadLocal ;
26
26
import com .orientechnologies .orient .core .db .document .ODatabaseDocumentTx ;
27
+ import com .orientechnologies .orient .core .record .ORecord ;
27
28
import com .orientechnologies .orient .server .config .OServerUserConfiguration ;
28
29
import com .orientechnologies .orient .server .distributed .ODistributedAbstractPlugin ;
29
30
import com .orientechnologies .orient .server .distributed .ODistributedConfiguration ;
36
37
import com .orientechnologies .orient .server .distributed .ODistributedServerLog .DIRECTION ;
37
38
import com .orientechnologies .orient .server .distributed .ODistributedServerManager ;
38
39
import com .orientechnologies .orient .server .distributed .task .OAbstractRemoteTask ;
40
+ import com .orientechnologies .orient .server .distributed .task .OCreateRecordTask ;
41
+ import com .orientechnologies .orient .server .distributed .task .ODeleteRecordTask ;
42
+ import com .orientechnologies .orient .server .distributed .task .OFixTxTask ;
43
+ import com .orientechnologies .orient .server .distributed .task .OResurrectRecordTask ;
44
+ import com .orientechnologies .orient .server .distributed .task .OSQLCommandTask ;
45
+ import com .orientechnologies .orient .server .distributed .task .OTxTask ;
46
+ import com .orientechnologies .orient .server .distributed .task .OUpdateRecordTask ;
39
47
40
48
import java .io .Serializable ;
41
49
import java .util .Collection ;
54
62
*/
55
63
public class OHazelcastDistributedDatabase implements ODistributedDatabase {
56
64
57
- public static final String NODE_QUEUE_PREFIX = "orientdb.node." ;
58
- public static final String NODE_QUEUE_UNDO_POSTFIX = ".undo " ;
59
- private static final String NODE_LOCK_PREFIX = "orientdb.reqlock." ;
65
+ public static final String NODE_QUEUE_PREFIX = "orientdb.node." ;
66
+ public static final String NODE_QUEUE_PENDING_POSTFIX = ".pending " ;
67
+ private static final String NODE_LOCK_PREFIX = "orientdb.reqlock." ;
60
68
protected final OHazelcastPlugin manager ;
61
69
protected final OHazelcastDistributedMessageService msgService ;
62
70
protected final String databaseName ;
63
71
protected final Lock requestLock ;
64
72
protected volatile ODatabaseDocumentTx database ;
65
- protected volatile boolean restoringMessages = false ;
66
- protected AtomicBoolean status = new AtomicBoolean (false );
67
- protected Object waitForOnline = new Object ();
73
+ protected volatile boolean restoringMessages = false ;
74
+ protected AtomicBoolean status = new AtomicBoolean (false );
75
+ protected Object waitForOnline = new Object ();
68
76
protected Thread listenerThread ;
69
- protected AtomicLong waitForMessageId = new AtomicLong (-1 );
77
+ protected AtomicLong waitForMessageId = new AtomicLong (-1 );
70
78
71
79
public OHazelcastDistributedDatabase (final OHazelcastPlugin manager , final OHazelcastDistributedMessageService msgService ,
72
80
final String iDatabaseName ) {
@@ -200,7 +208,7 @@ public OHazelcastDistributedDatabase configureDatabase(final boolean iRestoreMes
200
208
queueName );
201
209
202
210
// UNDO PREVIOUS MESSAGE IF ANY
203
- final IMap <Object , Object > undoMap = restoreMessagesBeforeFailure (iRestoreMessages );
211
+ final IMap <String , Object > lastPendingMessagesMap = restoreMessagesBeforeFailure (iRestoreMessages );
204
212
205
213
restoringMessages = msgService .checkForPendingMessages (requestQueue , queueName , iUnqueuePendingMessages );
206
214
@@ -211,6 +219,9 @@ public void run() {
211
219
while (!Thread .interrupted ()) {
212
220
if (restoringMessages && requestQueue .isEmpty ()) {
213
221
// END OF RESTORING MESSAGES, SET IT ONLINE
222
+ ODistributedServerLog .info (this , getLocalNodeName (), null , DIRECTION .NONE ,
223
+ "executed all pending tasks in queue, set restoringMessages=false and database '%s' as online..." , databaseName );
224
+
214
225
restoringMessages = false ;
215
226
setOnline ();
216
227
}
@@ -220,16 +231,21 @@ public void run() {
220
231
try {
221
232
message = readRequest (requestQueue );
222
233
223
- // SAVE THE MESSAGE IN THE UNDO MAP IN CASE OF FAILURE
224
- undoMap .put (databaseName , message );
234
+ // DECIDE TO USE THE HZ MAP ONLY IF THE COMMAND IS NOT IDEMPOTENT (ALL BUT READ-RECORD/SQL SELECT/SQL TRAVERSE
235
+ final boolean saveAsPending = !message .getTask ().isIdempotent ();
236
+
237
+ if (saveAsPending )
238
+ // SAVE THE MESSAGE IN TO THE UNDO MAP IN CASE OF FAILURE
239
+ lastPendingMessagesMap .put (databaseName , message );
225
240
226
241
if (message != null ) {
227
242
senderNode = message .getSenderNodeName ();
228
243
onMessage (message );
229
244
}
230
245
231
- // OK: REMOVE THE UNDO BUFFER
232
- undoMap .remove (databaseName );
246
+ if (saveAsPending )
247
+ // OK: REMOVE THE UNDO BUFFER
248
+ lastPendingMessagesMap .remove (databaseName );
233
249
234
250
} catch (InterruptedException e ) {
235
251
// EXIT CURRENT THREAD
@@ -507,39 +523,59 @@ protected IQueue<ODistributedRequest>[] getRequestQueues(final String iDatabaseN
507
523
/**
508
524
* Composes the undo queue name based on node name.
509
525
*/
510
- protected String getUndoMapName ( final String iDatabaseName ) {
526
+ protected String getPendingRequestMapName ( ) {
511
527
final StringBuilder buffer = new StringBuilder ();
512
528
buffer .append (NODE_QUEUE_PREFIX );
513
529
buffer .append (manager .getLocalNodeName ());
514
- if (iDatabaseName != null ) {
515
- buffer .append ('.' );
516
- buffer .append (iDatabaseName );
517
- }
518
- buffer .append (NODE_QUEUE_UNDO_POSTFIX );
530
+ buffer .append (NODE_QUEUE_PENDING_POSTFIX );
519
531
return buffer .toString ();
520
532
}
521
533
522
534
protected String getLocalNodeName () {
523
535
return manager .getLocalNodeName ();
524
536
}
525
537
526
- protected IMap <Object , Object > restoreMessagesBeforeFailure (final boolean iRestoreMessages ) {
527
- final IMap <Object , Object > undoMap = manager .getHazelcastInstance ().getMap (getUndoMapName (databaseName ));
528
- final ODistributedRequest undoRequest = (ODistributedRequest ) undoMap .remove (databaseName );
529
- if (undoRequest != null && iRestoreMessages ) {
530
- ODistributedServerLog .warn (this , getLocalNodeName (), null , DIRECTION .NONE ,
531
- "restore last replication message before the crash for database %s: %s" , databaseName , undoRequest );
538
+ protected IMap <String , Object > restoreMessagesBeforeFailure (final boolean iRestoreMessages ) {
539
+ final IMap <String , Object > lastPendingRequestMap = manager .getHazelcastInstance ().getMap (getPendingRequestMapName ());
540
+ if (iRestoreMessages ) {
541
+ // RESTORE LAST UNDO MESSAGE
542
+ final ODistributedRequest lastPendingRequest = (ODistributedRequest ) lastPendingRequestMap .remove (databaseName );
543
+ if (lastPendingRequest != null ) {
544
+ // RESTORE LAST REQUEST
545
+ ODistributedServerLog .warn (this , getLocalNodeName (), null , DIRECTION .NONE ,
546
+ "restore last replication message before the crash for database '%s': %s..." , databaseName , lastPendingRequest );
547
+
548
+ try {
549
+ initDatabaseInstance ();
532
550
533
- try {
534
- initDatabaseInstance ();
535
- onMessage (undoRequest );
536
- } catch (Throwable t ) {
537
- ODistributedServerLog .error (this , getLocalNodeName (), null , DIRECTION .NONE ,
538
- "error on executing restored message for database %s" , t , databaseName );
539
- }
551
+ final boolean executeLastPendingRequest = checkIfOperationHasBeenExecuted (lastPendingRequest ,
552
+ lastPendingRequest .getTask ());
540
553
554
+ if (executeLastPendingRequest )
555
+ onMessage (lastPendingRequest );
556
+
557
+ } catch (Throwable t ) {
558
+ ODistributedServerLog .error (this , getLocalNodeName (), null , DIRECTION .NONE ,
559
+ "error on executing restored message for database %s" , t , databaseName );
560
+ }
561
+ }
541
562
}
542
- return undoMap ;
563
+
564
+ return lastPendingRequestMap ;
565
+ }
566
+
567
+ /**
568
+ * Checks if last pending operation must be re-executed or not. In some circustamces the exception
569
+ * OHotAlignmentNotPossibleExeption is raised because it's not possible to recover the database state.
570
+ *
571
+ * @throws OHotAlignmentNotPossibleExeption
572
+ */
573
+ protected void hotAlignmentError (final ODistributedRequest iLastPendingRequest , final String iMessage , final Object ... iParams )
574
+ throws OHotAlignmentNotPossibleExeption {
575
+ final String msg = String .format (iMessage , iParams );
576
+
577
+ ODistributedServerLog .warn (this , getLocalNodeName (), iLastPendingRequest .getSenderNodeName (), DIRECTION .IN , "- " + msg );
578
+ throw new OHotAlignmentNotPossibleExeption (msg );
543
579
}
544
580
545
581
protected void checkLocalNodeInConfiguration () {
@@ -578,4 +614,53 @@ protected void removeNodeInConfiguration(final String iNode, final boolean iForc
578
614
"unable to remove node '%s' in distributed configuration, db=%s" , e , iNode , databaseName );
579
615
}
580
616
}
617
+
618
+ protected boolean checkIfOperationHasBeenExecuted (final ODistributedRequest lastPendingRequest , final OAbstractRemoteTask task ) {
619
+ boolean executeLastPendingRequest = false ;
620
+
621
+ // ASK FOR RECORD
622
+ if (task instanceof ODeleteRecordTask ) {
623
+ // EXECUTE ONLY IF THE RECORD HASN'T BEEN DELETED YET
624
+ executeLastPendingRequest = ((ODeleteRecordTask ) task ).getRid ().getRecord () != null ;
625
+ } else if (task instanceof OUpdateRecordTask ) {
626
+ final ORecord <?> rec = ((OUpdateRecordTask ) task ).getRid ().getRecord ();
627
+ if (rec == null )
628
+ ODistributedServerLog .warn (this , getLocalNodeName (), lastPendingRequest .getSenderNodeName (), DIRECTION .IN ,
629
+ "- cannot update deleted record %s, database could be not aligned" , ((OUpdateRecordTask ) task ).getRid ());
630
+ else
631
+ // EXECUTE ONLY IF VERSIONS DIFFER
632
+ executeLastPendingRequest = !rec .getRecordVersion ().equals (((OUpdateRecordTask ) task ).getVersion ());
633
+ } else if (task instanceof OCreateRecordTask ) {
634
+ // EXECUTE ONLY IF THE RECORD HASN'T BEEN CREATED YET
635
+ executeLastPendingRequest = ((OCreateRecordTask ) task ).getRid ().getRecord () == null ;
636
+ } else if (task instanceof OSQLCommandTask ) {
637
+ if (!task .isIdempotent ()) {
638
+ hotAlignmentError (lastPendingRequest , "Not able to assure last command has been completed before last crash. Command='%s'" ,
639
+ ((OSQLCommandTask ) task ).getPayload ());
640
+ }
641
+ } else if (task instanceof OResurrectRecordTask ) {
642
+ if (((OResurrectRecordTask ) task ).getRid ().getRecord () == null )
643
+ // ALREADY DELETED: CANNOT RESTORE IT
644
+ hotAlignmentError (lastPendingRequest , "Not able to resurrect deleted record '%s'" , ((OResurrectRecordTask ) task ).getRid ());
645
+ } else if (task instanceof OTxTask ) {
646
+ // CHECK EACH TX ITEM IF HAS BEEN COMMITTED
647
+ for (OAbstractRemoteTask t : ((OTxTask ) task ).getTasks ()) {
648
+ executeLastPendingRequest = checkIfOperationHasBeenExecuted (lastPendingRequest , t );
649
+ if (executeLastPendingRequest )
650
+ // REPEAT THE ENTIRE TX
651
+ return true ;
652
+ }
653
+ } else if (task instanceof OFixTxTask ) {
654
+ // CHECK EACH FIX-TX ITEM IF HAS BEEN COMMITTED
655
+ for (OAbstractRemoteTask t : ((OFixTxTask ) task ).getTasks ()) {
656
+ executeLastPendingRequest = checkIfOperationHasBeenExecuted (lastPendingRequest , t );
657
+ if (executeLastPendingRequest )
658
+ // REPEAT THE ENTIRE TX
659
+ return true ;
660
+ }
661
+ } else
662
+ hotAlignmentError (lastPendingRequest , "Not able to assure last operation has been completed before last crash. Task='%s'" ,
663
+ task );
664
+ return executeLastPendingRequest ;
665
+ }
581
666
}
0 commit comments