8000 Added perf test · emrul/postgres-async-driver@2361404 · GitHub
[go: up one dir, main page]

Skip to content

Commit 2361404

Browse files
committed
Added perf test
1 parent 769f569 commit 2361404

File tree

1 file changed

+196
-0
lines changed

1 file changed

+196
-0
lines changed
Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
15+
package com.github.pgasync.impl;
16+
17+
import static java.lang.Long.MIN_VALUE;
18+
import static java.lang.System.currentTimeMillis;
19+
import static java.lang.System.out;
20+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
21+
import static org.junit.runners.MethodSorters.NAME_ASCENDING;
22+
import java.util.ArrayList;
23+
import java.util.Collection;
24+
import java.util.OptionalLong;
25+
import java.util.Queue;
26+
import java.util.SortedMap;
27+
import java.util.TreeMap;
28+
import java.util.concurrent.ArrayBlockingQueue;
29+
import java.util.concurrent.BrokenBarrierException;
30+
import java.util.concurrent.Callable;
31+
import java.util.concurrent.CyclicBarrier;
32+
import java.util.concurrent.Exchanger;
33+
import java.util.concurrent.ExecutorService;
34+
import java.util.concurrent.Executors;
35+
import java.util.concurrent.LinkedBlockingQueue;
36+
import java.util.function.Consumer;
37+
import org.junit.After;
38+
import org.junit.AfterClass;
39+
import org.junit.FixMethodOrder;
40+
import org.junit.Test;
41+
import org.junit.runner.RunWith;
42+
import org.junit.runners.Parameterized;
43+
import org.junit.runners.Parameterized.Parameters;
44+
import com.github.pgasync.Connection;
45+
import com.github.pgasync.ConnectionPool;
46+
47+
@RunWith(Parameterized.class)
48+
@FixMethodOrder(NAME_ASCENDING)
49+
public class PerformanceTest {
50+
51+
@Parameters(name = "{index}: poolSize={0}, threads={1}, pipeline={2}")
52+
public static Iterable<Object[]> data() {
53+
results = new TreeMap<>();
54+
ArrayList<Object[]> testData = new ArrayList<>();
55+
for (int poolSize = 1; poolSize <= 4; poolSize *= 2) {
56+
results.putIfAbsent(key(poolSize, true), new TreeMap<>());
57+
results.putIfAbsent(key(poolSize, false), new TreeMap<>());
58+
for (int threads = 1; threads <= 16; threads *= 2) {
59+
testData.add(new Object[] { poolSize, threads, true });
60+
testData.add(new Object[] { poolSize, threads, false });
61+
}
62+
}
63+
return testData;
64+
}
65+
66+
private static String key(int poolSize, boolean pipeline) {
67+
return poolSize + " conn" + (pipeline ? "/pipeline" : "");
68+
}
69+
70+
private static final int batchSize = 100;
71+
private static final int repeats = 5;
72+
private static final Consumer<Throwable> err = t -> {t.printStackTrace();
73+
throw new AssertionError(t);
74+
};
75+
private static SortedMap<String, SortedMap<Integer, Long>> results = new TreeMap<>();
76+
private final int poolSize;
77+
private final int numThreads;
78+
private final boolean pipeline;
79+
private final ConnectionPool dbPool;
80+
private final ExecutorService threadPool;
81+
82+
public PerformanceTest(int poolSize, int numThreads, boolean pipeline) {
83+
this.poolSize = poolSize;
84+
this.numThreads = numThreads;
85+
this.pipeline = pipeline;
86+
dbPool = DatabaseRule.createPoolBuilder(poolSize).pipeline(pipeline).validationQuery(null).build();
87+
threadPool = Executors.newFixedThreadPool(numThreads);
88+
}
89+
90+
@After
91+
public void close() {
92+
threadPool.shutdownNow();
93+
dbPool.close();
94+
}
95+
96+
@Test(timeout = 1000)
97+
public void t1_preAllocatePool() throws InterruptedException {
98+
Queue<Connection> connections = new ArrayBlockingQueue<>(poolSize);
99+
for (int i = 0; i < poolSize; ++i) {
100+
dbPool.getConnection().subscribe(connections::add);
101+
}
102+
while (connections.size() < poolSize) {
103+
MILLISECONDS.sleep(5);
104+
}
105+
connections.forEach(c -> dbPool.release(c));
106+
}
107+
108+
@Test
109+
public void t3_run() throws Exception {
110+
Collection<Callable<Long>> tasks = new ArrayList<>();
111+
for (int i = 0; i < batchSize; ++i) {
112+
tasks.add(new Callable<Long>() {
113+
final Exchanger<Long> swap = new Exchanger<>();
114+
115+
@Override
116+
public Long call() throws Exception {
117+
dbPool.query("select 42", r -> {
118+
try {
119+
swap.exchange(currentTimeMillis());
120+
} catch (Exception e) {
121+
err.accept(e);
122+
}
123+
}, err);
124+
return swap.exchange(null);
125+
}
126+
});
127+
}
128+
129+
long minTime = Long.MAX_VALUE;
130+
131+
for (int r = 0; r < repeats; ++r) {
132+
System.gc();
133+
MILLISECONDS.sleep(300);
134+
135+
final CyclicBarrier barrier = new CyclicBarrier(numThreads + 1);
136+
137+
final Queue<Callable<Long>> taskQueue = new LinkedBlockingQueue<>(tasks);
138+
final Queue<Long> endTimes = new ArrayBlockingQueue<>(batchSize);
139+
140+
Thread[] threads = new Thread[numThreads];
141+
for (int i = 0; i < numThreads; ++i) {
142+
threads[i] = new Thread("tester" + i) {
143+
public void run() {
144+
try {
145+
barrier.await();
146+
} catch (InterruptedException | BrokenBarrierException e) {
147+
e.printStackTrace();
148+
}
149+
150+
Callable<Long> c;
151+
try {
152+
while ((c = taskQueue.poll()) != null) {
153+
endTimes.add(c.call());
154+
}
155+
} catch (Exception e) {
156+
e.printStackTrace();
157+
}
158+
}
159+
};
160+
threads[i].start();
161+
}
162+
163+
long start = currentTimeMillis();
164+
barrier.await();
165+
166+
for (Thread thread : threads) {
167+
thread.join();
168+
}
169+
170+
OptionalLong end = endTimes.stream().mapToLong(f -> f).max();
171+
long time = end.getAsLong() - start;
172+
minTime = Math.min(minTime, time);
173+
}
174+
175+
results.get(key(poolSize, pipeline)).put(numThreads, minTime);
176+
177+
out.printf("%d%s,%2d,%4.3f%n", poolSize, pipeline ? "p" : "n", numThreads, minTime / 1000.0);
178+
}
179+
180+
@AfterClass
181+
public static void printCsv() {
182+
out.print("threads");
183+
results.keySet().forEach(i -> out.printf(",%s", i));
184+
out.println();
185+
186+
results.values().iterator().next().keySet().forEach(threads -> {
187+
out.print(threads);
188+
results.keySet().forEach(conns -> {
189+
long millis = results.get(conns).get(threads);
190+
double rps = batchSize * 1000 / (double) millis;
191+
out.printf(",%f", rps);
192+
});
193+
out.println();
194+
});
195+
}
196+
}

0 commit comments

Comments
 (0)
0