Working to fix issue #2270 · githubcs/orientdb@aeff00c · GitHub
[go: up one dir, main page]

Skip to content

Commit aeff00c

Browse files
committed
Working to fix issue orientechnologies#2270
1 parent da7717a commit aeff00c

File tree

8 files changed

+194
-56
lines changed

8 files changed

+194
-56
lines changed

distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/OHazelcastDistributedDatabase.java

Lines changed: 117 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.orientechnologies.orient.core.db.ODatabaseRecordThreadLocal;
2525
import com.orientechnologies.orient.core.db.OScenarioThreadLocal;
2626
import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx;
27+
import com.orientechnologies.orient.core.record.ORecord;
2728
import com.orientechnologies.orient.server.config.OServerUserConfiguration;
2829
import com.orientechnologies.orient.server.distributed.ODistributedAbstractPlugin;
2930
import com.orientechnologies.orient.server.distributed.ODistributedConfiguration;
@@ -36,6 +37,13 @@
3637
import com.orientechnologies.orient.server.distributed.ODistributedServerLog.DIRECTION;
3738
import com.orientechnologies.orient.server.distributed.ODistributedServerManager;
3839
import com.orientechnologies.orient.server.distributed.task.OAbstractRemoteTask;
40+
import com.orientechnologies.orient.server.distributed.task.OCreateRecordTask;
41+
import com.orientechnologies.orient.server.distributed.task.ODeleteRecordTask;
42+
import com.orientechnologies.orient.server.distributed.task.OFixTxTask;
43+
import com.orientechnologies.orient.server.distributed.task.OResurrectRecordTask;
44+
import com.orientechnologies.orient.server.distributed.task.OSQLCommandTask;
45+
import com.orientechnologies.orient.server.distributed.task.OTxTask;
46+
import com.orientechnologies.orient.server.distributed.task.OUpdateRecordTask;
3947

4048
import java.io.Serializable;
4149
import java.util.Collection;
@@ -54,19 +62,19 @@
5462
*/
5563
public class OHazelcastDistributedDatabase implements ODistributedDatabase {
5664

57-
public static final String NODE_QUEUE_PREFIX = "orientdb.node.";
58-
public static final String NODE_QUEUE_UNDO_POSTFIX = ".undo";
59-
private static final String NODE_LOCK_PREFIX = "orientdb.reqlock.";
65+
public static final String NODE_QUEUE_PREFIX = "orientdb.node.";
66+
public static final String NODE_QUEUE_PENDING_POSTFIX = ".pending";
67+
private static final String NODE_LOCK_PREFIX = "orientdb.reqlock.";
6068
protected final OHazelcastPlugin manager;
6169
protected final OHazelcastDistributedMessageService msgService;
6270
protected final String databaseName;
6371
protected final Lock requestLock;
6472
protected volatile ODatabaseDocumentTx database;
65-
protected volatile boolean restoringMessages = false;
66-
protected AtomicBoolean status = new AtomicBoolean(false);
67-
protected Object waitForOnline = new Object();
73+
protected volatile boolean restoringMessages = false;
74+
protected AtomicBoolean status = new AtomicBoolean(false);
75+
protected Object waitForOnline = new Object();
6876
protected Thread listenerThread;
69-
protected AtomicLong waitForMessageId = new AtomicLong(-1);
77+
protected AtomicLong waitForMessageId = new AtomicLong(-1);
7078

7179
public OHazelcastDistributedDatabase(final OHazelcastPlugin manager, final OHazelcastDistributedMessageService msgService,
7280
final String iDatabaseName) {
@@ -200,7 +208,7 @@ public OHazelcastDistributedDatabase configureDatabase(final boolean iRestoreMes
200208
queueName);
201209

202210
// UNDO PREVIOUS MESSAGE IF ANY
203-
final IMap<Object, Object> undoMap = restoreMessagesBeforeFailure(iRestoreMessages);
211+
final IMap<String, Object> lastPendingMessagesMap = restoreMessagesBeforeFailure(iRestoreMessages);
204212

205213
restoringMessages = msgService.checkForPendingMessages(requestQueue, queueName, iUnqueuePendingMessages);
206214

@@ -211,6 +219,9 @@ public void run() {
211219
while (!Thread.interrupted()) {
212220
if (restoringMessages && requestQueue.isEmpty()) {
213221
// END OF RESTORING MESSAGES, SET IT ONLINE
222+
ODistributedServerLog.info(this, getLocalNodeName(), null, DIRECTION.NONE,
223+
"executed all pending tasks in queue, set restoringMessages=false and database '%s' as online...", databaseName);
224+
214225
restoringMessages = false;
215226
setOnline();
216227
}
@@ -220,16 +231,21 @@ public void run() {
220231
try {
221232
message = readRequest(requestQueue);
222233

223-
// SAVE THE MESSAGE IN THE UNDO MAP IN CASE OF FAILURE
224-
undoMap.put(databaseName, message);
234+
// USE THE HZ MAP ONLY IF THE COMMAND IS NOT IDEMPOTENT (I.E. ALL BUT READ-RECORD/SQL SELECT/SQL TRAVERSE)
235+
// The request is null-checked further down before being dispatched, so it may be null here:
// guard the dereference and never save a null request as pending.
final boolean saveAsPending = message != null && !message.getTask().isIdempotent();
236+
237+
if (saveAsPending)
238+
// SAVE THE MESSAGE INTO THE PENDING MAP IN CASE OF FAILURE
239+
lastPendingMessagesMap.put(databaseName, message);
225240

226241
if (message != null) {
227242
senderNode = message.getSenderNodeName();
228243
onMessage(message);
229244
}
230245

231-
// OK: REMOVE THE UNDO BUFFER
232-
undoMap.remove(databaseName);
246+
if (saveAsPending)
247+
// OK: REMOVE THE UNDO BUFFER
248+
lastPendingMessagesMap.remove(databaseName);
233249

234250
} catch (InterruptedException e) {
235251
// EXIT CURRENT THREAD
@@ -507,39 +523,59 @@ protected IQueue<ODistributedRequest>[] getRequestQueues(final String iDatabaseN
507523
/**
508524
* Composes the name of the pending-request map based on the local node name.
509525
*/
510-
protected String getUndoMapName(final String iDatabaseName) {
526+
protected String getPendingRequestMapName() {
511527
final StringBuilder buffer = new StringBuilder();
512528
buffer.append(NODE_QUEUE_PREFIX);
513529
buffer.append(manager.getLocalNodeName());
514-
if (iDatabaseName != null) {
515-
buffer.append('.');
516-
buffer.append(iDatabaseName);
517-
}
518-
buffer.append(NODE_QUEUE_UNDO_POSTFIX);
530+
buffer.append(NODE_QUEUE_PENDING_POSTFIX);
519531
return buffer.toString();
520532
}
521533

522534
protected String getLocalNodeName() {
523535
return manager.getLocalNodeName();
524536
}
525537

526-
protected IMap<Object, Object> restoreMessagesBeforeFailure(final boolean iRestoreMessages) {
527-
final IMap<Object, Object> undoMap = manager.getHazelcastInstance().getMap(getUndoMapName(databaseName));
528-
final ODistributedRequest undoRequest = (ODistributedRequest) undoMap.remove(databaseName);
529-
if (undoRequest != null && iRestoreMessages) {
530-
ODistributedServerLog.warn(this, getLocalNodeName(), null, DIRECTION.NONE,
531-
"restore last replication message before the crash for database %s: %s", databaseName, undoRequest);
538+
protected IMap<String, Object> restoreMessagesBeforeFailure(final boolean iRestoreMessages) {
539+
final IMap<String, Object> lastPendingRequestMap = manager.getHazelcastInstance().getMap(getPendingRequestMapName());
540+
if (iRestoreMessages) {
541+
// RESTORE LAST UNDO MESSAGE
542+
final ODistributedRequest lastPendingRequest = (ODistributedRequest) lastPendingRequestMap.remove(databaseName);
543+
if (lastPendingRequest != null) {
544+
// RESTORE LAST REQUEST
545+
ODistributedServerLog.warn(this, getLocalNodeName(), null, DIRECTION.NONE,
546+
"restore last replication message before the crash for database '%s': %s...", databaseName, lastPendingRequest);
547+
548+
try {
549+
initDatabaseInstance();
532550

533-
try {
534-
initDatabaseInstance();
535-
onMessage(undoRequest);
536-
} catch (Throwable t) {
537-
ODistributedServerLog.error(this, getLocalNodeName(), null, DIRECTION.NONE,
538-
"error on executing restored message for database %s", t, databaseName);
539-
}
551+
final boolean executeLastPendingRequest = checkIfOperationHasBeenExecuted(lastPendingRequest,
552+
lastPendingRequest.getTask());
540553

554+
if (executeLastPendingRequest)
555+
onMessage(lastPendingRequest);
556+
557+
} catch (Throwable t) {
558+
ODistributedServerLog.error(this, getLocalNodeName(), null, DIRECTION.NONE,
559+
"error on executing restored message for database %s", t, databaseName);
560+
}
561+
}
541562
}
542-
return undoMap;
563+
564+
return lastPendingRequestMap;
565+
}
566+
567+
/**
568+
* Reports that hot alignment is not possible: formats the message, logs it as a warning and then always throws.
569+
* The text is built with String.format(iMessage, iParams); it is logged with a "- " prefix and used as the exception message.
570+
*
* @param iLastPendingRequest
*          pending request whose sender node name is reported in the log entry
* @param iMessage
*          format string of the error message
* @param iParams
*          arguments for the format string
571+
* @throws OHotAlignmentNotPossibleExeption
*           always, carrying the formatted message
572+
*/
573+
protected void hotAlignmentError(final ODistributedRequest iLastPendingRequest, final String iMessage, final Object... iParams)
574+
throws OHotAlignmentNotPossibleExeption {
575+
final String msg = String.format(iMessage, iParams);
576+
577+
ODistributedServerLog.warn(this, getLocalNodeName(), iLastPendingRequest.getSenderNodeName(), DIRECTION.IN, "- " + msg);
578+
throw new OHotAlignmentNotPossibleExeption(msg);
543579
}
544580

545581
protected void checkLocalNodeInConfiguration() {
@@ -578,4 +614,53 @@ protected void removeNodeInConfiguration(final String iNode, final boolean iForc
578614
"unable to remove node '%s' in distributed configuration, db=%s", e, iNode, databaseName);
579615
}
580616
}
617+
618+
/**
* Decides whether the last pending request must be executed again. Despite the method name, the returned value is
* true when the task should be (re-)executed, i.e. when the checks below suggest it has NOT been applied yet.
* <ul>
* <li>delete: execute only if the record can still be loaded</li>
* <li>update: execute only if the record exists and its version differs from the task version; a missing record is
* only logged as a warning</li>
* <li>create: execute only if the record cannot be loaded</li>
* <li>non-idempotent SQL command, resurrect of a missing record, or any other task type: hotAlignmentError() is
* called, which throws</li>
* <li>tx / fix-tx: each inner task is checked recursively; the first one that must be executed makes the whole
* transaction repeat</li>
* </ul>
*
* @param lastPendingRequest
*          request being restored; used for logging and error reporting
* @param task
*          task to inspect (the request's task, or an inner task of a transaction)
* @return true if the request has to be executed, false otherwise
* @throws OHotAlignmentNotPossibleExeption
*           raised through hotAlignmentError() when the state of the operation cannot be determined
*/
protected boolean checkIfOperationHasBeenExecuted(final ODistributedRequest lastPendingRequest, final OAbstractRemoteTask task) {
619+
// DEFAULT: DO NOT RE-EXECUTE
boolean executeLastPendingRequest = false;
620+
621+
// ASK FOR RECORD
622+
if (task instanceof ODeleteRecordTask) {
623+
// EXECUTE ONLY IF THE RECORD HASN'T BEEN DELETED YET
624+
executeLastPendingRequest = ((ODeleteRecordTask) task).getRid().getRecord() != null;
625+
} else if (task instanceof OUpdateRecordTask) {
626+
final ORecord<?> rec = ((OUpdateRecordTask) task).getRid().getRecord();
627+
if (rec == null)
628+
// RECORD NOT FOUND: ONLY LOG A WARNING, THE UPDATE IS NOT RE-EXECUTED
ODistributedServerLog.warn(this, getLocalNodeName(), lastPendingRequest.getSenderNodeName(), DIRECTION.IN,
629+
"- cannot update deleted record %s, database could be not aligned", ((OUpdateRecordTask) task).getRid());
630+
else
631+
// EXECUTE ONLY IF VERSIONS DIFFER
632+
executeLastPendingRequest = !rec.getRecordVersion().equals(((OUpdateRecordTask) task).getVersion());
633+
} else if (task instanceof OCreateRecordTask) {
634+
// EXECUTE ONLY IF THE RECORD HASN'T BEEN CREATED YET
635+
executeLastPendingRequest = ((OCreateRecordTask) task).getRid().getRecord() == null;
636+
} else if (task instanceof OSQLCommandTask) {
637+
// IDEMPOTENT COMMANDS ARE SIMPLY NOT RE-EXECUTED; FOR THE OTHERS THE OUTCOME CANNOT BE VERIFIED
if (!task.isIdempotent()) {
638+
hotAlignmentError(lastPendingRequest, "Not able to assure last command has been completed before last crash. Command='%s'",
639+
((OSQLCommandTask) task).getPayload());
640+
}
641+
} else if (task instanceof OResurrectRecordTask) {
642+
if (((OResurrectRecordTask) task).getRid().getRecord() == null)
643+
// ALREADY DELETED: CANNOT RESTORE IT
644+
hotAlignmentError(lastPendingRequest, "Not able to resurrect deleted record '%s'", ((OResurrectRecordTask) task).getRid());
645+
} else if (task instanceof OTxTask) {
646+
// CHECK EACH TX ITEM IF HAS BEEN COMMITTED
647+
for (OAbstractRemoteTask t : ((OTxTask) task).getTasks()) {
648+
executeLastPendingRequest = checkIfOperationHasBeenExecuted(lastPendingRequest, t);
649+
if (executeLastPendingRequest)
650+
// REPEAT THE ENTIRE TX
651+
return true;
652+
}
653+
} else if (task instanceof OFixTxTask) {
654+
// CHECK EACH FIX-TX ITEM IF HAS BEEN COMMITTED
655+
for (OAbstractRemoteTask t : ((OFixTxTask) task).getTasks()) {
656+
executeLastPendingRequest = checkIfOperationHasBeenExecuted(lastPendingRequest, t);
657+
if (executeLastPendingRequest)
658+
// REPEAT THE ENTIRE TX
659+
return true;
660+
}
661+
} else
662+
// UNKNOWN TASK TYPE: NO WAY TO VERIFY IT
hotAlignmentError(lastPendingRequest, "Not able to assure last operation has been completed before last crash. Task='%s'",
663+
task);
664+
return executeLastPendingRequest;
665+
}
581666
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
*
3+
* * Copyright 2010-2014 Orient Technologies LTD (info(at)orientechnologies.com)
4+
* *
5+
* * Licensed under the Apache License, Version 2.0 (the "License");
6+
* * you may not use this file except in compliance with the License.
7+
* * You may obtain a copy of the License at
8+
* *
9+
* * http://www.apache.org/licenses/LICENSE-2.0
10+
* *
11+
* * Unless required by applicable law or agreed to in writing, software
12+
* * distributed under the License is distributed on an "AS IS" BASIS,
13+
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* * See the License for the specific language governing permissions and
15+
* * limitations under the License.
16+
*
17+
*/
18+
19+
package com.orientechnologies.orient.server.hazelcast;
20+
21+
import com.orientechnologies.orient.server.distributed.ODistributedException;
22+
23+
/**
24+
* Exception signalling that hot alignment of a distributed database is not possible.
* NOTE(review): the original note states that with autoDeploy:true the entire database is restored instead; that
* recovery is not implemented in this class - confirm against the code that catches this exception.
25+
*
26+
* @author Luca Garulli (l.garulli--at--orientechnologies.com)
27+
*/
28+
public class OHotAlignmentNotPossibleExeption extends ODistributedException {
29+
/**
* Creates the exception with the given detail message, passed as is to the superclass constructor.
*
* @param s
*          detail message
*/
public OHotAlignmentNotPossibleExeption(String s) {
30+
super(s);
31+
}
32+
}

distributed/src/test/java/com/orientechnologies/orient/server/distributed/TestSharding.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,14 @@
66
import com.tinkerpop.blueprints.impls.orient.OrientVertex;
77
import com.tinkerpop.blueprints.impls.orient.OrientVertexType;
88
import junit.framework.Assert;
9-
import org.junit.Ignore;
109
import org.junit.Test;
1110

1211
public class TestSharding extends AbstractServerClusterTest {
1312

1413
protected OrientVertex[] vertices;
1514

1615
@Test
17-
@Ignore
16+
//@Ignore
1817
public void test() throws Exception {
1918
init(3);
2019
prepare(false);

server/src/main/java/com/orientechnologies/orient/server/distributed/ODistributedStorage.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -574,6 +574,10 @@ public void commit(final OTransaction iTx, final Runnable callback) {
574574
final OStorageOperationResult<ORawBuffer> previousContent = wrapped.readRecord(rid, null, false, null, false,
575575
LOCKING_STRATEGY.DEFAULT);
576576

577+
if (previousContent.getResult() == null)
578+
// DELETED
579+
throw new OTransactionException("Cannot update record '" + rid + "' because has been deleted");
580+
577581
task = new OUpdateRecordTask(rid, previousContent.getResult().getBuffer(), previousContent.getResult().version,
578582
record.toStream(), record.getRecordVersion());
579583
break;

server/src/main/java/com/orientechnologies/orient/server/distributed/task/OAbstractRemoteTask.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,23 @@
1515
*/
1616
package com.orientechnologies.orient.server.distributed.task;
1717

18+
import java.io.Externalizable;
19+
1820
import com.orientechnologies.orient.core.config.OGlobalConfiguration;
1921
import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx;
2022
import com.orientechnologies.orient.server.OServer;
2123
import com.orientechnologies.orient.server.distributed.ODistributedServerManager;
2224

23-
import java.io.Externalizable;
24-
2525
/**
2626
* Base class for Tasks to be executed remotely.
2727
*
2828
* @author Luca Garulli (l.garulli--at--orientechnologies.com)
2929
*
3030
*/
3131
public abstract class OAbstractRemoteTask implements Externalizable {
32-
private static final long serialVersionUID = 1L;
32+
private static final long serialVersionUID = 1L;
33+
protected transient boolean inheritedDatabase;
34+
protected transient String nodeSource;
3335

3436
public enum RESULT_STRATEGY {
3537
ANY, UNION
@@ -39,9 +41,6 @@ public enum QUORUM_TYPE {
3941
NONE, READ, WRITE, ALL
4042
}
4143

42-
protected transient boolean inheritedDatabase;
43-
protected transient String nodeSource;
44-
4544
/**
4645
* Constructor used from unmarshalling.
4746
*/
@@ -94,4 +93,8 @@ public boolean isRequireNodeOnline() {
9493
public boolean isRequiredOpenDatabase() {
9594
return true;
9695
}
96+
97+
/**
* Tells whether this task is idempotent. This base implementation always returns false.
* NOTE(review): presumably read-only tasks override this to return true - confirm in the subclasses.
*
* @return false in this base class
*/
public boolean isIdempotent() {
98+
return false;
99+
}
97100
}

server/src/main/java/com/orientechnologies/orient/server/distributed/task/OFixTxTask.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,19 @@
1515
*/
1616
package com.orientechnologies.orient.server.distributed.task;
1717

18+
import java.io.IOException;
19+
import java.io.ObjectInput;
20+
import java.io.ObjectOutput;
21+
import java.util.ArrayList;
22+
import java.util.List;
23+
1824
import com.orientechnologies.orient.core.db.ODatabaseRecordThreadLocal;
1925
import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx;
2026
import com.orientechnologies.orient.server.OServer;
2127
import com.orientechnologies.orient.server.distributed.ODistributedServerLog;
2228
import com.orientechnologies.orient.server.distributed.ODistributedServerLog.DIRECTION;
2329
import com.orientechnologies.orient.server.distributed.ODistributedServerManager;
2430

25-
import java.io.IOException;
26-
import java.io.ObjectInput;
27-
import java.io.ObjectOutput;
28-
import java.util.ArrayList;
29-
import java.util.List;
30-
3131
/**
3232
* Distributed create record task used for synchronization.
3333
*
@@ -36,12 +36,15 @@
3636
*/
3737
public class OFixTxTask extends OAbstractRemoteTask {
3838
private static final long serialVersionUID = 1L;
39-
4039
private List<OAbstractRemoteTask> tasks = new ArrayList<OAbstractRemoteTask>();
4140

4241
public OFixTxTask() {
4342
}
4443

44+
/**
* Returns the tasks collected by this fix-tx task. The internal list is returned directly, not a copy, so changes
* made by the caller are visible to this task.
*
* @return the list of tasks registered through add()
*/
public List<OAbstractRemoteTask> getTasks() {
45+
return tasks;
46+
}
47+
4548
public void add(final OAbstractRemoteTask iTask) {
4649
tasks.add(iTask);
4750
}

0 commit comments

Comments
 (0)
0