8000 headers treating sample (#646) · FzNl/rsocket-java@8b8dba1 · GitHub
[go: up one dir, main page]

Skip to content 8000

Commit 8b8dba1

Browse files
OlegDokukarobertroeser
authored andcommitted
headers treating sample (rsocket#646)
Signed-off-by: Oleh Dokuka <shadowgun@i.ua>
1 parent c176054 commit 8b8dba1

File tree

1 file changed

+141
-0
lines changed

1 file changed

+141
-0
lines changed
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/*
2+
* Copyright 2015-2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.examples.transport.ws;
18+
19+
import io.netty.handler.codec.http.HttpResponseStatus;
20+
import io.rsocket.AbstractRSocket;
21+
import io.rsocket.ConnectionSetupPayload;
22+
import io.rsocket.DuplexConnection;
23+
import io.rsocket.Payload;
24+
import io.rsocket.RSocket;
25+
import io.rsocket.RSocketFactory;
26+
import io.rsocket.SocketAcceptor;
27+
import io.rsocket.frame.decoder.PayloadDecoder;
28+
import io.rsocket.transport.ServerTransport;
29+
import io.rsocket.transport.netty.WebsocketDuplexConnection;
30+
import io.rsocket.transport.netty.client.WebsocketClientTransport;
31+
import io.rsocket.util.ByteBufPayload;
32+
import java.time.Duration;
33+
import java.util.HashMap;
34+
import org.reactivestreams.Publisher;
35+
import reactor.core.publisher.Flux;
36+
import reactor.core.publisher.Mono;
37+
import reactor.core.scheduler.Schedulers;
38+
import reactor.netty.Connection;
39+
import reactor.netty.DisposableServer;
40+
import reactor.netty.http.server.HttpServer;
41+
42+
public class WebSocketHeadersSample {
43+
static final Payload payload1 = ByteBufPayload.create("Hello ");
44+
45+
public static void main(String[] args) {
46+
47+
ServerTransport.ConnectionAcceptor acceptor =
48+
RSocketFactory.receive()
49+
.frameDecoder(PayloadDecoder.ZERO_COPY)
50+
.acceptor(new SocketAcceptorImpl())
51+
.toConnectionAcceptor();
52+
53+
DisposableServer disposableServer =
54+
HttpServer.create()
55+
.host("localhost")
56+
.port(0)
57+
.route(
58+
routes ->
59+
routes.ws(
60+
"/",
61+
(in, out) -> {
62+
if (in.headers().containsValue("Authorization", "test", true)) {
63+
DuplexConnection connection =
64+
new WebsocketDuplexConnection((Connection) in);
65+
return acceptor.apply(connection).then(out.neverComplete());
66+
}
67+
68+
return out.sendClose(
69+
HttpResponseStatus.UNAUTHORIZED.code(),
70+
HttpResponseStatus.UNAUTHORIZED.reasonPhrase());
71+
}))
72+
.bindNow();
73+
74+
WebsocketClientTransport clientTransport =
75+
WebsocketClientTransport.create(disposableServer.host(), disposableServer.port());
76+
77+
clientTransport.setTransportHeaders(
78+
() -> {
79+
HashMap<String, String> map = new HashMap<>();
80+
map.put("Authorization", "test");
81+
return map;
82+
});
83+
84+
RSocket socket =
85+
RSocketFactory.connect()
86+
.keepAliveAckTimeout(Duration.ofMinutes(10))
87+
.frameDecoder(PayloadDecoder.ZERO_COPY)
88+
.transport(clientTransport)
89+
.start()
90+
.block();
91+
92+
Flux.range(0, 100)
93+
.concatMap(i -> socket.fireAndForget(payload1.retain()))
94+
// .doOnNext(p -> {
95+
//// System.out.println(p.getDataUtf8());
96+
// p.release();
97+
// })
98+
.blockLast();
99+
socket.dispose();
100+
101+
WebsocketClientTransport clientTransport2 =
102+
WebsocketClientTransport.create(disposableServer.host(), disposableServer.port());
103+
104+
RSocket rSocket =
105+
RSocketFactory.connect()
106+
.keepAliveAckTimeout(Duration.ofMinutes(10))
107+
.frameDecoder(PayloadDecoder.ZERO_COPY)
108+
.transport(clientTransport2)
109+
.start()
110+
.block();
< 57AE /code>
111+
112+
// expect error here because of closed channel
113+
rSocket.requestResponse(payload1).block();
114+
}
115+
116+
private static class SocketAcceptorImpl implements SocketAcceptor {
117+
@Override
118+
public Mono<RSocket> accept(ConnectionSetupPayload setupPayload, RSocket reactiveSocket) {
119+
return Mono.just(
120+
new AbstractRSocket() {
121+
122+
@Override
123+
public Mono<Void> fireAndForget(Payload payload) {
124+
// System.out.println(payload.getDataUtf8());
125+
payload.release();
126+
return Mono.empty();
127+
}
128+
129+
@Override
130+
public Mono<Payload> requestResponse(Payload payload) {
131+
return Mono.just(payload);
132+
}
133+
134+
@Override
135+
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
136+
return Flux.from(payloads).subscribeOn(Schedulers.single());
137+
}
138+
});
139+
}
140+
}
141+
}

0 commit comments

Comments
 (0)
0