|
23 | 23 | import rx.Observable;
|
24 | 24 | import rx.Subscriber;
|
25 | 25 | import rx.Subscription;
|
| 26 | +import rx.subscriptions.Subscriptions; |
26 | 27 |
|
27 | 28 | import java.util.ArrayList;
|
28 | 29 | import java.util.HashMap;
|
@@ -95,35 +96,15 @@ public Observable<Void> rollback() {
|
95 | 96 |
|
96 | 97 | @Override
|
97 | 98 | public Observable<String> listen(String channel) {
|
98 |
| - return querySet("LISTEN " + channel).lift(subscriber -> { |
| 99 | + AtomicReference<String> token = new AtomicReference<>(); |
| 100 | + return Observable.<String>create(subscriber -> |
99 | 101 |
|
100 |
| - AtomicReference<String> token = new AtomicReference<>(); |
101 |
| - AtomicBoolean unsubscribed = new AtomicBoolean(); |
| 102 | + querySet("LISTEN " + channel) |
| 103 | + .subscribe( rs -> token.set(stream.registerNotificationHandler(channel, subscriber::onNext)), |
| 104 | + subscriber::onError) |
102 | 105 |
|
103 |
| - subscriber.add(new Subscription() { |
104 |
| - @Override |
105 |
| - public void unsubscribe() { |
106 |
| - stream.unRegisterNotificationHandler(channel, token.get()); |
107 |
| - unsubscribed.set(true); |
108 |
| - } |
109 |
| - @Override |
110 |
| - public boolean isUnsubscribed() { |
111 |
| - return unsubscribed.get(); |
112 |
| - } |
113 |
| - }); |
114 |
| - return new Subscriber<ResultSet>() { |
115 |
| - @Override |
116 |
| - public void onCompleted() { |
117 |
| - token.set(stream.registerNotificationHandler(channel, subscriber::onNext)); |
118 |
| - } |
119 |
| - @Override |
120 |
| - public void onError(Throwable e) { |
121 |
| - subscriber.onError(e); |
122 |
| - } |
123 |
| - @Override |
124 |
| - public void onNext(ResultSet s) { } |
125 |
| - }; |
126 |
| - }); |
| 106 | + ).doOnUnsubscribe(() -> querySet("UNLISTEN " + channel) |
| 107 | + .subscribe(rs -> stream.unRegisterNotificationHandler(channel, token.get()))); |
127 | 108 | }
|
128 | 109 |
|
129 | 110 | @Override
|
|
0 commit comments