8000 Don't delay errors on authentication. · rasayana/postgres-async-driver@48ea507 · GitHub
[go: up one dir, main page]

Skip to content

Commit 48ea507

Browse files
committed
Don't delay errors on authentication.
1 parent e1dc438 commit 48ea507

File tree

5 files changed

+33
-8
lines changed

5 files changed

+33
-8
lines changed

src/main/java/com/github/pgasync/impl/PgConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ Observable<Connection> connect(String username, String password, String database
5858

5959
Observable<? extends Message> authenticate(String username, String password, Message message) {
6060
return message instanceof Authentication
61-
? stream.send(new PasswordMessage(username, password, ((Authentication) message).getMd5Salt()))
61+
? stream.authenticate(new PasswordMessage(username, password, ((Authentication) message).getMd5Salt()))
6262
: Observable.just(message);
6363
}
6464

src/main/java/com/github/pgasync/impl/PgProtocolStream.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package com.github.pgasync.impl;
1616

1717
import com.github.pgasync.impl.message.Message;
18+
import com.github.pgasync.impl.message.PasswordMessage;
1819
import com.github.pgasync.impl.message.StartupMessage;
1920
import rx.Observable;
2021

@@ -29,6 +30,8 @@ public interface PgProtocolStream {
2930

3031
Observable<Message> connect(StartupMessage startup);
3132

33+
Observable<Message> authenticate(PasswordMessage password);
34+
3235
Observable<Message> send(Message... messages);
3336

3437
boolean isConnected();

src/main/java/com/github/pgasync/impl/message/CommandComplete.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,9 @@ public CommandComplete(String tag) {
3333
public int getUpdatedRows() {
3434
return updatedRows;
3535
}
36+
37+
@Override
38+
public String toString() {
39+
return String.format("CommandComplete(updatedRows=%d)", updatedRows);
40+
}
3641
}

src/main/java/com/github/pgasync/impl/netty/NettyPgProtocolStream.java

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
import java.util.concurrent.ConcurrentHashMap;
3939
import java.util.concurrent.ConcurrentMap;
4040
import java.util.concurrent.LinkedBlockingDeque;
41-
import java.util.concurrent.atomic.AtomicReference;
4241
import java.util.function.Consumer;
4342

4443
/**
@@ -82,7 +81,17 @@ public Observable<Message> connect(StartupMessage startup) {
8281
.connect(address)
8382
.addListener(onError);
8483

85-
}).lift(throwErrorResponses());
84+
}).flatMap(this::throwErrorResponses);
85+
}
86+
87+
@Override
88+
public Observable<Message> authenticate(PasswordMessage password) {
89+
return Observable.create(subscriber -> {
90+
91+
pushSubscriber(subscriber);
92+
write(password);
93+
94+
}).flatMap(this::throwErrorResponses);
8695
}
8796

8897
@Override
@@ -97,7 +106,7 @@ public Observable<Message> send(Message... messages) {
97106
pushSubscriber(subscriber);
98107
write(messages);
99108

100-
}).lift(throwErrorResponses());
109+
}).lift(throwErrorResponsesOnComplete());
101110
}
102111

103112
@Override
@@ -149,7 +158,13 @@ private void publishNotification(NotificationResponse notification) {
149158
}
150159
}
151160

152-
private static Observable.Operator<Message,? super Object> throwErrorResponses() {
161+
private Observable<Message> throwErrorResponses(Object message) {
162+
return message instanceof ErrorResponse
163+
? Observable.error(toSqlException((ErrorResponse) message))
164+
: Observable.just((Message) message);
165+
}
166+
167+
private static Observable.Operator<Message,? super Object> throwErrorResponsesOnComplete() {
153168
return subscriber -> new Subscriber<Object>() {
154169

155170
SqlException sqlException;
@@ -171,8 +186,7 @@ public void onError(Throwable e) {
171186
@Override
172187
public void onNext(Object message) {
173188
if (message instanceof ErrorResponse) {
174-
ErrorResponse error = (ErrorResponse) message;
175-
sqlException = new SqlException(error.getLevel().name(), error.getCode(), error.getMessage());
189+
sqlException = toSqlException((ErrorResponse) message);
176190
return;
177191
}
178192
subscriber.onNext((Message) message);
@@ -185,6 +199,10 @@ private static boolean isCompleteMessage(Object msg) {
185199
|| (msg instanceof Authentication && !((Authentication) msg).isAuthenticationOk());
186200
}
187201

202+
private static SqlException toSqlException(ErrorResponse error) {
203+
return new SqlException(error.getLevel().name(), error.getCode(), error.getMessage());
204+
}
205+
188206
ChannelInboundHandlerAdapter newStartupHandler(StartupMessage startup) {
189207
return new ChannelInboundHandlerAdapter() {
190208
@Override

src/test/java/com/github/pgasync/impl/AuthenticationTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
public class AuthenticationTest {
1313

1414
@Test
15-
@Ignore
1615
public void shouldThrowExceptionOnInvalidCredentials() throws Exception {
1716
try (ConnectionPool pool = createPoolBuilder(1).password("_invalid_").build()) {
1817
pool.queryRows("SELECT 1").toBlocking().first();

0 commit comments

Comments
 (0)
0