8000
We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
There was an error while loading. Please reload this page.
2 parents 0a5a666 + 61b14c8 commit 2e6b899Copy full SHA for 2e6b899
src/main/java/io/reactivesocket/internal/ThreadSafeFramePool.java
@@ -0,0 +1,109 @@
1
+/**
2
+ * Copyright 2015 Netflix, Inc.
3
+ *
4
+ * Licensed under the Apache License, Version 2.0 (the "License");
5
+ * you may not use this file except in compliance with the License.
6
+ * You may obtain a copy of the License at
7
8
+ * http://www.apache.org/licenses/LICENSE-2.0
9
10
+ * Unless required by applicable law or agreed to in writing, software
11
+ * distributed under the License is distributed on an "AS IS" BASIS,
12
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
+ * See the License for the specific language governing permissions and
14
+ * limitations under the License.
15
+ */
16
+package io.reactivesocket.internal;
17
+
18
+import io.reactivesocket.Frame;
19
+import uk.co.real_logic.agrona.MutableDirectBuffer;
20
+import uk.co.real_logic.agrona.concurrent.OneToOneConcurrentArrayQueue;
21
+import uk.co.real_logic.agrona.concurrent.UnsafeBuffer;
22
23
+import java.nio.ByteBuffer;
24
25
+public class ThreadSafeFramePool implements FramePool
26
+{
27
+ private static final int MAX_CACHED_FRAMES = 16;
28
29
+ private static final OneToOneConcurrentArrayQueue<Frame> FRAME_QUEUE =
30
+ new OneToOneConcurrentArrayQueue<>(MAX_CACHED_FRAMES);
31
32
+ private static final OneToOneConcurrentArrayQueue<MutableDirectBuffer> DIRECTBUFFER_QUEUE =
33
34
35
+ public Frame acquireFrame(int size)
36
+ {
37
+ final MutableDirectBuffer directBuffer = acquireMutableDirectBuffer(size);
38
39
+ Frame frame = pollFrame();
40
+ if (null == frame)
41
42
+ frame = Frame.allocate(directBuffer);
43
+ }
44
45
+ return frame;
46
47
48
+ public Frame acquireFrame(ByteBuffer byteBuffer)
49
50
+ return Frame.allocate(new UnsafeBuffer(byteBuffer));
51
52
53
+ public Frame acquireFrame(MutableDirectBuffer mutableDirectBuffer)
54
55
56
57
58
+ frame = Frame.allocate(mutableDirectBuffer);
59
60
61
62
63
64
+ public MutableDirectBuffer acquireMutableDirectBuffer(ByteBuffer byteBuffer)
65
66
+ MutableDirectBuffer directBuffer = pollMutableDirectBuffer();
67
+ if (null == directBuffer)
68
69
+ directBuffer = new UnsafeBuffer(byteBuffer);
70
71
72
+ return directBuffer;
73
74
75
+ public MutableDirectBuffer acquireMutableDirectBuffer(int size)
76
77
+ UnsafeBuffer directBuffer = (UnsafeBuffer)pollMutableDirectBuffer();
78
+ if (null == directBuffer || directBuffer.byteBuffer().capacity() < size)
79
80
+ directBuffer = new UnsafeBuffer(ByteBuffer.allocate(size));
81
82
+ else
83
84
+ directBuffer.byteBuffer().limit(size).position(0);
85
86
87
88
89
90
+ public synchronized void release(Frame frame)
91
92
+ FRAME_QUEUE.offer(frame);
93
94
95
+ public synchronized void release(MutableDirectBuffer mutableDirectBuffer)
96
97
+ DIRECTBUFFER_QUEUE.offer(mutableDirectBuffer);
98
99
100
+ private synchronized Frame pollFrame()
101
102
+ return FRAME_QUEUE.poll();
103
104
105
+ private synchronized MutableDirectBuffer pollMutableDirectBuffer()
106
107
+ return DIRECTBUFFER_QUEUE.poll();
108
109
+}