1
+ /*
2
+ * Licensed under the Apache License, Version 2.0 (the "License");
3
+ * you may not use this file except in compliance with the License.
4
+ * You may obtain a copy of the License at
5
+ *
6
+ * http://www.apache.org/licenses/LICENSE-2.0
7
+ *
8
+ * Unless required by applicable law or agreed to in writing, software
9
+ * distributed under the License is distributed on an "AS IS" BASIS,
10
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
+ * See the License for the specific language governing permissions and
12
+ * limitations under the License.
13
+ */
14
+
15
+ package com .github .pgasync .impl ;
16
+
17
+ import static java .lang .Long .MIN_VALUE ;
18
+ import static java .lang .System .currentTimeMillis ;
19
+ import static java .lang .System .out ;
20
+ import static java .util .concurrent .TimeUnit .MILLISECONDS ;
21
+ import static org .junit .runners .MethodSorters .NAME_ASCENDING ;
22
+ import java .util .ArrayList ;
23
+ import java .util .Collection ;
24
+ import java .util .OptionalLong ;
25
+ import java .util .Queue ;
26
+ import java .util .SortedMap ;
27
+ import java .util .TreeMap ;
28
+ import java .util .concurrent .ArrayBlockingQueue ;
29
+ import java .util .concurrent .BrokenBarrierException ;
30
+ import java .util .concurrent .Callable ;
31
+ import java .util .concurrent .CyclicBarrier ;
32
+ import java .util .concurrent .Exchanger ;
33
+ import java .util .concurrent .ExecutorService ;
34
+ import java .util .concurrent .Executors ;
35
+ import java .util .concurrent .LinkedBlockingQueue ;
36
+ import java .util .function .Consumer ;
37
+ import org .junit .After ;
38
+ import org .junit .AfterClass ;
39
+ import org .junit .FixMethodOrder ;
40
+ import org .junit .Test ;
41
+ import org .junit .runner .RunWith ;
42
+ import org .junit .runners .Parameterized ;
43
+ import org .junit .runners .Parameterized .Parameters ;
44
+ import com .github .pgasync .Connection ;
45
+ import com .github .pgasync .ConnectionPool ;
46
+
47
+ @ RunWith (Parameterized .class )
48
+ @ FixMethodOrder (NAME_ASCENDING )
49
+ public class PerformanceTest {
50
+
51
+ @ Parameters (name = "{index}: poolSize={0}, threads={1}, pipeline={2}" )
52
+ public static Iterable <Object []> data () {
53
+ results = new TreeMap <>();
54
+ ArrayList <Object []> testData = new ArrayList <>();
55
+ for (int poolSize = 1 ; poolSize <= 4 ; poolSize *= 2 ) {
56
+ results .putIfAbsent (key (poolSize , true ), new TreeMap <>());
57
+ results .putIfAbsent (key (poolSize , false ), new TreeMap <>());
58
+ for (int threads = 1 ; threads <= 16 ; threads *= 2 ) {
59
+ testData .add (new Object [] { poolSize , threads , true });
60
+ testData .add (new Object [] { poolSize , threads , false });
61
+ }
62
+ }
63
+ return testData ;
64
+ }
65
+
66
+ private static String key (int poolSize , boolean pipeline ) {
67
+ return poolSize + " conn" + (pipeline ? "/pipeline" : "" );
68
+ }
69
+
70
+ private static final int batchSize = 100 ;
71
+ private static final int repeats = 5 ;
72
+ private static final Consumer <Throwable > err = t -> {t .printStackTrace ();
73
+ throw new AssertionError (t );
74
+ };
75
+ private static SortedMap <String , SortedMap <Integer , Long >> results = new TreeMap <>();
76
+ private final int poolSize ;
77
+ private final int numThreads ;
78
+ private final boolean pipeline ;
79
+ private final ConnectionPool dbPool ;
80
+ private final ExecutorService threadPool ;
81
+
82
+ public PerformanceTest (int poolSize , int numThreads , boolean pipeline ) {
83
+ this .poolSize = poolSize ;
84
+ this .numThreads = numThreads ;
85
+ this .pipeline = pipeline ;
86
+ dbPool = DatabaseRule .createPoolBuilder (poolSize ).pipeline (pipeline ).validationQuery (null ).build ();
87
+ threadPool = Executors .newFixedThreadPool (numThreads );
88
+ }
89
+
90
+ @ After
91
+ public void close () {
92
+ threadPool .shutdownNow ();
93
+ dbPool .close ();
94
+ }
95
+
96
+ @ Test (timeout = 1000 )
97
+ public void t1_preAllocatePool () throws InterruptedException {
98
+ Queue <Connection > connections = new ArrayBlockingQueue <>(poolSize );
99
+ for (int i = 0 ; i < poolSize ; ++i ) {
100
+ dbPool .getConnection ().subscribe (connections ::add );
101
+ }
102
+ while (connections .size () < poolSize ) {
103
+ MILLISECONDS .sleep (5 );
104
+ }
105
+ connections .forEach (c -> dbPool .release (c ));
106
+ }
107
+
108
+ @ Test
109
+ public void t3_run () throws Exception {
110
+ Collection <Callable <Long >> tasks = new ArrayList <>();
111
+ for (int i = 0 ; i < batchSize ; ++i ) {
112
+ tasks .add (new Callable <Long >() {
113
+ final Exchanger <Long > swap = new Exchanger <>();
114
+
115
+ @ Override
116
+ public Long call () throws Exception {
117
+ dbPool .query ("select 42" , r -> {
118
+ try {
119
+ swap .exchange (currentTimeMillis ());
120
+ } catch (Exception e ) {
121
+ err .accept (e );
122
+ }
123
+ }, err );
124
+ return swap .exchange (null );
125
+ }
126
+ });
127
+ }
128
+
129
+ long minTime = Long .MAX_VALUE ;
130
+
131
+ for (int r = 0 ; r < repeats ; ++r ) {
132
+ System .gc ();
133
+ MILLISECONDS .sleep (300 );
134
+
10000
td>135
+ final CyclicBarrier barrier = new CyclicBarrier (numThreads + 1 );
136
+
137
+ final Queue <Callable <Long >> taskQueue = new LinkedBlockingQueue <>(tasks );
138
+ final Queue <Long > endTimes = new ArrayBlockingQueue <>(batchSize );
139
+
140
+ Thread [] threads = new Thread [numThreads ];
141
+ for (int i = 0 ; i < numThreads ; ++i ) {
142
+ threads [i ] = new Thread ("tester" + i ) {
143
+ public void run () {
144
+ try {
145
+ barrier .await ();
146
+ } catch (InterruptedException | BrokenBarrierException e ) {
147
+ e .printStackTrace ();
148
+ }
149
+
150
+ Callable <Long > c ;
151
+ try {
152
+ while ((c = taskQueue .poll ()) != null ) {
153
+ endTimes .add (c .call ());
154
+ }
155
+ } catch (Exception e ) {
156
+ e .printStackTrace ();
157
+ }
158
+ }
159
+ };
160
+ threads [i ].start ();
161
+ }
162
+
163
+ long start = currentTimeMillis ();
164
+ barrier .await ();
165
+
166
+ for (Thread thread : threads ) {
167
+ thread .join ();
168
+ }
169
+
170
+ OptionalLong end = endTimes .stream ().mapToLong (f -> f ).max ();
171
+ long time = end .getAsLong () - start ;
172
+ minTime = Math .min (minTime , time );
173
+ }
174
+
175
+ results .get (key (poolSize , pipeline )).put (numThreads , minTime );
176
+
177
+ out .printf ("%d%s,%2d,%4.3f%n" , poolSize , pipeline ? "p" : "n" , numThreads , minTime / 1000.0 );
178
+ }
179
+
180
+ @ AfterClass
181
+ public static void printCsv () {
182
+ out .print ("threads" );
183
+ results .keySet ().forEach (i -> out .printf (",%s" , i ));
184
+ out .println ();
185
+
186
+ results .values ().iterator ().next ().keySet ().forEach (threads -> {
187
+ out .print (threads );
188
+ results .keySet ().forEach (conns -> {
189
+ long millis = results .get (conns ).get (threads );
190
+ double rps = batchSize * 1000 / (double ) millis ;
191
+ out .printf (",%f" , rps );
192
+ });
193
+ out .println ();
194
+ });
195
+ }
196
+ }
0 commit comments