diff --git a/.github/workflows/ari-apis.yml b/.github/workflows/ari-apis.yml index d7244a43..4bf86abc 100755 --- a/.github/workflows/ari-apis.yml +++ b/.github/workflows/ari-apis.yml @@ -7,10 +7,10 @@ on: jobs: getApis: - runs-on: ubuntu-20.04 + runs-on: ubuntu-22.04 steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Execute getApis run: | @@ -20,7 +20,7 @@ jobs: - name: Create Pull Request id: cpr - uses: peter-evans/create-pull-request@v3 + uses: peter-evans/create-pull-request@v5 with: commit-message: ARI API Updates branch: apis diff --git a/.github/workflows/ari-versions.yml b/.github/workflows/ari-versions.yml index b68b61bf..7a487ab4 100755 --- a/.github/workflows/ari-versions.yml +++ b/.github/workflows/ari-versions.yml @@ -7,13 +7,13 @@ on: jobs: getApis: - runs-on: ubuntu-20.04 + runs-on: ubuntu-22.04 steps: - name: Checkout Project - uses: actions/checkout@v2 + uses: actions/checkout@v3 - name: Checkout Wiki - uses: actions/checkout@v2 + uses: actions/checkout@v3 with: repository: ${{github.repository}}.wiki path: codegen/tmp/wiki diff --git a/.github/workflows/gradle.yml b/.github/workflows/gradle.yml index 5bcb9318..f9cb19ed 100755 --- a/.github/workflows/gradle.yml +++ b/.github/workflows/gradle.yml @@ -12,11 +12,11 @@ on: jobs: build: - runs-on: ubuntu-20.04 + runs-on: ubuntu-22.04 steps: - - uses: actions/checkout@v2 - - uses: actions/cache@v1 + - uses: actions/checkout@v3 + - uses: actions/cache@v3 with: path: ~/.gradle/caches key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle') }} @@ -25,7 +25,8 @@ jobs: - name: Set up JDK 1.8 uses: actions/setup-java@v1 with: - java-version: 1.8 + distribution: 'zulu' + java-version: '8' - name: Grant execute permission for gradlew run: chmod +x gradlew - name: Build with Gradle diff --git a/CHANGELOG.md b/CHANGELOG.md index ab791295..9b2faee6 100755 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,13 @@ ## [Unreleased] [Unreleased]: https://github.com/ari4java/ari4java/compare/v0.16.0...HEAD +### Added +- Connection Pooling using Netty's FixedChannelPool +- Examples have a Docker provider for Vagrant (in order to work around VirtualBox compatibility on Apple Silicon) + +### Changed +- Cannot wait for connection close due to using connection keep-alive (aka pooling) +- Some methods in the Examples use the async approach ## [0.16.0] [0.16.0]: https://github.com/ari4java/ari4java/compare/v0.15.0...v0.16.0 diff --git a/examples/Dockerfile b/examples/Dockerfile new file mode 100644 index 00000000..c06c503c --- /dev/null +++ b/examples/Dockerfile @@ -0,0 +1,32 @@ +# Docker image to use with Vagrant +# Aims to be as similar to normal Vagrant usage as possible +# Adds Puppet, SSH daemon, Systemd +# Adapted from https://github.com/BashtonLtd/docker-vagrant-images/blob/master/ubuntu1404/Dockerfile + +FROM ubuntu:focal +ENV container docker +RUN apt update && apt -y upgrade + +# Install system dependencies, you may not need all of these +RUN apt install -y --no-install-recommends ssh sudo libffi-dev systemd openssh-client nano + +# Add vagrant user and key for SSH +RUN useradd --create-home -s /bin/bash vagrant +RUN echo -n 'vagrant:vagrant' | chpasswd +RUN echo 'vagrant ALL = NOPASSWD: ALL' > /etc/sudoers.d/vagrant +RUN chmod 440 /etc/sudoers.d/vagrant +RUN mkdir -p /home/vagrant/.ssh +RUN chmod 700 /home/vagrant/.ssh +RUN echo "ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEA6NF8iallvQVp22WDkTkyrtvp9eWW6A8YVr+kz4TjGYe7gHzIw+niNltGEFHzD8+v1I2YJ6oXevct1YeS0o9HZyN1Q9qgCgzUFtdOKLv6IedplqoPkcmF0aYet2PkEDo3MlTBckFXPITAMzF8dJSIFo9D8HfdOV0IAdx4O7PtixWKn5y2hMNG0zQPyUecp4pzC6kivAIhyfHilFR61RGL+GPXQ2MWZWFYbAGjyiYJnAmCP3NOTd0jMZEnDkbUvxhMmBYSdETk1rRgm+R4LOzFUGaHqHDLKLX+FIPKcF96hrucXzcWyLbIbEgE98OHlnVYCzRdK8jlqm8tehUc9c9WhQ==" > /home/vagrant/.ssh/authorized_keys +RUN chmod 600 /home/vagrant/.ssh/authorized_keys +RUN chown -R vagrant:vagrant /home/vagrant/.ssh +RUN sed -i -e 's/Defaults.*requiretty/#&/' /etc/sudoers +RUN sed -i -e 's/\(UsePAM \)yes/\1 no/' /etc/ssh/sshd_config + +# Start SSH +RUN mkdir /var/run/sshd +EXPOSE 22 +RUN /usr/sbin/sshd + +# Start Systemd (systemctl) +CMD ["/lib/systemd/systemd"] diff --git a/examples/Vagrantfile b/examples/Vagrantfile index 9fee894f..498c70ed 100644 --- a/examples/Vagrantfile +++ b/examples/Vagrantfile @@ -4,18 +4,32 @@ # use NET_BRIDGE=on to create a bridge interface with dynamic ip from your network. # Used for testing from an external source like a cell phone with Zoiper or the like # Note: Wifi with WPA2-Enterprise does not work, the interface doesn't get an IP +# Note: for M1 Mac use the --provider docker flag, the NET_BRIDGE is not compatible with docker if not ENV["NET_BRIDGE"] then ENV["NET_BRIDGE"] = "off" end Vagrant.configure("2") do |config| config.vm.box = "ubuntu/focal64" - config.vm.provider "virtualbox" do |v| + config.vm.synced_folder "./vagrant", "/vagrant" + config.vm.provider "virtualbox" do |v, override| v.memory = 1024 v.cpus = 2 + override.vm.network "private_network", ip: "192.168.56.44" + if ENV["NET_BRIDGE"] == "on" then + override.vm.network "public_network", use_dhcp_assigned_default_route: false + end + override.vm.provision :shell, :path => "vagrant/scripts/provision.sh" end - config.vm.network "private_network", ip: "192.168.56.44" - if ENV["NET_BRIDGE"] == "on" then - config.vm.network "public_network", use_dhcp_assigned_default_route: false + config.vm.provider "docker" do |d, override| + override.vm.box = nil + d.build_dir = "." + override.ssh.insert_key = true + d.has_ssh = true + d.privileged = true + override.vm.network :forwarded_port, guest: 8089, host: 8089 + override.vm.network :forwarded_port, guest: 5060, host: 5060, protocol: "udp" + for i in 20000..20010 + override.vm.network :forwarded_port, guest: i, host: i, protocol: "udp" + end + override.vm.provision :shell, :path => "vagrant/scripts/provision.sh", :args => ["docker"] end - config.vm.synced_folder "./vagrant", "/vagrant" - config.vm.provision :shell, :path => "vagrant/scripts/provision.sh" end diff --git a/examples/src/main/java/ch/loway/oss/ari4java/examples/Weasels.java b/examples/src/main/java/ch/loway/oss/ari4java/examples/Weasels.java index 59de5638..8a19e75f 100755 --- a/examples/src/main/java/ch/loway/oss/ari4java/examples/Weasels.java +++ b/examples/src/main/java/ch/loway/oss/ari4java/examples/Weasels.java @@ -3,11 +3,9 @@ import ch.loway.oss.ari4java.ARI; import ch.loway.oss.ari4java.AriVersion; import ch.loway.oss.ari4java.generated.AriWSHelper; -import ch.loway.oss.ari4java.generated.models.AsteriskInfo; -import ch.loway.oss.ari4java.generated.models.Message; -import ch.loway.oss.ari4java.generated.models.PlaybackFinished; -import ch.loway.oss.ari4java.generated.models.StasisStart; +import ch.loway.oss.ari4java.generated.models.*; import ch.loway.oss.ari4java.tools.ARIException; +import ch.loway.oss.ari4java.tools.AriCallback; import ch.loway.oss.ari4java.tools.RestException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,12 +78,12 @@ public void onFailure(RestException e) { @Override protected void onStasisStart(StasisStart message) { - handleStart(message); + handleStart(message, threadPool); } @Override protected void onPlaybackFinished(PlaybackFinished message) { - handlePlaybackFinished(message); + handlePlaybackFinished(message, threadPool); } }); @@ -96,24 +94,49 @@ protected void onPlaybackFinished(PlaybackFinished message) { System.exit(0); } - private void handleStart(StasisStart start) { + private void handleStart(StasisStart start, final ExecutorService threadPool) { logger.info("Stasis Start Channel: {}", start.getChannel().getId()); ARI.sleep(300); // a slight pause before we start the playback ... try { - ari.channels().play(start.getChannel().getId(), "sound:weasels-eaten-phonesys").execute(); + ari.channels().play(start.getChannel().getId(), "sound:weasels-eaten-phonesys") + .execute(new AriCallback() { + @Override + public void onSuccess(Playback playback) { + logger.debug("Playback success: {}", playback.getId()); + } + + @Override + public void onFailure(RestException e) { + logger.error("Playback Error: {}", e.getMessage(), e); + threadPool.shutdown(); + } + }); } catch (Throwable e) { logger.error("Error: {}", e.getMessage(), e); + threadPool.shutdown(); } } - private void handlePlaybackFinished(PlaybackFinished playback) { - logger.info("PlaybackFinished - {}", playback.getPlayback().getTarget_uri()); + private void handlePlaybackFinished(PlaybackFinished playback, final ExecutorService threadPool) { + logger.info("PlaybackFinished - {} {}", playback.getPlayback().getId(), playback.getPlayback().getTarget_uri()); if (playback.getPlayback().getTarget_uri().indexOf("channel:") == 0) { try { String chanId = playback.getPlayback().getTarget_uri().split(":")[1]; logger.info("Hangup Channel: {}", chanId); ARI.sleep(300); // a slight pause before we hangup ... - ari.channels().hangup(chanId).execute(); + ari.channels().hangup(chanId).execute(new AriCallback() { + @Override + public void onSuccess(Void a) { + logger.debug("Hangup success"); + threadPool.shutdown(); + } + + @Override + public void onFailure(RestException e) { + logger.error("Hangup Error: {}", e.getMessage(), e); + threadPool.shutdown(); + } + }); } catch (Throwable e) { logger.error("Error: {}", e.getMessage(), e); } diff --git a/examples/src/main/java/ch/loway/oss/ari4java/examples/comprehensive/Asterisk.java b/examples/src/main/java/ch/loway/oss/ari4java/examples/comprehensive/Asterisk.java index 504048a0..0d671698 100644 --- a/examples/src/main/java/ch/loway/oss/ari4java/examples/comprehensive/Asterisk.java +++ b/examples/src/main/java/ch/loway/oss/ari4java/examples/comprehensive/Asterisk.java @@ -4,6 +4,7 @@ import ch.loway.oss.ari4java.AriVersion; import ch.loway.oss.ari4java.generated.AriWSHelper; import ch.loway.oss.ari4java.generated.models.*; +import ch.loway.oss.ari4java.tools.AriCallback; import ch.loway.oss.ari4java.tools.RestException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,10 +44,19 @@ public boolean start() { logger.info("Starting ARI..."); try { ari = ARI.build(address, ARI_APP_NAME, user, pass, version); - AsteriskInfo info = ari.asterisk().getInfo().execute(); + ari.asterisk().getInfo().execute(new AriCallback() { + @Override + public void onSuccess(AsteriskInfo info) { + logger.info("Asterisk {}", info.getSystem().getVersion()); + } + + @Override + public void onFailure(RestException e) { + logger.error("Error getting Asterisk version: {}", e.getMessage(), e); + } + }); threadPool = Executors.newFixedThreadPool(5); ari.events().eventWebsocket(ARI_APP_NAME).execute(new Handler()); - logger.info("Connected to Asterisk {}", info.getSystem().getVersion()); return true; } catch (Throwable t) { logger.error("Error: {}", t.getMessage(), t); @@ -123,7 +133,17 @@ private void hangupChannel(String channelId) { if (channelId != null) { if (lookups.containsKey(channelId)) { try { - ari.channels().hangup(channelId).execute(); + ari.channels().hangup(channelId).execute(new AriCallback() { + @Override + public void onSuccess(Void result) { + logger.debug("hanging up channel"); + } + + @Override + public void onFailure(RestException e) { + logger.error("Error hanging up channel", e); + } + }); } catch (RestException e) { logger.error("Error hanging up channel", e); } @@ -139,10 +159,23 @@ private void hangupChannel(String channelId) { * @throws RestException raised by ARI interactions */ private void createBridgeAndAddChannel1(State state) throws RestException { - Bridge bridge = ari.bridges().create().execute(); - logger.debug("Bridge created id: {}", bridge.getId()); - state.bridge = bridge.getId(); - addChannelToBridge(state.bridge, state.channel1); + ari.bridges().create().execute(new AriCallback() { + @Override + public void onSuccess(Bridge bridge) { + logger.debug("Bridge created id: {}", bridge.getId()); + state.bridge = bridge.getId(); + try { + addChannelToBridge(state.bridge, state.channel1); + } catch (RestException e) { + logger.error("Error adding channel to bridge: {}", e.getMessage(), e); + } + } + + @Override + public void onFailure(RestException e) { + logger.error("Error creating bridge: {}", e.getMessage(), e); + } + }); } /** @@ -151,7 +184,17 @@ private void createBridgeAndAddChannel1(State state) throws RestException { * @throws RestException raised by ARI interactions */ private void addChannelToBridge(String bridgeId, String channelId) throws RestException { - ari.bridges().addChannel(bridgeId, channelId).execute(); + ari.bridges().addChannel(bridgeId, channelId).execute(new AriCallback() { + @Override + public void onSuccess(Void result) { + logger.debug("Channel {}, added to Bridge {}", channelId, bridgeId); + } + + @Override + public void onFailure(RestException e) { + logger.debug("Error adding Channel {} to Bridge {}: {}", channelId, bridgeId, e.getMessage(), e); + } + }); } /** @@ -231,7 +274,7 @@ public void onSuccess(Message message) { @Override protected void onStasisStart(StasisStart message) { // StasisStart is created by both the Stasis dialplan app and a call to the channels API in ARI, - // so we check an argument set in the create channel code and ignore + // so we check an argument set in the createChannel code and ignore logger.debug("onStasisStart, chan id: {}, name: {}", message.getChannel().getId(), message.getChannel().getName()); if (message.getArgs() != null && !message.getArgs().isEmpty() && "me".equals(message.getArgs().get(0))) { logger.debug("started by me, not processing..."); @@ -260,18 +303,6 @@ protected void onStasisStart(StasisStart message) { } catch (RestException e) { logger.error("Error creating bridge", e); hangupChannel(state.channel1); - return; - } - // create the 2nd channel to the desired endpoint - try { - state.channel2 = createChannel(state.to, "Inbound " + state.from); - logger.debug("channel2: {}", state.channel2); - synchronized (lookups) { - lookups.put(state.channel2, state.id); - } - } catch (RestException e) { - logger.error("Error creating channel", e); - hangupChannel(state.channel1); } } @@ -301,17 +332,6 @@ protected void onChannelStateChange(ChannelStateChange message) { } catch (RestException e) { logger.error("Error creating bridge", e); hangupChannel(state.channel1); - return; - } - try { - state.channel2 = createChannel(state.to, state.from); - synchronized (lookups) { - lookups.put(state.channel2, state.id); - } - logger.debug("channel2: {}", state.channel2); - } catch (RestException e) { - logger.error("Error creating channel2", e); - hangupChannel(state.channel1); } } else if ("Ringing".equals(message.getChannel().getState()) && message.getChannel().getId().equals(state.channel2)) { // the channel state has changed to Ringing and is channel2 in the State object ... @@ -340,6 +360,24 @@ protected void onChannelStateChange(ChannelStateChange message) { } } + @Override + protected void onChannelEnteredBridge(ChannelEnteredBridge message) { + logger.debug("onChannelEnteredBridge {} {}", message.getBridge().getId(), message.getChannel().getId()); + State state = lookupState(message.getChannel().getId()); + if (message.getChannel().getId().equals(state.channel1)) { + try { + state.channel2 = createChannel(state.to, state.from); + synchronized (lookups) { + lookups.put(state.channel2, state.id); + } + logger.debug("channel2: {}", state.channel2); + } catch (RestException e) { + logger.error("Error creating channel2", e); + hangupChannel(state.channel1); + } + } + } + @Override protected void onChannelLeftBridge(ChannelLeftBridge message) { logger.debug("onChannelLeftBridge {} {}", message.getBridge().getId(), message.getChannel().getId()); diff --git a/examples/vagrant/asterisk/extensions.conf b/examples/vagrant/asterisk/extensions.conf index 955dc57e..68d94d8c 100644 --- a/examples/vagrant/asterisk/extensions.conf +++ b/examples/vagrant/asterisk/extensions.conf @@ -3,7 +3,7 @@ static=yes writeprotect=yes [default] -; empty - we dont use the defailt context +; empty - we dont use the default context [from-internal] ; Extensions 100, 200 & 300 diff --git a/examples/vagrant/asterisk/modules.conf b/examples/vagrant/asterisk/modules.conf index f131e277..be4c2cc2 100644 --- a/examples/vagrant/asterisk/modules.conf +++ b/examples/vagrant/asterisk/modules.conf @@ -43,7 +43,6 @@ load = codec_resample.so load = codec_ulaw.so load = codec_alaw.so load = codec_g722.so -load = codec_g729a.so load = codec_opus.so ; Formats @@ -52,11 +51,12 @@ load = format_gsm.so load = format_pcm.so load = format_wav_gsm.so load = format_wav.so -load = format_g729.so -load = res_format_attr_g729.so load = format_ogg_opus.so load = res_format_attr_opus.so +load = format_ogg_vorbis.so load = format_mp3.so +load = format_vp8.so +load = format_h264.so ; Functions @@ -74,10 +74,12 @@ load = func_md5.so load = func_sha1.so load = func_channel.so load = func_timeout.so +load = func_hangupcause.so ; Core/PBX load = pbx_config.so +load = pbx_spool.so ; Resources @@ -93,9 +95,9 @@ load = res_pjsip_diversion.so load = res_pjsip_dlg_options.so load = res_pjsip_dtmf_info.so load = res_pjsip_empty_info.so -load = res_pjsip_endpoint_identifier_anonymous.so -load = res_pjsip_endpoint_identifier_ip.so load = res_pjsip_endpoint_identifier_user.so +load = res_pjsip_endpoint_identifier_ip.so +load = res_pjsip_endpoint_identifier_anonymous.so load = res_pjsip_exten_state.so load = res_pjsip_header_funcs.so load = res_pjsip_history.so @@ -135,6 +137,7 @@ load = res_sorcery_config.so load = res_sorcery_memory.so load = res_sorcery_realtime.so load = res_timing_timerfd.so +load = res_timing_pthread.so load = res_http_media_cache.so load = res_http_websocket.so load = res_crypto.so diff --git a/examples/vagrant/asterisk/pjsip.conf b/examples/vagrant/asterisk/pjsip.conf index 9e33284f..6ffee21f 100644 --- a/examples/vagrant/asterisk/pjsip.conf +++ b/examples/vagrant/asterisk/pjsip.conf @@ -5,8 +5,19 @@ type = transport protocol = udp bind = 0.0.0.0 +domain = ari4java.local [transport-wss] type=transport protocol=wss bind=0.0.0.0 +domain = ari4java.local +websocket_write_timeout = 5000 + +[192.168.44.56] +type = domain-alias +domain = ari4java.local + +[localhost] +type = domain-alias +domain = ari4java.local diff --git a/examples/vagrant/asterisk/pjsip_wizard.conf b/examples/vagrant/asterisk/pjsip_wizard.conf index 04726844..fcdc248c 100644 --- a/examples/vagrant/asterisk/pjsip_wizard.conf +++ b/examples/vagrant/asterisk/pjsip_wizard.conf @@ -7,12 +7,12 @@ sends_registrations = no accepts_auth = yes sends_auth = no endpoint/allow_subscribe = yes -endpoint/allow = !all,g722,g729,alaw +endpoint/disallow = all +endpoint/allow = g722,ulaw endpoint/direct_media = no endpoint/force_rport = yes endpoint/disable_direct_media_on_nat = yes endpoint/direct_media_method = invite -endpoint/ice_support = yes endpoint/moh_suggest = default endpoint/send_rpid = yes endpoint/rewrite_contact = yes @@ -29,13 +29,16 @@ aor/support_path = yes ; Default Template for web phones [web-phone-defaults](!,phone-defaults) transport = transport-wss -endpoint/allow = !all,opus,g722,alaw +endpoint/disallow=all +endpoint/allow = opus,ulaw endpoint/webrtc = yes endpoint/rtp_symmetric = yes -endpoint/dtls_auto_generate_cert = yes -endpoint/device_state_busy_at = 1 +;endpoint/dtls_auto_generate_cert = yes +endpoint/dtls_ca_file = /etc/asterisk/keys/ca.crt +endpoint/dtls_cert_file = /etc/asterisk/keys/asterisk.crt +endpoint/dtls_private_key = /etc/asterisk/keys/asterisk.key -; User Extensions using the templates definded above +; User Extensions using the templates defined above [100](web-phone-defaults) inbound_auth/username = 100 inbound_auth/password = abc123 diff --git a/examples/vagrant/asterisk/rtp.conf b/examples/vagrant/asterisk/rtp.conf new file mode 100644 index 00000000..c136ff36 --- /dev/null +++ b/examples/vagrant/asterisk/rtp.conf @@ -0,0 +1,162 @@ +; +; RTP Configuration +; +[general] +; +; RTP start and RTP end configure start and end addresses +; +; Defaults are rtpstart=5000 and rtpend=31000 +; +rtpstart=10000 +rtpend=10010 +; +; Whether to enable or disable UDP checksums on RTP traffic +; +;rtpchecksums=no +; +; The amount of time a DTMF digit with no 'end' marker should be +; allowed to continue (in 'samples', 1/8000 of a second) +; +;dtmftimeout=3000 +; rtcpinterval = 5000 ; Milliseconds between rtcp reports + ;(min 500, max 60000, default 5000) +; +; Enable strict RTP protection. This will drop RTP packets that do not come +; from the recognized source of the RTP stream. Strict RTP qualifies RTP +; packet stream sources before accepting them upon initial connection and +; when the connection is renegotiated (e.g., transfers and direct media). +; Initial connection and renegotiation starts a learning mode to qualify +; stream source addresses. Once Asterisk has recognized a stream it will +; allow other streams to qualify and replace the current stream for 5 +; seconds after starting learning mode. Once learning mode completes the +; current stream is locked in and cannot change until the next +; renegotiation. +; Valid options are "no" to disable strictrtp, "yes" to enable strictrtp, +; and "seqno", which does the same thing as strictrtp=yes, but only checks +; to make sure the sequence number is correct rather than checking the time +; interval as well. +; This option is enabled by default. +; strictrtp=yes +; +; Number of packets containing consecutive sequence values needed +; to change the RTP source socket address. This option only comes +; into play while using strictrtp=yes. Consider changing this value +; if rtp packets are dropped from one or both ends after a call is +; connected. This option is set to 4 by default. +; probation=8 +; +; Enable sRTP replay protection. Buggy SIP user agents (UAs) reset the +; sequence number (RTP-SEQ) on a re-INVITE, for example, with Session Timers +; or on Call Hold/Resume, but keep the synchronization source (RTP-SSRC). If +; the new RTP-SEQ is higher than the previous one, the call continues if the +; roll-over counter (sRTP-ROC) is zero (the call lasted less than 22 minutes). +; In all other cases, the call faces one-way audio or even no audio at all. +; "replay check failed (index too old)" gets printed continuously. This is a +; software bug. You have to report this to the creator of that UA. Until it is +; fixed, you could disable sRTP replay protection (see RFC 3711 section 3.3.2). +; This option is enabled by default. +; srtpreplayprotection=yes +; +; Whether to enable or disable ICE support. This option is enabled by default. +; icesupport=false +; +; Hostname or address for the STUN server used when determining the external +; IP address and port an RTP session can be reached at. The port number is +; optional. If omitted the default value of 3478 will be used. This option is +; disabled by default. Name resolution will occur at load time, and if DNS is +; used, name resolution will occur repeatedly after the TTL expires. +; +; e.g. stundaddr=mystun.server.com:3478 +; +; stunaddr= +; +; Some multihomed servers have IP interfaces that cannot reach the STUN +; server specified by stunaddr. Blacklist those interface subnets from +; trying to send a STUN packet to find the external IP address. +; Attempting to send the STUN packet needlessly delays processing incoming +; and outgoing SIP INVITEs because we will wait for a response that can +; never come until we give up on the response. +; * Multiple subnets may be listed. +; * Blacklisting applies to IPv4 only. STUN isn't needed for IPv6. +; * Blacklisting applies when binding RTP to specific IP addresses and not +; the wildcard 0.0.0.0 address. e.g., A PJSIP endpoint binding RTP to a +; specific address using the bind_rtp_to_media_address and media_address +; options. Or the PJSIP endpoint specifies an explicit transport that binds +; to a specific IP address. Blacklisting is done via ACL infrastructure +; so it's possible to whitelist as well. +; +; stun_acl = named_acl +; stun_deny = 0.0.0.0/0 +; stun_permit = 1.2.3.4/32 +; +; For historic reasons stun_blacklist is an alias for stun_deny. +; +; Whether to report the PJSIP version in a SOFTWARE attribute for all +; outgoing STUN packets. This option is enabled by default. +; +; stun_software_attribute=yes +; +; Hostname or address for the TURN server to be used as a relay. The port +; number is optional. If omitted the default value of 3478 will be used. +; This option is disabled by default. +; +; e.g. turnaddr=myturn.server.com:34780 +; +; turnaddr= +; +; Username used to authenticate with TURN relay server. +; turnusername= +; +; Password used to authenticate with TURN relay server. +; turnpassword= +; +; An ACL can be used to determine which discovered addresses to include for +; ICE, srflx and relay discovery. This is useful to optimize the ICE process +; where a system has multiple host address ranges and/or physical interfaces +; and certain of them are not expected to be used for RTP. For example, VPNs +; and local interconnections may not be suitable or necessary for ICE. Multiple +; subnets may be listed. If left unconfigured, all discovered host addresses +; are used. +; +; ice_acl = named_acl +; ice_deny = 0.0.0.0/0 +; ice_permit = 1.2.3.4/32 +; +; For historic reasons ice_blacklist is an alias for ice_deny. +; +; The MTU to use for DTLS packet fragmentation. This option is set to 1200 +; by default. The minimum MTU is 256. +; dtls_mtu = 1200 +; +[ice_host_candidates] +; +; When Asterisk is behind a static one-to-one NAT and ICE is in use, ICE will +; expose the server's internal IP address as one of the host candidates. +; Although using STUN (see the 'stunaddr' configuration option) will provide a +; publicly accessible IP, the internal IP will still be sent to the remote +; peer. To help hide the topology of your internal network, you can override +; the host candidates that Asterisk will send to the remote peer. +; +; IMPORTANT: Only use this functionality when your Asterisk server is behind a +; one-to-one NAT and you know what you're doing. If you do define anything +; here, you almost certainly will NOT want to specify 'stunaddr' or 'turnaddr' +; above. +; +; The format for these overrides is: +; +; => ,[include_local_address] +; +; The following will replace 192.168.1.10 with 1.2.3.4 during ICE +; negotiation: +; +;192.168.1.10 => 1.2.3.4 +; +; The following will include BOTH 192.168.1.10 and 1.2.3.4 during ICE +; negotiation instead of replacing 192.168.1.10. This can make it easier +; to serve both local and remote clients. +; +;192.168.1.10 => 1.2.3.4,include_local_address +; +; You can define an override for more than 1 interface if you have a multihomed +; server. Any local interface that is not matched will be passed through +; unaltered. Both IPv4 and IPv6 addresses are supported. diff --git a/examples/vagrant/scripts/provision.sh b/examples/vagrant/scripts/provision.sh index de377e1c..f04741cd 100755 --- a/examples/vagrant/scripts/provision.sh +++ b/examples/vagrant/scripts/provision.sh @@ -1,44 +1,38 @@ #!/usr/bin/env bash -# Hostname -echo "Updating Hostname ..." -sed -i 's/^ubuntu-focal$/localpbx/g' /etc/hostname -sed -i 's/ubuntu-focal$/localpbx/g' /etc/hosts -systemctl restart systemd-logind.service -hostnamectl set-hostname localpbx +DOCKER=false +if [ "$1" == "docker" ]; then + DOCKER=true +fi + +if [ "$DOCKER" == "false" ]; then + # Hostname + echo "Updating Hostname ..." + sed -i 's/^ubuntu-focal$/localpbx/g' /etc/hostname + sed -i 's/ubuntu-focal$/localpbx/g' /etc/hosts + systemctl restart systemd-logind.service + hostnamectl set-hostname localpbx +fi # get the latest packages and upgrade them echo "Updating System ..." +export DEBIAN_FRONTEND=noninteractive apt update apt -y upgrade -# install some pre-requisites (mostly for Asterisk) -echo "Installing pre-requisites ..." +# install some pre-requisites +echo "Installing some pre-requisites ..." apt -y install \ + curl \ wget \ - unzip \ - subversion \ - build-essential \ - openssl \ - pkg-config \ - libssl-dev \ - libcurl4-openssl-dev \ - libgsm1-dev \ - libnewt-dev \ - libxml2-dev \ - libsqlite3-dev \ - uuid-dev \ - libjansson-dev \ - libncurses5-dev \ - libedit-dev \ - xmlstarlet \ - libsrtp2-dev \ - zlib1g-dev \ - mpg123 \ - subversion \ - tcl \ sox \ - lame + lame \ + mpg123 \ + libopusfile-dev \ + autoconf + +# set the timezone +ln -snf /usr/share/zoneinfo/$(curl https://ipapi.co/timezone) /etc/localtime # create user & folders for Asterisk echo "Creating asterisk user and required folders ..." @@ -46,23 +40,53 @@ adduser --system --group --no-create-home asterisk mkdir -p /var/{lib,log,spool}/asterisk # goto home folder, download and build Asterisk using the version specified in AST_VER -AST_VER=18.9.0 +AST_VER=16.27.0 echo "Download, compile & setup Asterisk $AST_VER ..." cd ~ wget http://downloads.asterisk.org/pub/telephony/asterisk/releases/asterisk-$AST_VER.tar.gz tar xvfz asterisk-$AST_VER.tar.gz +rm asterisk-$AST_VER.tar.gz cd asterisk-$AST_VER/ +echo "Installing Asterisk pre-requisites ..." +contrib/scripts/install_prereq install contrib/scripts/get_mp3_source.sh +if [ "$DOCKER" == "true" ]; then + echo "Getting Open Source OPUS Codec" + # although we're using a later Asterisk this is the latest opus but seems to work (from my limited testing) + wget github.com/traud/asterisk-opus/archive/asterisk-13.7.tar.gz + tar zvxf asterisk-13.7.tar.gz + rm asterisk-13.7.tar.gz + cp --verbose ./asterisk-opus*/include/asterisk/* ./include/asterisk + cp --verbose ./asterisk-opus*/codecs/* ./codecs + cp --verbose ./asterisk-opus*/res/* ./res + cp --verbose ./asterisk-opus*/formats/* ./formats + patch -p1 <./asterisk-opus*/asterisk.patch + ./bootstrap.sh +fi ./configure --with-pjproject-bundled --with-jansson-bundled -make menuselect.makeopts && menuselect/menuselect \ ---enable codec_opus \ ---enable codec_g729a \ ---enable EXTRA-SOUNDS-EN-G722 \ ---enable EXTRA-SOUNDS-EN-WAV \ ---enable format_mp3 \ ---disable chan_sip \ ---disable BUILD_NATIVE \ -menuselect.makeopts +make menuselect.makeopts +if [ "$DOCKER" == "true" ]; then + menuselect/menuselect \ + --enable CORE-SOUNDS-EN-SLN16 \ + --enable EXTRA-SOUNDS-EN-G722 \ + --enable EXTRA-SOUNDS-EN-WAV \ + --enable EXTRA-SOUNDS-EN-SLN16 \ + --enable format_mp3 \ + --disable chan_sip \ + --disable BUILD_NATIVE \ + menuselect.makeopts +else + menuselect/menuselect \ + --enable CORE-SOUNDS-EN-SLN16 \ + --enable EXTRA-SOUNDS-EN-G722 \ + --enable EXTRA-SOUNDS-EN-WAV \ + --enable EXTRA-SOUNDS-EN-SLN16 \ + --enable codec_opus \ + --enable format_mp3 \ + --disable chan_sip \ + --disable BUILD_NATIVE \ + menuselect.makeopts +fi make make install #make sure the asterisk user owns its folders @@ -70,14 +94,21 @@ chown -R asterisk:asterisk /var/{lib,log,spool}/asterisk # copy config & http content cp -f /vagrant/asterisk/* /etc/asterisk/ cp -f /vagrant/static-http/* /var/lib/asterisk/static-http/ - +if [ "$DOCKER" == "true" ]; then + sed -i 's/codec_opus/codec_opus_open_source/g' /etc/asterisk/modules.conf + sed -i 's/format_ogg_opus/format_ogg_opus_open_source/g' /etc/asterisk/modules.conf +fi # create keys for TLS echo "Creating TLS keys ..." mkdir -p /etc/asterisk/keys openssl genrsa -des3 -out /etc/asterisk/keys/ca.key -passout pass:asterisk 4096 > /dev/null openssl req -batch -new -x509 -days 3650 -subj "/O=ARI4Java/CN=ARI4Java CA" -key /etc/asterisk/keys/ca.key -passin pass:asterisk -out /etc/asterisk/keys/ca.crt > /dev/null openssl genrsa -out /etc/asterisk/keys/asterisk.key 2048 > /dev/null -openssl req -batch -new -subj "/O=ARI4Java/CN=192.168.56.44" -key /etc/asterisk/keys/asterisk.key -out /etc/asterisk/keys/asterisk.csr > /dev/null +SUBJECT="/O=ARI4Java/CN=192.168.56.44" +if [ "$DOCKER" == "true" ]; then + SUBJECT="/O=ARI4Java/CN=localhost" +fi +openssl req -batch -new -subj "$SUBJECT" -key /etc/asterisk/keys/asterisk.key -out /etc/asterisk/keys/asterisk.csr > /dev/null openssl x509 -req -days 3650 -in /etc/asterisk/keys/asterisk.csr -CA /etc/asterisk/keys/ca.crt -CAkey /etc/asterisk/keys/ca.key -passin pass:asterisk -set_serial 01 -out /etc/asterisk/keys/asterisk.crt > /dev/null chown -R asterisk:asterisk /etc/asterisk/keys diff --git a/examples/vagrant/static-http/ari4java-phone.html b/examples/vagrant/static-http/ari4java-phone.html index a31910c2..ef5097d0 100644 --- a/examples/vagrant/static-http/ari4java-phone.html +++ b/examples/vagrant/static-http/ari4java-phone.html @@ -108,7 +108,7 @@

ARI4Java Phone

let audioOutput = new Audio(); audioOutput.setSinkId('default'); audioOutput.load(); - let server = '192.168.56.44'; + let server = window.location.hostname; let phone = undefined; let session = undefined; function login(exten) { diff --git a/src/main/java/ch/loway/oss/ari4java/tools/WsClientAutoReconnect.java b/src/main/java/ch/loway/oss/ari4java/tools/WsClientAutoReconnect.java deleted file mode 100755 index 1489bad7..00000000 --- a/src/main/java/ch/loway/oss/ari4java/tools/WsClientAutoReconnect.java +++ /dev/null @@ -1,12 +0,0 @@ -package ch.loway.oss.ari4java.tools; - -/** - * Interface to pluggable WebSocket reconnect implementation - * - * @author grahambrown11 - * - */ -public interface WsClientAutoReconnect { - void reconnectWs(Throwable cause); - void pong(); -} diff --git a/src/main/java/ch/loway/oss/ari4java/tools/http/HTTPLogger.java b/src/main/java/ch/loway/oss/ari4java/tools/http/HTTPLogger.java index 2210ad31..f17ee670 100755 --- a/src/main/java/ch/loway/oss/ari4java/tools/http/HTTPLogger.java +++ b/src/main/java/ch/loway/oss/ari4java/tools/http/HTTPLogger.java @@ -15,7 +15,7 @@ private HTTPLogger() { throw new IllegalStateException("Utility class"); } - private static Logger logger = LoggerFactory.getLogger(HTTPLogger.class); + private static final Logger logger = LoggerFactory.getLogger(HTTPLogger.class); private static final int MAX_LEN = 1000; diff --git a/src/main/java/ch/loway/oss/ari4java/tools/http/NettyHttpClient.java b/src/main/java/ch/loway/oss/ari4java/tools/http/NettyHttpClient.java index 55048fa1..81a2273d 100755 --- a/src/main/java/ch/loway/oss/ari4java/tools/http/NettyHttpClient.java +++ b/src/main/java/ch/loway/oss/ari4java/tools/http/NettyHttpClient.java @@ -8,16 +8,26 @@ import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.pool.ChannelPool; +import io.netty.channel.pool.ChannelPoolHandler; +import io.netty.channel.pool.FixedChannelPool; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.base64.Base64; import io.netty.handler.codec.http.*; import io.netty.handler.codec.http.websocketx.*; +import io.netty.handler.logging.ByteBufFormat; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import io.netty.handler.timeout.ReadTimeoutHandler; +import io.netty.util.NettyRuntime; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.ScheduledFuture; +import io.netty.util.internal.SystemPropertyUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,7 +47,7 @@ * * @author mwalton */ -public class NettyHttpClient implements HttpClient, WsClient, WsClientAutoReconnect { +public class NettyHttpClient implements HttpClient, WsClient { public static final int CONNECTION_TIMEOUT_SEC = 10; public static final int READ_TIMEOUT_SEC = 30; @@ -50,7 +60,7 @@ public class NettyHttpClient implements HttpClient, WsClient, WsClientAutoReconn private static final String HTTP_AGGREGATOR = "http-aggregator"; private static final String HTTP_HANDLER = "http-handler"; - private Logger logger = LoggerFactory.getLogger(NettyHttpClient.class); + private final Logger logger = LoggerFactory.getLogger(NettyHttpClient.class); protected Bootstrap httpBootstrap; protected URI baseUri; @@ -58,7 +68,7 @@ public class NettyHttpClient implements HttpClient, WsClient, WsClientAutoReconn private EventLoopGroup shutDownGroup; protected String auth; - private HttpResponseHandler wsCallback; + protected HttpResponseHandler wsCallback; private String wsEventsUrl; private List wsEventsParamQuery; private WsClientConnection wsClientConnection; @@ -76,10 +86,16 @@ public class NettyHttpClient implements HttpClient, WsClient, WsClientAutoReconn private static boolean autoReconnect = true; protected int pingPeriod = 5; protected TimeUnit pingTimeUnit = TimeUnit.MINUTES; + protected ChannelPool pool; + private int threadCount; public NettyHttpClient() { - group = new NioEventLoopGroup(); - shutDownGroup = new NioEventLoopGroup(); + // use at least 3 threads + threadCount = Math.max(3, SystemPropertyUtil.getInt( + "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)); + logger.debug("Starting NioEventLoopGroup with {} threads", threadCount); + group = new NioEventLoopGroup(threadCount); + shutDownGroup = new NioEventLoopGroup(1); } public void initialize(String baseUrl, String username, String password) throws URISyntaxException { @@ -108,23 +124,38 @@ protected void initHttpBootstrap() { READ_TIMEOUT_SEC, MAX_HTTP_REQUEST); httpBootstrap = new Bootstrap(); + httpBootstrap.remoteAddress(baseUri.getHost(), getPort()); + httpBootstrap.group(group); bootstrapOptions(httpBootstrap); - httpBootstrap.handler(new ChannelInitializer() { + pool = new FixedChannelPool(httpBootstrap, new ChannelPoolHandler() { @Override - public void initChannel(SocketChannel ch) throws Exception { + public void channelCreated(Channel ch) throws Exception { + logger.trace("Channel Pool connection created: {}", ch); ChannelPipeline pipeline = ch.pipeline(); + pipeline.addFirst("logger", new LoggingHandler(HTTPLogger.class, LogLevel.TRACE, ByteBufFormat.SIMPLE)); addSSLIfRequired(pipeline, baseUri); pipeline.addLast("read-timeout", new ReadTimeoutHandler(READ_TIMEOUT_SEC)); pipeline.addLast(HTTP_CODEC, new HttpClientCodec()); pipeline.addLast(HTTP_AGGREGATOR, new HttpObjectAggregator(MAX_HTTP_REQUEST)); pipeline.addLast(HTTP_HANDLER, new NettyHttpClientHandler()); + logger.debug("pipeline names: {}", pipeline.names()); + ch.closeFuture().addListener(future -> logger.debug("Channel closed, {}", ch)); } - }); + + @Override + public void channelAcquired(Channel ch) { + logger.trace("Channel Pool connection acquired: {}", ch); + } + + @Override + public void channelReleased(Channel ch) { + logger.trace("Channel Pool connection released: {}", ch); + } + }, threadCount); } } private void bootstrapOptions(Bootstrap bootStrap) { - bootStrap.group(group); bootStrap.channel(NioSocketChannel.class); bootStrap.option(ChannelOption.TCP_NODELAY, true); bootStrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); @@ -153,9 +184,12 @@ private int getPort() { return port; } - protected ChannelFuture httpConnect() { - logger.debug("HTTP Connect uri: {}", baseUri); - return httpBootstrap.connect(baseUri.getHost(), getPort()); + protected Future poolAcquire() { + return pool.acquire(); + } + + protected void poolRelease(Channel ch) { + pool.release(ch); } @Override @@ -184,10 +218,16 @@ public void destroy() { } } if (group != null && !group.isShuttingDown()) { - logger.debug("group shutdownGracefully"); + logger.debug("wsGroup shutdownGracefully"); + group.shutdownGracefully(0, 1, TimeUnit.SECONDS).syncUninterruptibly(); + group = null; + logger.debug("wsGroup shutdown complete"); + } + if (group != null && !group.isShuttingDown()) { + logger.debug("httpGroup shutdownGracefully"); group.shutdownGracefully(0, 1, TimeUnit.SECONDS).syncUninterruptibly(); group = null; - logger.debug("group shutdown complete"); + logger.debug("httpGroup shutdown complete"); } }); shutDownGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS).syncUninterruptibly(); @@ -248,7 +288,7 @@ private HttpRequest buildRequest(String path, String method, List par request.content().clear().writeBytes(bbuf); } request.headers().set(HttpHeaderNames.HOST, baseUri.getHost()); - request.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE); + request.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); request.headers().set(HttpHeaderNames.AUTHORIZATION, this.auth); HTTPLogger.traceRequest(request, body); return request; @@ -257,9 +297,9 @@ private HttpRequest buildRequest(String path, String method, List par private RestException makeException(HttpResponseStatus status, String response, List errors) { if (status == null && response == null) { - return new RestException("Client Shutdown"); + return new RestException("No HTTP Status and Response, timeout or client shutdown"); } else if (status == null) { - return new RestException("Client Shutdown: " + response); + return new RestException("Error: " + response); } if (errors != null) { @@ -278,7 +318,7 @@ private RestException makeException(HttpResponseStatus status, String response, public String httpActionSync(String uri, String method, List parametersQuery, String body, List errors) throws RestException { NettyHttpClientHandler handler = httpActionSyncHandler(uri, method, parametersQuery, body, errors); - return handler.getResponseText(); + return handler != null ? handler.getResponseText() : null; } // Synchronous HTTP action @@ -286,7 +326,7 @@ public String httpActionSync(String uri, String method, List paramete public byte[] httpActionSyncAsBytes(String uri, String method, List parametersQuery, String body, List errors) throws RestException { NettyHttpClientHandler handler = httpActionSyncHandler(uri, method, parametersQuery, body, errors, true); - return handler.getResponseBytes(); + return handler != null ? handler.getResponseBytes() : null; } private NettyHttpClientHandler httpActionSyncHandler(String uri, String method, List parametersQuery, @@ -298,33 +338,42 @@ private NettyHttpClientHandler httpActionSyncHandler(String uri, String method, String body, List errors, boolean binary) throws RestException { HttpRequest request = buildRequest(uri, method, parametersQuery, body); logger.debug("Sync Action - {} to {}", request.method(), request.uri()); - Channel ch = httpConnect().addListener((ChannelFutureListener) future -> { - if (future.isSuccess()) { - logger.debug("HTTP connected"); - replaceAggregator(binary, future.channel()); - } else if (future.cause() != null) { - logger.error("HTTP Connection Error - {}", future.cause().getMessage(), future.cause()); + Channel ch = poolAcquire().syncUninterruptibly().getNow(); + try { + replaceAggregator(binary, ch); + NettyHttpClientHandler handler = (NettyHttpClientHandler) ch.pipeline().get(HTTP_HANDLER); + if (handler == null) return null; + handler.reset(); + ch.writeAndFlush(request); + logger.debug("Wait for response..."); + handler.waitForResponse(READ_TIMEOUT_SEC); + if (handler.getException() != null) { + logger.debug("got an error: {}", handler.getException().toString()); + throw new RestException(handler.getException()); + } else if (httpResponseOkay(handler.getResponseStatus())) { + logger.debug("got OK response"); + return handler; } else { - logger.error("HTTP Connection Error - Unknown"); + logger.debug("...done waiting"); + throw makeException(handler.getResponseStatus(), handler.getResponseText(), errors); } - }).syncUninterruptibly().channel(); - NettyHttpClientHandler handler = (NettyHttpClientHandler) ch.pipeline().get(HTTP_HANDLER); - ch.writeAndFlush(request); - ch.closeFuture().syncUninterruptibly(); - if (handler.getException() != null) { - throw new RestException(handler.getException()); - } else if (httpResponseOkay(handler.getResponseStatus())) { - return handler; - } else { - throw makeException(handler.getResponseStatus(), handler.getResponseText(), errors); + } finally { + poolRelease(ch); } } private void replaceAggregator(boolean binary, Channel ch) { - if (binary) { - logger.debug("Is Binary, replace http-aggregator ..."); - ch.pipeline().replace( - HTTP_AGGREGATOR, HTTP_AGGREGATOR, new HttpObjectAggregator(MAX_HTTP_BIN_REQUEST)); + HttpObjectAggregator aggregator = (HttpObjectAggregator) ch.pipeline().get(HTTP_AGGREGATOR); + if (aggregator != null) { + if (binary && aggregator.maxContentLength() != MAX_HTTP_BIN_REQUEST) { + logger.debug("Replace http-aggregator with larger content length..."); + ch.pipeline().replace( + HTTP_AGGREGATOR, HTTP_AGGREGATOR, new HttpObjectAggregator(MAX_HTTP_BIN_REQUEST)); + } else if (!binary && aggregator.maxContentLength() != MAX_HTTP_REQUEST) { + logger.debug("Replace http-aggregator with smaller content length..."); + ch.pipeline().replace( + HTTP_AGGREGATOR, HTTP_AGGREGATOR, new HttpObjectAggregator(MAX_HTTP_REQUEST)); + } } } @@ -337,40 +386,52 @@ public void httpActionAsync(String uri, String method, List parameter final HttpRequest request = buildRequest(uri, method, parametersQuery, body); logger.debug("Async Action - {} to {}", request.method(), request.uri()); // Get future channel - ChannelFuture cf = httpConnect(); - cf.addListener((ChannelFutureListener) future1 -> { + Future cf = poolAcquire(); + cf.addListener((FutureListener) future1 -> { if (future1.isSuccess()) { - logger.debug("HTTP connected"); - Channel ch = future1.channel(); + Channel ch = future1.getNow(); + logger.debug("Channel, {}", ch); + group.execute(responseHandler::onChReadyToWrite); replaceAggregator(binary, ch); - responseHandler.onChReadyToWrite(); - ch.writeAndFlush(request); - ch.closeFuture().addListener((ChannelFutureListener) future2 -> { - responseHandler.onResponseReceived(); - if (future2.isSuccess()) { - NettyHttpClientHandler handler = (NettyHttpClientHandler) future2.channel().pipeline().get(HTTP_HANDLER); - if (handler.getException() != null) { - responseHandler.onFailure(new RestException(handler.getException())); - } else if (httpResponseOkay(handler.getResponseStatus())) { - if (binary) { - responseHandler.onSuccess(handler.getResponseBytes()); + final NettyHttpClientHandler handler = (NettyHttpClientHandler) ch.pipeline().get(HTTP_HANDLER); + handler.reset(); + ch.writeAndFlush(request).addListener(future2 -> + group.execute(() -> { + try { + logger.debug("Wait for response..."); + handler.waitForResponse(READ_TIMEOUT_SEC); + if (handler.getException() != null) { + logger.debug("got an error: {}", handler.getException().toString()); + onFailure(responseHandler, new RestException(handler.getException())); + } else if (httpResponseOkay(handler.getResponseStatus())) { + logger.debug("got OK response"); + if (binary) { + responseHandler.onSuccess(handler.getResponseBytes()); + } else { + responseHandler.onSuccess(handler.getResponseText()); + } } else { - responseHandler.onSuccess(handler.getResponseText()); + logger.debug("...done waiting"); + onFailure(responseHandler, makeException(handler.getResponseStatus(), handler.getResponseText(), errors)); } - } else { - responseHandler.onFailure(makeException(handler.getResponseStatus(), handler.getResponseText(), errors)); + } finally { + poolRelease(ch); } - } else { - responseHandler.onFailure(future2.cause()); - } - }); + }) + ); } else { - responseHandler.onFailure(future1.cause()); + onFailure(responseHandler, future1.cause()); } }); } // WsClient implementation - connect to WebSocket server + private void onFailure(HttpResponseHandler responseHandler, Throwable cause) { + if (!group.isShuttingDown()) { + group.execute(() -> responseHandler.onFailure(cause)); + } + } + @Override public WsClientConnection connect(final HttpResponseHandler callback, final String url, final List lParamQuery) throws RestException { if (isWsConnected()) { @@ -381,7 +442,7 @@ public WsClientConnection connect(final HttpResponseHandler callback, final Stri logger.debug("WS Connect uri: {}", handshake.uri()); this.wsEventsUrl = url; this.wsEventsParamQuery = lParamQuery; - this.wsHandler = new NettyWSClientHandler(handshake, callback, this); + this.wsHandler = new NettyWSClientHandler(handshake, this); this.wsCallback = callback; return connect(new Bootstrap(), callback); } catch (Exception e) { @@ -390,12 +451,14 @@ public WsClientConnection connect(final HttpResponseHandler callback, final Stri } protected WsClientConnection connect(Bootstrap wsBootStrap, final HttpResponseHandler callback) { + wsBootStrap.group(group); bootstrapOptions(wsBootStrap); wsBootStrap.handler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); addSSLIfRequired(pipeline, baseUri); + pipeline.addFirst("logger", new LoggingHandler(HTTPLogger.class, LogLevel.TRACE, ByteBufFormat.SIMPLE)); pipeline.addLast(HTTP_CODEC, new HttpClientCodec()); pipeline.addLast(HTTP_AGGREGATOR, new HttpObjectAggregator(MAX_HTTP_REQUEST)); pipeline.addLast("ws-handler", wsHandler); @@ -419,7 +482,9 @@ public void operationComplete(ChannelFuture future) throws Exception { // start a ping and reset reconnect counter startPing(); reconnectCount = 0; - callback.onChReadyToWrite(); + if (!group.isShuttingDown()) { + group.execute(callback::onChReadyToWrite); + } } else { if (future.cause() != null) { logger.error("WS Upgrade Error - {}", future.cause().getMessage(), future.cause()); @@ -537,8 +602,11 @@ private boolean httpResponseOkay(HttpResponseStatus status) { || HttpResponseStatus.CREATED.equals(status); } - @Override public void reconnectWs(Throwable cause) { + // send the disconnect callback + if (!group.isShuttingDown()) { + group.execute(() -> wsCallback.onDisconnect()); + } // cancel the ping timer if (wsPingTimer != null) { wsPingTimer.cancel(true); @@ -547,7 +615,9 @@ public void reconnectWs(Throwable cause) { if (!autoReconnect || (maxReconnectCount > -1 && reconnectCount >= maxReconnectCount)) { logger.warn("Cannot connect: {} - executing failure callback", cause.getMessage()); - wsCallback.onFailure(cause); + if (!group.isShuttingDown()) { + group.execute(() -> wsCallback.onFailure(cause)); + } return; } @@ -565,14 +635,31 @@ public void reconnectWs(Throwable cause) { // then connect again connect(wsCallback, wsEventsUrl, wsEventsParamQuery); } catch (RestException e) { - wsCallback.onFailure(e); + group.execute(() -> wsCallback.onFailure(e)); } }, timeout, TimeUnit.SECONDS); } } - @Override - public void pong() { + public void onWSResponseReceived() { + if (!group.isShuttingDown()) { + group.execute(() -> wsCallback.onResponseReceived()); + } + } + + public void onWSSuccess(String text) { + if (!group.isShuttingDown()) { + group.execute(() -> wsCallback.onSuccess(text)); + } + } + + public void onWSFailure(Throwable cause) { + if (!group.isShuttingDown()) { + group.execute(() -> wsCallback.onFailure(cause)); + } + } + + public void onWSPong() { lastPong = System.currentTimeMillis(); } diff --git a/src/main/java/ch/loway/oss/ari4java/tools/http/NettyHttpClientHandler.java b/src/main/java/ch/loway/oss/ari4java/tools/http/NettyHttpClientHandler.java index b6a6b866..474e7b6a 100755 --- a/src/main/java/ch/loway/oss/ari4java/tools/http/NettyHttpClientHandler.java +++ b/src/main/java/ch/loway/oss/ari4java/tools/http/NettyHttpClientHandler.java @@ -13,6 +13,8 @@ import org.slf4j.LoggerFactory; import java.util.Arrays; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; /** * HttpClientHandler handles the asynchronous response from the remote @@ -26,10 +28,12 @@ public class NettyHttpClientHandler extends SimpleChannelInboundHandler protected byte[] responseBytes; protected HttpResponseStatus responseStatus; private Throwable exception; + private CountDownLatch latch = new CountDownLatch(1); - private Logger logger = LoggerFactory.getLogger(NettyHttpClientHandler.class); + private final Logger logger = LoggerFactory.getLogger(NettyHttpClientHandler.class); public void reset() { + latch = new CountDownLatch(1); responseBytes = null; responseStatus = null; } @@ -48,9 +52,11 @@ protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Except response.headers().getInt(HttpHeaderNames.CONTENT_LENGTH), responseBytes.length); } } else if (msg != null) { + latch.countDown(); logger.warn("Unexpected: {}", msg); throw new RestException("Unknown object: " + msg.getClass().getSimpleName() + ", expecting FullHttpResponse"); } + latch.countDown(); } public String getResponseText() { @@ -76,7 +82,7 @@ public Throwable getException() { } @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { this.exception = cause; if (!(cause instanceof ReadTimeoutException)) { logger.error("Not a read timeout", cause); @@ -85,5 +91,13 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E ctx.close(); } + public void waitForResponse(int timeout) { + try { + latch.await(timeout, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } diff --git a/src/main/java/ch/loway/oss/ari4java/tools/http/NettyWSClientHandler.java b/src/main/java/ch/loway/oss/ari4java/tools/http/NettyWSClientHandler.java index 608a9a94..ec0d18fb 100755 --- a/src/main/java/ch/loway/oss/ari4java/tools/http/NettyWSClientHandler.java +++ b/src/main/java/ch/loway/oss/ari4java/tools/http/NettyWSClientHandler.java @@ -24,19 +24,13 @@ public class NettyWSClientHandler extends NettyHttpClientHandler { final WebSocketClientHandshaker handshaker; private ChannelPromise handshakeFuture; - final HttpResponseHandler wsCallback; - private WsClientAutoReconnect wsClient = null; + private NettyHttpClient client = null; private boolean shuttingDown = false; - private Logger logger = LoggerFactory.getLogger(NettyWSClientHandler.class); + private final Logger logger = LoggerFactory.getLogger(NettyWSClientHandler.class); - public NettyWSClientHandler(WebSocketClientHandshaker handshaker, HttpResponseHandler wsCallback, WsClientAutoReconnect wsClient) { - this(handshaker, wsCallback); - this.wsClient = wsClient; - } - - public NettyWSClientHandler(WebSocketClientHandshaker handshaker, HttpResponseHandler wsCallback) { + public NettyWSClientHandler(WebSocketClientHandshaker handshaker, NettyHttpClient client) { this.handshaker = handshaker; - this.wsCallback = wsCallback; + this.client = client; } public ChannelFuture handshakeFuture() { @@ -44,21 +38,20 @@ public ChannelFuture handshakeFuture() { } @Override - public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + public void handlerAdded(ChannelHandlerContext ctx) { handshakeFuture = ctx.newPromise(); } @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { + public void channelActive(ChannelHandlerContext ctx) { handshaker.handshake(ctx.channel()); } @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - if (!shuttingDown && this.wsClient != null) { + public void channelInactive(ChannelHandlerContext ctx) { + if (!shuttingDown) { logger.debug("WS channel inactive - {}", ctx); - wsCallback.onDisconnect(); - wsClient.reconnectWs(new RestException("WS channel inactive")); + client.reconnectWs(new RestException("WS channel inactive")); } } @@ -84,25 +77,21 @@ protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Except } // call this so we can set the last received time - wsCallback.onResponseReceived(); + client.onWSResponseReceived(); if (msg instanceof TextWebSocketFrame) { TextWebSocketFrame textFrame = (TextWebSocketFrame) msg; String text = textFrame.content().toString(ARIEncoder.ENCODING); HTTPLogger.traceWebSocketFrame(text); responseBytes = text.getBytes(ARIEncoder.ENCODING); - wsCallback.onSuccess(text); + client.onWSSuccess(text); } else if (msg instanceof CloseWebSocketFrame) { ch.close(); if (!shuttingDown) { - if (this.wsClient != null) { - wsClient.reconnectWs(new RestException("CloseWebSocketFrame received")); - } else { - wsCallback.onDisconnect(); - } + client.reconnectWs(new RestException("CloseWebSocketFrame received")); } } else if (msg instanceof PongWebSocketFrame) { - wsClient.pong(); + client.onWSPong(); } else { HTTPLogger.traceWebSocketFrame(msg.toString()); String error = "Not expecting: " + msg.getClass().getSimpleName(); @@ -112,7 +101,7 @@ protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Except } @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { if (!shuttingDown) return; logger.error("exceptionCaught: {}", cause.getMessage(), cause); @@ -121,7 +110,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E } ctx.fireExceptionCaught(cause); ctx.close(); - wsCallback.onFailure(cause); + client.onWSFailure(cause); } public boolean isShuttingDown() { diff --git a/src/test/java/ch/loway/oss/ari4java/tools/http/NettyHttpClientTest.java b/src/test/java/ch/loway/oss/ari4java/tools/http/NettyHttpClientTest.java index 9f589082..312d8fb7 100755 --- a/src/test/java/ch/loway/oss/ari4java/tools/http/NettyHttpClientTest.java +++ b/src/test/java/ch/loway/oss/ari4java/tools/http/NettyHttpClientTest.java @@ -11,8 +11,10 @@ import io.netty.channel.embedded.EmbeddedChannel; import io.netty.handler.codec.http.*; import io.netty.handler.codec.http.websocketx.WebSocketVersion; +import io.netty.util.concurrent.Future; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; import java.net.URI; import java.net.URISyntaxException; @@ -20,7 +22,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import static org.junit.jupiter.api.Assertions.*; import static org.mockito.Mockito.*; @@ -29,20 +34,36 @@ public class NettyHttpClientTest { private NettyHttpClient client; private ChannelFuture cf; + private Future fc; + private final ExecutorService executor = Executors.newSingleThreadExecutor(); - private void setupTestClient(boolean init) throws URISyntaxException { + private void setupTestClient(boolean init, Channel ch) throws URISyntaxException { client = new NettyHttpClient() { protected void initHttpBootstrap() { // for testing skip the bootstrapping } - protected ChannelFuture httpConnect() { - return cf; + @Override + protected Future poolAcquire() { + return fc; + } + + @Override + protected void poolRelease(Channel ch) { + // for testing skip } }; if (init) { client.initialize("http://localhost:8088/", "user", "p@ss"); } + if (ch instanceof EmbeddedChannel) { + fc = ch.eventLoop().newSucceededFuture(ch); + } else { + //noinspection unchecked + fc = (Future) mock(Future.class); + when(fc.syncUninterruptibly()).thenReturn(fc); + when(fc.getNow()).thenReturn(ch); + } } @AfterEach @@ -54,21 +75,21 @@ public void tearDown() { @Test public void testInitializeBadURL() throws URISyntaxException { - setupTestClient(false); + setupTestClient(false, null); assertThrows(URISyntaxException.class, () -> client.initialize(":", "", "")); } @Test public void testInitializeInvalidURL() throws URISyntaxException { - setupTestClient(false); + setupTestClient(false, null); assertThrows(IllegalArgumentException.class, () -> client.initialize("ws://localhost:8088/", "", "")); } @Test public void testBuildURL() throws Exception { - setupTestClient(true); + setupTestClient(true, null); List queryParams = new ArrayList<>(); queryParams.add(HttpParam.build("a", "b/c")); queryParams.add(HttpParam.build("d", "e")); @@ -84,36 +105,13 @@ public void testInitialize() throws Exception { assertNotNull(client.baseUri); } - @Test - public void testHttpConnect() { - Bootstrap bootstrap = mock(Bootstrap.class); - NettyHttpClient client = new NettyHttpClient() { - { - initHttpBootstrap(); - } - @Override - protected void initHttpBootstrap() { - httpBootstrap = bootstrap; - try { - baseUri = new URI("http://localhost:8088/"); - } catch (URISyntaxException e) { - // oh well - } - } - }; - when(bootstrap.connect(eq("localhost"), eq(8088))).thenReturn(mock(ChannelFuture.class)); - ChannelFuture future = client.httpConnect(); - assertNotNull(future, "Expected ChannelFuture"); - verify(bootstrap, times(1)).connect(anyString(), anyInt()); - client.destroy(); - } - @Test public void testWsConnect() throws Exception { Bootstrap bootstrap = mock(Bootstrap.class); NettyWSClientHandler testHandler = mock(NettyWSClientHandler.class); + ChannelFuture handshakeFuture = mock(ChannelFuture.class); + when(testHandler.handshakeFuture()).thenReturn(handshakeFuture); EmbeddedChannel channel = createTestChannel("ws-handler", testHandler); - FullHttpRequest req = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/events"); HttpHeaders headers = req.headers(); headers.set(HttpHeaderNames.UPGRADE, HttpHeaderValues.WEBSOCKET); @@ -122,11 +120,12 @@ public void testWsConnect() throws Exception { cf = channel.closeFuture(); when(bootstrap.connect(eq("localhost"), eq(443))).thenReturn(cf); ((DefaultChannelPromise) cf).setSuccess(null); - + HttpResponseHandler testWsCallback = mock(HttpResponseHandler.class); + when(testWsCallback.getLastResponseTime()).thenReturn(0L); class TestNettyHttpClient extends NettyHttpClient { @Override public WsClientConnection connect(final HttpResponseHandler callback, final String url, - final List lParamQuery) throws RestException { + final List lParamQuery) { try { baseUri = new URI("https://localhost/"); this.auth = "123"; @@ -136,38 +135,43 @@ public WsClientConnection connect(final HttpResponseHandler callback, final Stri } pingPeriod = 1; pingTimeUnit = TimeUnit.SECONDS; + wsHandler = testHandler; + wsCallback = testWsCallback; return connect(bootstrap, callback); } -// public void testWsFutureOperationComplete(ChannelFuture future) throws Exception { -// this.wsHandler = testHandler; -// wsFuture.operationComplete(future); -// } } TestNettyHttpClient client = new TestNettyHttpClient(); WsClient.WsClientConnection connection = client.connect(mock(HttpResponseHandler.class), "/events", null); assertNotNull(connection, "Expected WsClientConnection"); channel.writeInbound(req); - Thread.sleep(10000); + ArgumentCaptor captor = ArgumentCaptor.forClass(ChannelFutureListener.class); + verify(handshakeFuture).addListener(captor.capture()); + captor.getValue().operationComplete(channel.newSucceededFuture()); + client.onWSPong(); + Thread.sleep(2500); + System.out.println(channel.inboundMessages().toString()); + channel.readOutbound(); client.destroy(); } - private void setupSync(NettyHttpClientHandler h) { - cf = mock(ChannelFuture.class); - when(cf.addListener(any())).thenReturn(cf); - when(cf.syncUninterruptibly()).thenReturn(cf); + @Test + public void testHttpActionSync() throws Exception { Channel ch = mock(Channel.class); - when(ch.closeFuture()).thenReturn(cf); - when(cf.channel()).thenReturn(ch); ChannelPipeline p = mock(ChannelPipeline.class); - when(p.get("http-handler")).thenReturn(h); when(ch.pipeline()).thenReturn(p); - } + setupTestClient(true, ch); + NettyHttpClientHandler h = new NettyHttpClientHandler() { + @Override + public void reset() { + // dont reset for this test + } - @Test - public void testHttpActionSync() throws Exception { - setupTestClient(true); - NettyHttpClientHandler h = new NettyHttpClientHandler(); - setupSync(h); + @Override + public void waitForResponse(int timeout) { + // dont wait for test + } + }; + when(p.get("http-handler")).thenReturn(h); h.responseStatus = HttpResponseStatus.OK; h.responseBytes = "testing".getBytes(ARIEncoder.ENCODING); String res = client.httpActionSync("", "GET", null, null, null); @@ -189,11 +193,25 @@ private EmbeddedChannel createTestChannel(String name, ChannelHandler handler) { return channel; } + private void delayedWriteInbound(EmbeddedChannel channel, HttpResponseStatus status, String data) { + delayedWriteInbound(channel, status, data, 200); + } + + private void delayedWriteInbound(EmbeddedChannel channel, HttpResponseStatus status, String data, long sleep) { + executor.submit(() -> { + try { + Thread.sleep(sleep); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + channel.writeInbound(new DefaultFullHttpResponse( + HttpVersion.HTTP_1_1, status, Unpooled.copiedBuffer(data, ARIEncoder.ENCODING))); + }); + } + private AsteriskPingGetRequest pingSetup(EmbeddedChannel channel) { - channel.writeInbound(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, - Unpooled.copiedBuffer( - "{\"ping\":\"pong\",\"timestamp\":\"2020-01-01T00:00:00.000+0000\",\"asterisk_id\":\"test_asterisk\"}", - ARIEncoder.ENCODING))); + delayedWriteInbound(channel, HttpResponseStatus.OK, + "{\"ping\":\"pong\",\"timestamp\":\"2020-01-01T00:00:00.000+0000\",\"asterisk_id\":\"test_asterisk\"}", 500); AsteriskPingGetRequest_impl_ari_6_0_0 req = new AsteriskPingGetRequest_impl_ari_6_0_0(); req.setHttpClient(client); return req; @@ -210,8 +228,8 @@ private void pingValidate(EmbeddedChannel channel, AsteriskPing res) { @Test public void testHttpActionSyncPing() throws Exception { - setupTestClient(true); EmbeddedChannel channel = createTestChannel(); + setupTestClient(true, channel); AsteriskPingGetRequest req = pingSetup(channel); AsteriskPing res = req.execute(); pingValidate(channel, res); @@ -219,15 +237,15 @@ public void testHttpActionSyncPing() throws Exception { @Test public void testHttpActionAsyncPing() throws Exception { - setupTestClient(true); EmbeddedChannel channel = createTestChannel(); + setupTestClient(true, channel); AsteriskPingGetRequest req = pingSetup(channel); - final boolean[] callback = {false}; + final AtomicBoolean callback = new AtomicBoolean(false); req.execute(new AriCallback() { @Override public void onSuccess(AsteriskPing res) { pingValidate(channel, res); - callback[0] = true; + callback.set(true); } @Override @@ -235,19 +253,19 @@ public void onFailure(RestException e) { fail(e.toString()); } }); + Thread.sleep(550); channel.runPendingTasks(); - assertTrue(callback[0], "No onSuccess Callback"); + assertTrue(callback.get(), "No onSuccess Callback"); } @Test public void testHttpActionException() throws Exception { - setupTestClient(true); EmbeddedChannel channel = createTestChannel(); + setupTestClient(true, channel); ApplicationsGetRequest_impl_ari_6_0_0 req = new ApplicationsGetRequest_impl_ari_6_0_0("test"); req.setHttpClient(client); // when the response is JSON error then return the error from the server - channel.writeInbound(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND, - Unpooled.copiedBuffer("{\"message\":\"a test error\"}", ARIEncoder.ENCODING))); + delayedWriteInbound(channel, HttpResponseStatus.NOT_FOUND, "{\"message\":\"a test error\"}"); boolean exception = false; try { req.execute(); @@ -257,8 +275,7 @@ public void testHttpActionException() throws Exception { } assertTrue(exception, "Expecting an exception"); // when the response is not JSON and there is an error definition from the API then return API definition - channel.writeInbound(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND, - Unpooled.copiedBuffer("Not found", ARIEncoder.ENCODING))); + delayedWriteInbound(channel, HttpResponseStatus.NOT_FOUND, "Not found"); exception = false; try { req.execute(); @@ -271,10 +288,9 @@ public void testHttpActionException() throws Exception { @Test public void testBodyFieldSerialisation() throws Exception { - setupTestClient(true); EmbeddedChannel channel = createTestChannel(); - channel.writeInbound(new DefaultFullHttpResponse( - HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.copiedBuffer("[]", ARIEncoder.ENCODING))); + setupTestClient(true, channel); + delayedWriteInbound(channel, HttpResponseStatus.OK, "[]"); AsteriskUpdateObjectPutRequest_impl_ari_6_0_0 req = new AsteriskUpdateObjectPutRequest_impl_ari_6_0_0( "cc", "ot", "id"); req.setHttpClient(client); @@ -284,11 +300,9 @@ public void testBodyFieldSerialisation() throws Exception { @Test public void testBodyVariableSerialisation() throws Exception { - setupTestClient(true); EmbeddedChannel channel = createTestChannel(); - channel.writeInbound(new DefaultFullHttpResponse( - HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.copiedBuffer("{}", ARIEncoder.ENCODING))); - + setupTestClient(true, channel); + delayedWriteInbound(channel, HttpResponseStatus.OK, "{}"); EndpointsSendMessagePutRequest_impl_ari_6_0_0 req = new EndpointsSendMessagePutRequest_impl_ari_6_0_0("to", "from"); req.setHttpClient(client); req.addVariables("key1", "val1").addVariables("key2", "val2").execute(); @@ -297,11 +311,9 @@ public void testBodyVariableSerialisation() throws Exception { @Test public void testBodyObjectSerialisation() throws Exception { - setupTestClient(true); - EmbeddedChannel channel = createTestChannel(); - channel.writeInbound(new DefaultFullHttpResponse( - HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.copiedBuffer("{}", ARIEncoder.ENCODING))); - + final EmbeddedChannel channel = createTestChannel(); + setupTestClient(true, channel); + delayedWriteInbound(channel, HttpResponseStatus.OK, "{}"); Map map = new HashMap<>(); map.put("key1", "val1"); map.put("key2", "val2");