8000 fix: use native support for Rx request instead of callback emitter (#… · influxdata/influxdb-client-java@f5582aa · GitHub
[go: up one dir, main page]

Skip to content

Commit f5582aa

Browse files
authored
fix: use native support for Rx request instead of callback emitter (#302)
1 parent ba01a1b commit f5582aa

File tree

4 files changed

+28
-16
lines changed

4 files changed

+28
-16
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
## 4.3.0 [unreleased]
22

3+
### Bug Fixes
4+
1. [#300](https://github.com/influxdata/influxdb-client-java/pull/300): Uses native support for Rx requests to better performance
5+
36
## 4.2.0 [2022-02-04]
47

58
### Bug Fixes

client/pom.xml

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,25 @@
127127
<artifactId>rxjava</artifactId>
128128
</dependency>
129129

130+
<dependency>
131+
<groupId>com.squareup.retrofit2</groupId>
132+
<artifactId>adapter-rxjava2</artifactId>
133+
<exclusions>
134+
<exclusion>
135+
<groupId>com.squareup.okhttp3</groupId>
136+
<artifactId>okhttp</artifactId>
137+
</exclusion>
138+
<exclusion>
139+
<groupId>com.squareup.retrofit2</groupId>
140+
<artifactId>retrofit</artifactId>
141+
</exclusion>
142+
<exclusion>
143+
<groupId>io.reactivex.rxjava2</groupId>
144+
<artifactId>rxjava</artifactId>
145+
</exclusion>
146+
</exclusions>
147+
</dependency>
148+
130149
<dependency>
131150
<groupId>io.swagger</groupId>
132151
<artifactId>swagger-annotations</artifactId>

client/src/main/java/com/influxdb/client/internal/AbstractWriteClient.java

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,6 @@
5858
import io.reactivex.processors.PublishProcessor;
5959
import io.reactivex.subjects.PublishSubject;
6060
import org.reactivestreams.Publisher;
61-
import retrofit2.Call;
62-
import retrofit2.Callback;
6361
import retrofit2.HttpException;
6462
import retrofit2.Response;
6563

@@ -434,21 +432,11 @@ public Maybe<Notification<Response>> apply(final BatchWriteItem batchWrite) {
434432
String bucket = batchWrite.batchWriteOptions.bucket;
435433
WritePrecision precision = batchWrite.batchWriteOptions.precision;
436434

437-
Maybe<Response<Void>> requestSource = Maybe.create(emitter -> service
438-
.postWrite(organization, bucket, content, null,
435+
Maybe<Response<Void>> requestSource = service
436+
.postWriteRx(organization, bucket, content, null,
439437
"identity", "text/plain; charset=utf-8", null,
440438
"application/json", null, precision)
441-
.enqueue(new Callback<Void>() {
442-
@Override
443-
public void onResponse(@Nonnull final Call<Void> call, @Nonnull final Response<Void> response) {
444-
emitter.onSuccess(response);
445-
}
446-
447-
@Override
448-
public void onFailure(@Nonnull final Call<Void> call, @Nonnull final Throwable throwable) {
449-
emitter.onError(throwable);
450-
}
451-
}));
439+
.toMaybe();
452440

453441
return requestSource
454442
//

client/src/main/java/com/influxdb/client/internal/InfluxDBClientImpl.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
*/
2222
package com.influxdb.client.internal;
2323

24+
import java.util.Collections;
2425
import java.util.logging.Level;
2526
import java.util.logging.Logger;
2627
import javax.annotation.Nonnull;
@@ -78,6 +79,7 @@
7879
import com.influxdb.utils.Arguments;
7980

8081
import retrofit2.Call;
82+
import retrofit2.adapter.rxjava2.RxJava2CallAdapterFactory;
8183

8284
/**
8385
* @author Jakub Bednar (bednar@github) (11/10/2018 09:36)
@@ -92,7 +94,7 @@ public final class InfluxDBClientImpl extends AbstractInfluxDBClient implements
9294

9395
public InfluxDBClientImpl(@Nonnull final InfluxDBClientOptions options) {
9496

95-
super(options, "java");
97+
super(options, "java", Collections.singletonList(RxJava2CallAdapterFactory.create()));
9698

9799
setupService = retrofit.create(SetupService.class);
98100
readyService = retrofit.create(ReadyService.class);

0 commit comments

Comments
 (0)
0