8000 Start adding EventEmitter-based methods · rasayana/postgres-async-driver@7c4bb27 · GitHub
[go: up one dir, main page]

Skip to content

Commit 7c4bb27

Browse files
committed
Start adding EventEmitter-based methods
1 parent 5e68b38 commit 7c4bb27

File tree

4 files changed

+114
-4
lines changed

4 files changed

+114
-4
lines changed
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package com.github.pgasync.impl;
2+
3+
import java.util.HashMap;
4+
import java.util.Map;
5+
import java.util.function.Consumer;
6+
7+
import static java.util.Collections.unmodifiableMap;
8+
9+
public class EventEmitter<T> {
10+
11+
private final Consumer<Emitter<T>> onListen;
12+
13+
private EventEmitter(Consumer<Emitter<T>> onListen) {
14+
this.onListen = onListen;
15+
}
16+
17+
public static <X> EventEmitter<X> create(Consumer<Emitter<X>> onListen) {
18+
return new EventEmitter<>(onListen);
19+
}
20+
21+
public <X extends T, E extends Throwable> void on(Class<X> match1, Consumer<X> consumer1,
22+
Class<E> error, Consumer<E> onError) {
23+
Map<Class<?>,Consumer<?>> matches = new HashMap<>();
24+
matches.put(match1, consumer1);
25+
matches. 8000 put(error, onError);
26+
onListen.accept(new Emitter<>(unmodifiableMap(matches)));
27+
}
28+
29+
public <X extends T, Y extends T, E extends Throwable> void on(Class<X> match1, Consumer<X> consumer1,
30+
Class<Y> match2, Consumer<Y> consumer2,
31+
Class<E> error, Consumer<E> onError) {
32+
Map<Class<?>,Consumer<?>> matches = new HashMap<>();
33+
matches.put(match1, consumer1);
34+
matches.put(match2, consumer2);
35+
matches.put(error, onError);
36+
onListen.accept(new Emitter<>(unmodifiableMap(matches)));
37+
}
38+
39+
public static class Emitter<T> {
40+
41+
private final Map<Class<?>,Consumer<?>> consumers;
42+
43+
private Emitter(Map<Class<?>, Consumer<?>> consumers) {
44+
this.consumers = consumers;
45+
}
46+
47+
@SuppressWarnings("unchecked")
48+
public <X extends T> void emit(Class<X> type, X event) {
49+
Consumer<X> consumer = (Consumer<X>) consumers.get(type);
50+
consumer.accept(event);
51+
}
52+
53+
@SuppressWarnings("unchecked")
54+
public void error(Throwable e) {
55+
Consumer<Throwable> consumer = (Consumer<Throwable>) consumers.get(Throwable.class);
56+
consumer.accept(e);
57+
}
58+
}
59+
60+
}

src/main/java/com/github/pgasync/impl/PgConnection.java

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,11 @@
1515
package com.github.pgasync.impl;
1616

1717
import com.github.pgasync.*;
18+
import com.github.pgasync.impl.EventEmitter.Emitter;
1819
import com.github.pgasync.impl.conversion.DataConverter;
1920
import com.github.pgasync.impl.message.*;
2021

21-
import java.util.ArrayList;
22-
import java.util.LinkedHashMap;
23-
import java.util.List;
24-
import java.util.Map;
22+
import java.util.*;
2523
import java.util.function.Consumer;
2624
import java.util.function.Function;
2725
import java.util.function.Supplier;
@@ -47,6 +45,41 @@ public PgConnection(PgProtocolStream stream, DataConverter dataConverter) {
4745
this.dataConverter = dataConverter;
4846
}
4947

48+
EventEmitter<Connection> connect(String username, String password, String database) {
49+
return EventEmitter.create(emitter ->
50+
stream.connect(new StartupMessage(username, database)).on(
51+
Authentication.class, auth -> authenticate(username, password, auth, emitter),
52+
ReadyForQuery.class, __ -> emitter.emit(Connection.class, this),
53+
Throwable.class, emitter::error));
54+
}
55+
56+
private void authenticate(String username, String password, Authentication authentication, Emitter<Connection> emitter) {
57+
stream.send(new PasswordMessage(username, password, authentication.getMd5Salt())).on(
58+
ReadyForQuery.class, __ -> emitter.emit(Connection.class, this),
59+
Throwable.class, emitter::error);
60+
}
61+
62+
public EventEmitter<Row> query(String sql) {
63+
return EventEmitter.create(emitter -> {
64+
65+
Map<String,PgColumn> columns = new HashMap<>();
66+
67+
stream.send(new Query(sql)).on(
68+
RowDescription.class, desc -> columns.putAll(getColumns(desc.getColumns())),
69+
DataRow.class, data -> emitter.emit(Row.class, new PgRow(data, columns, dataConverter)),
70+
Throwable.class, emitter::error
71+
);
72+
});
73+
}
74+
75+
private Map<String,PgColumn> getColumns(ColumnDescription[] descriptions) {
76+
Map<String,PgColumn> columns = new HashMap<>();
77+
for (int i = 0; i < descriptions.length; i++) {
78+
columns.put(descriptions[i].getName().toUpperCase(), new PgColumn(i, descriptions[i].getType()));
79+
}
80+
return columns;
81+
}
82+
5083
void connect(String username, String password, String database,
5184
Consumer<Connection> onConnected, Consumer<Throwable> onError) {
5285

src/main/java/com/github/pgasync/impl/PgProtocolStream.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,10 @@
2727
*/
2828
public interface PgProtocolStream {
2929

30+
EventEmitter<Message> connect(StartupMessage startup);
3031
void connect(StartupMessage startup, Consumer<List<Message>> replyTo);
3132

33+
EventEmitter<Message> send(Message message);
3234
void send(Message message, Consumer<List<Message>> replyTo);
3335

3436
void send(List<Message> message, Consumer<List<Message>> replyTo);

src/main/java/com/github/pgasync/impl/netty/NettyPgProtocolStream.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package com.github.pgasync.impl.netty;
1616

1717
import com.github.pgasync.SqlException;
18+
import com.github.pgasync.impl.EventEmitter;
1819
import com.github.pgasync.impl.PgProtocolStream;
1920
import com.github.pgasync.impl.message.*;
2021
import io.netty.bootstrap.Bootstrap;
@@ -58,6 +59,20 @@ public NettyPgProtocolStream(EventLoopGroup group, SocketAddress address, boolea
5859
this.onReceivers = pipeline ? new LinkedBlockingDeque<>() : new ArrayBlockingQueue<>(1);
5960
}
6061

62+
@Override
63+
public EventEmitter<Message> connect(StartupMessage startup) {
64+
return EventEmitter.create(emitter -> {
65+
66+
});
67+
}
68+
69+
@Override
70+
public EventEmitter<Message> send(Message message) {
71+
return EventEmitter.create(emitter -> {
72+
73+
});
74+
}
75+
6176
@Override
6277
public void connect(StartupMessage startup, Consumer<List<Message>> replyTo) {
6378
new Bootstrap()

0 commit comments

Comments
 (0)
0