8000 Use doOnUnsubscribe for UNLISTEN · rasayana/postgres-async-driver@9e6b784 · GitHub
[go: up one dir, main page]

Skip to content

Commit 9e6b784

Browse files
committed
Use doOnUnsubscribe for UNLISTEN
1 parent 48ea507 commit 9e6b784

File tree

2 files changed

+9
-28
lines changed

2 files changed

+9
-28
lines changed

src/main/java/com/github/pgasync/impl/PgConnection.java

Lines changed: 8 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import rx.Observable;
2424
import rx.Subscriber;
2525
import rx.Subscription;
26+
import rx.subscriptions.Subscriptions;
2627

2728
import java.util.ArrayList;
2829
import java.util.HashMap;
@@ -95,35 +96,15 @@ public Observable<Void> rollback() {
9596

9697
@Override
9798
public Observable<String> listen(String channel) {
98-
return querySet("LISTEN " + channel).lift(subscriber -> {
99+
AtomicReference<String> token = new AtomicReference<>();
100+
return Observable.<String>create(subscriber ->
99101

100-
AtomicReference<String> token = new AtomicReference<>();
101-
AtomicBoolean unsubscribed = new AtomicBoolean();
102+
querySet("LISTEN " + channel)
103+
.subscribe( rs -> token.set(stream.registerNotificationHandler(channel, subscriber::onNext)),
104+
subscriber::onError)
102105

103-
subscriber.add(new Subscription() {
104-
@Override
105-
public void unsubscribe() {
106-
stream.unRegisterNotificationHandler(channel, token.get());
107-
unsubscribed.set(true);
108-
}
109-
@Override
110-
public boolean isUnsubscribed() {
111-
return unsubscribed.get();
112-
}
113-
});
114-
return new Subscriber<ResultSet>() {
115-
@Override
116-
public void onCompleted() {
117-
token.set(stream.registerNotificationHandler(channel, subscriber::onNext));
118-
}
119-
@Override
120-
public void onError(Throwable e) {
121-
subscriber.onError(e);
122-
}
123-
@Override
124-
public void onNext(ResultSet s) { }
125-
};
126-
});
106+
).doOnUnsubscribe(() -> querySet("UNLISTEN " + channel)
107+
.subscribe(rs -> stream.unRegisterNotificationHandler(channel, token.get())));
127108
}
128109

129110
@Override

src/test/java/com/github/pgasync/impl/ListenNotifyTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public class ListenNotifyTest {
2323
public void shouldReceiveNotificationsOnListenedChannel() throws Exception {
2424
BlockingQueue<String> result = new LinkedBlockingQueue<>(5);
2525

26-
Subscription subscription = pool.listen("example").subscribe(result::add);
26+
Subscription subscription = pool.listen("example").subscribe(result::add, Throwable::printStackTrace);
2727
TimeUnit.SECONDS.sleep(2);
2828

2929
pool.querySet("notify example, 'msg'").toBlocking().single();

0 commit comments

Comments
 (0)
0