8000 Fixed issue #2369 · githubcs/orientdb@ec7778d · GitHub
[go: up one dir, main page]

Skip to content

Commit ec7778d

Browse files
committed
1 parent e40fac4 commit ec7778d

File tree

5 files changed

+99
-135
lines changed

5 files changed

+99
-135
lines changed

commons/src/main/java/com/orientechnologies/common/concur/lock/OAdaptiveLock.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,4 +127,8 @@ public boolean isConcurrent() {
127127
public ReentrantLock getUnderlying() {
128128
return lock;
129129
}
130+
131+
public boolean isHeldByCurrentThread() {
132+
return lock.isHeldByCurrentThread();
133+
}
130134
}

enterprise/src/main/java/com/orientechnologies/orient/enterprise/channel/OChannel.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,6 @@
1515
*/
1616
package com.orientechnologies.orient.enterprise.channel;
1717

18-
import java.io.IOException;
19-
import java.io.InputStream;
20-
import java.io.OutputStream;
21-
import java.net.Socket;
22-
import java.util.concurrent.atomic.AtomicLong;
23-
2418
import com.orientechnologies.common.concur.lock.OAdaptiveLock;
2519
import com.orientechnologies.common.listener.OListenerManger;
2620
import com.orientechnologies.common.profiler.OAbstractProfiler.OProfilerHookValue;
@@ -31,13 +25,19 @@
3125
import com.orientechnologies.orient.core.config.OGlobalConfiguration;
3226
import com.orientechnologies.orient.enterprise.channel.binary.OChannelListener;
3327

28+
import java.io.IOException;
29+
import java.io.InputStream;
30+
import java.io.OutputStream;
31+
import java.net.Socket;
32+
import java.util.concurrent.atomic.AtomicLong;
33+
3434
public abstract class OChannel extends OListenerManger<OChannelListener> {
3535
private static final OProfilerMBean PROFILER = Orient.instance().getProfiler();
3636
private static final AtomicLong metricGlobalTransmittedBytes = new AtomicLong();
3737
private static final AtomicLong metricGlobalReceivedBytes = new AtomicLong();
3838
private static final AtomicLong metricGlobalFlushes = new AtomicLong();
39-
protected final OAdaptiveLock lockRead = new OAdaptiveLock();
40-
protected final OAdaptiveLock lockWrite = new OAdaptiveLock();
39+
private final OAdaptiveLock lockRead = new OAdaptiveLock();
40+
private final OAdaptiveLock lockWrite = new OAdaptiveLock();
4141
public volatile Socket socket;
4242
public InputStream inStream;
4343
public OutputStream outStream;
@@ -98,6 +98,14 @@ public void flush() throws IOException {
9898
outStream.flush();
9999
}
100100

101+
public OAdaptiveLock getLockRead() {
102+
return lockRead;
103+
}
104+
105+
public OAdaptiveLock getLockWrite() {
106+
return lockWrite;
107+
}
108+
101109
public synchronized void close() {
102110
PROFILER.unregisterHookValue(profilerMetric + ".transmittedBytes");
103111
PROFILER.unregisterHookValue(profilerMetric + ".receivedBytes");

enterprise/src/main/java/com/orientechnologies/orient/enterprise/channel/binary/OChannelBinaryAsynchClient.java

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
package com.orientechnologies.orient.enterprise.channel.binary;
1717

1818
import com.orientechnologies.common.concur.OTimeoutException;
19-
import com.orientechnologies.common.concur.lock.OAdaptiveLock;
2019
import com.orientechnologies.common.concur.lock.OLockException;
2120
import com.orientechnologies.common.exception.OException;
2221
import com.orientechnologies.common.log.OLogManager;
@@ -43,9 +42,9 @@
4342
import java.util.concurrent.locks.Condition;
4443

4544
public class OChannelBinaryAsynchClient extends OChannelBinary {
46-
protected final int socketTimeout; // IN MS
45+
protected final int socketTimeout; // IN MS
4746
protected final short srvProtocolVersion;
48-
private final Condition readCondition = lockRead.getUnderlying().newCondition();
47+
private final Condition readCondition = getLockRead().getUnderlying().newCondition();
4948
private final int maxUnreadResponses;
5049
private String serverURL;
5150
private volatile boolean channelRead = false;
@@ -170,7 +169,7 @@ public void beginResponse(final int iRequesterId, final long iTimeout) throws IO
170169
do {
171170
if (iTimeout <= 0)
172171
acquireReadLock();
173-
else if (!lockRead.tryAcquireLock(iTimeout, TimeUnit.MILLISECONDS))
172+
else if (!getLockRead().tryAcquireLock(iTimeout, TimeUnit.MILLISECONDS))
174173
throw new OTimeoutException("Cannot acquire read lock against channel: " + this);
175174

176175
if (!isConnected())
@@ -279,7 +278,7 @@ public void endResponse() {
279278

280279
@Override
281280
public void close() {
282-
if (lockRead.tryAcquireLock())
281+
if (getLockRead().tryAcquireLock())
283282
try {
284283
readCondition.signalAll();
285284
} finally {
@@ -327,14 +326,6 @@ public short getSrvProtocolVersion() {
327326
return srvProtocolVersion;
328327
}
329328

330-
public OAdaptiveLock getLockRead() {
331-
return lockRead;
332-
}
333-
334-
public OAdaptiveLock getLockWrite() {
335-
return lockWrite;
336-
}
337-
338329
public String getServerURL() {
339330
return serverURL;
340331
}

server/src/main/java/com/orientechnologies/orient/server/network/protocol/binary/OBinaryNetworkProtocolAbstract.java

Lines changed: 13 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
*/
1616
package com.orientechnologies.orient.server.network.protocol.binary;
1717

18-
import com.orientechnologies.common.concur.lock.OLockException;
1918
import com.orientechnologies.common.exception.OException;
2019
import com.orientechnologies.common.log.OLogManager;
2120
import com.orientechnologies.orient.core.Orient;
@@ -54,7 +53,6 @@
5453

5554
import java.io.IOException;
5655
import java.net.Socket;
57-
import java.net.SocketException;
5856
import java.util.logging.Level;
5957

6058
/**
@@ -77,11 +75,17 @@ public OBinaryNetworkProtocolAbstract(final String iThreadName) {
7775
}
7876

7977
@Override
80-
public void config(final OServerNetworkListener iListener, final OServer iServer, final Socket iSocket, final OContextConfiguration iConfig) throws IOException {
78+
public void config(final OServerNetworkListener iListener, final OServer iServer, final Socket iSocket,
79+
final OContextConfiguration iConfig) throws IOException {
8180
server = iServer;
8281
channel = new OChannelBinaryServer(iSocket, iConfig);
8382
}
8483

84+
@Override
85+
public int getVersion() {
86+
return OChannelBinaryProtocol.CURRENT_PROTOCOL_VERSION;
87+
}
88+
8589
@Override
8690
public void shutdown() {
8791
channel.close();
@@ -115,6 +119,10 @@ else if (o instanceof ORecordId) {
115119
}
116120
}
117121

122+
public String getType() {
123+
return "binary";
124+
}
125+
118126
/**
119127
* Executes the request.
120128
*
@@ -123,6 +131,8 @@ else if (o instanceof ORecordId) {
123131
*/
124132
protected abstract boolean executeRequest() throws IOException;
125133

134+
protected abstract void sendError(final int iClientTxId, final Throwable t) throws IOException;
135+
126136
/**
127137
* Executed before the request.
128138
*
@@ -187,49 +197,6 @@ protected void sendOk(final int iClientTxId) throws IOException {
187197
channel.writeInt(iClientTxId);
188198
}
189199

190-
protected void sendError(final int iClientTxId, final Throwable t) throws IOException {
191-
channel.acquireWriteLock();
192-
try {
193-
194-
channel.writeByte(OChannelBinaryProtocol.RESPONSE_STATUS_ERROR);
195-
channel.writeInt(iClientTxId);
196-
197-
Throwable current;
198-
if (t instanceof OLockException && t.getCause() instanceof ODatabaseException)
199-
// BYPASS THE DB POOL EXCEPTION TO PROPAGATE THE RIGHT SECURITY ONE
200-
current = t.getCause();
201-
else
202-
current = t;
203-
204-
while (current != null) {
205-
// MORE DETAILS ARE COMING AS EXCEPTION
206-
channel.writeByte((byte) 1);
207-
208-
channel.writeString(current.getClass().getName());
209-
channel.writeString(current != null ? current.getMessage() : null);
210-
211-
current = current.getCause();
212-
}
213-
channel.writeByte((byte) 0);
214-
215-
channel.flush();
216-
217-
if (OLogManager.instance().isLevelEnabled(logClientExceptions)) {
218-
if (logClientFullStackTrace)
219-
OLogManager.instance().log(this, logClientExceptions, "Sent run-time exception to the client %s: %s", t,
220-
channel.socket.getRemoteSocketAddress(), t.toString());
221-
else
222-
OLogManager.instance().log(this, logClientExceptions, "Sent run-time exception to the client %s: %s", null,
223-
channel.socket.getRemoteSocketAddress(), t.toString());
224-
}
225-
} catch (Exception e) {
226-
if (e instanceof SocketException)
227-
shutdown();
228-
} finally {
229-
channel.releaseWriteLock();
230-
}
231-
}
232-
233200
protected void checkStorageExistence(final String iDatabaseName) {
234201
for (OStorage stg : Orient.instance().getStorages()) {
235202
if (!(stg instanceof OStorageProxy) && stg.getName().equalsIgnoreCase(iDatabaseName) && stg.exists())

server/src/main/java/com/orientechnologies/orient/server/network/protocol/binary/ONetworkProtocolBinary.java

Lines changed: 62 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -112,26 +112,22 @@ public ONetworkProtocolBinary(final String iThreadName) {
112112
}
113113

114114
@Override
115-
public void config(final OServerNetworkListener iListener, final OServer iServer, final Socket iSocket, final OContextConfiguration iConfig) throws IOException {
115+
public void config(final OServerNetworkListener iListener, final OServer iServer, final Socket iSocket,
116+
final OContextConfiguration iConfig) throws IOException {
116117
// CREATE THE CLIENT CONNECTION
117118
connection = OClientConnectionManager.instance().connect(this);
118119

119120
super.config(iListener, iServer, iSocket, iConfig);
120121

121122
// SEND PROTOCOL VERSION
122-
channel.writeShort((short) OChannelBinaryProtocol.CURRENT_PROTOCOL_VERSION);
123+
channel.writeShort((short) getVersion());
123124

124125
channel.flush();
125126
start();
126127

127128
setName("OrientDB <- BinaryClient (" + iSocket.getRemoteSocketAddress() + ")");
128129
}
129130

130-
@Override
131-
public int getVersion() {
132-
return OChannelBinaryProtocol.CURRENT_PROTOCOL_VERSION;
133-
}
134-
135131
@Override
136132
public void startup() {
137133
super.startup();
@@ -151,10 +147,6 @@ public void shutdown() {
151147
OClientConnectionManager.instance().disconnect(connection);
152148
}
153149

154-
public String getType() {
155-
return "binary";
156-
}
157-
158150
@Override
159151
protected void onBeforeRequest() throws IOException {
160152
waitNodeIsOnline();
@@ -686,6 +678,65 @@ protected void connect() throws IOException {
686678
}
687679
}
688680

681+
protected void sendError(final int iClientTxId, final Throwable t) throws IOException {
682+
channel.acquireWriteLock();
683+
try {
684+
685+
channel.writeByte(OChannelBinaryProtocol.RESPONSE_STATUS_ERROR);
686+
channel.writeInt(iClientTxId);
687+
688+
Throwable current;
689+
if (t instanceof OLockException && t.getCause() instanceof ODatabaseException)
690+
// BYPASS THE DB POOL EXCEPTION TO PROPAGATE THE RIGHT SECURITY ONE
691+
current = t.getCause();
692+
else
693+
current = t;
694+
695+
final Throwable original = current;
696+
while (current != null) {
697+
// MORE DETAILS ARE COMING AS EXCEPTION
698+
channel.writeByte((byte) 1);
699+
700+
channel.writeString(current.getClass().getName());
701+
channel.writeString(current.getMessage());
702+
703+
current = current.getCause();
704+
}
705+
channel.writeByte((byte) 0);
706+
707+
if (connection != null && connection.data.protocolVersion >= 19) {
708+
final OMemoryStream memoryStream = new OMemoryStream();
709+
final ObjectOutputStream objectOutputStream = new ObjectOutputStream(memoryStream);
710+
711+
objectOutputStream.writeObject(original);
712+
objectOutputStream.flush();
713+
714+
final byte[] result = memoryStream.toByteArray();
715+
objectOutputStream.close();
716+
717+
channel.writeBytes(result);
718+
}
719+
720+
channel.flush();
721+
722+
if (OLogManager.instance().isLevelEnabled(logClientExceptions)) {
723+
if (logClientFullStackTrace)
724+
OLogManager.instance().log(this, logClientExceptions, "Sent run-time exception to the client %s: %s", t,
725+
channel.socket.getRemoteSocketAddress(), t.toString());
726+
else
727+
OLogManager.instance().log(this, logClientExceptions, "Sent run-time exception to the 10000 client %s: %s", null,
728+
channel.socket.getRemoteSocketAddress(), t.toString());
729+
}
730+
} catch (Exception e) {
731+
if (e instanceof SocketException)
732+
shutdown();
733+
} finally {
734+
if (channel.getLockWrite().isHeldByCurrentThread())
735+
// NO EXCEPTION SO FAR: UNLOCK IT
736+
channel.releaseWriteLock();
737+
}
738+
}
739+
689740
protected void shutdownConnection() throws IOException {
690741
setDataCommandInfo("Shutdowning");
691742

@@ -1435,63 +1486,6 @@ protected void sendOk(final int iClientTxId) throws IOException {
14351486
channel.writeInt(iClientTxId);
14361487
}
14371488

1438-
protected void sendError(final int iClientTxId, final Throwable t) throws IOException {
1439-
channel.acquireWriteLock();
1440-
try {
1441-
1442-
channel.writeByte(OChannelBinaryProtocol.RESPONSE_STATUS_ERROR);
1443-
channel.writeInt(iClientTxId);
1444-
1445-
Throwable current;
1446-
if (t instanceof OLockException && t.getCause() instanceof ODatabaseException)
1447-
// BYPASS THE DB POOL EXCEPTION TO PROPAGATE THE RIGHT SECURITY ONE
1448-
current = t.getCause();
1449-
else
1450-
current = t;
1451-
1452-
final Throwable original = current;
1453-
while (current != null) {
1454-
// MORE DETAILS ARE COMING AS EXCEPTION
1455-
channel.writeByte((byte) 1);
1456-
1457-
channel.writeString(current.getClass().getName());
1458-
channel.writeString(current.getMessage());
1459-
1460-
current = current.getCause();
1461-
}
1462-
channel.writeByte((byte) 0);
1463-
1464-
if (connection != null && connection.data.protocolVersion >= 19) {
1465-
final OMemoryStream memoryStream = new OMemoryStream();
1466-
final ObjectOutputStream objectOutputStream = new ObjectOutputStream(memoryStream);
1467-
1468-
objectOutputStream.writeObject(original);
1469-
objectOutputStream.flush();
1470-
1471-
final byte[] result = memoryStream.toByteArray();
1472-
objectOutputStream.close();
1473-
1474-
channel.writeBytes(result);
1475-
}
1476-
1477-
channel.flush();
1478-
1479-
if (OLogManager.instance().isLevelEnabled(logClientExceptions)) {
1480-
if (logClientFullStackTrace)
1481-
OLogManager.instance().log(this, logClientExceptions, "Sent run-time exception to the client %s: %s", t,
1482-
channel.socket.getRemoteSocketAddress(), t.toString());
1483-
else
1484-
OLogManager.instance().log(this, logClientExceptions, "Sent run-time exception to the client %s: %s", null,
1485-
channel.socket.getRemoteSocketAddress(), t.toString());
1486-
}
1487-
} catch (Exception e) {
1488-
if (e instanceof SocketException)
1489-
shutdown();
1490-
} finally {
1491-
channel.releaseWriteLock();
1492-
}
1493-
}
1494-
14951489
@Override
14961490
protected void handleConnectionError(final OChannelBinaryServer iChannel, final Throwable e) {
14971491
super.handleConnectionError(channel, e);

0 commit comments

Comments
 (0)
0