8000 Reading an InputStream directly line by line by Crystark · Pull Request #30 · ReactiveX/RxJavaString · GitHub
[go: up one dir, main page]

Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package rx.internal.operators;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.Charset;

import rx.Subscriber;
import rx.observables.AbstractOnSubscribe;

public final class OnSubscribeInputStreamToLines extends AbstractOnSubscribe<String, BufferedReader> {
private final InputStream is;
private final Charset ch;

public OnSubscribeInputStreamToLines(InputStream is) {
this(is, Charset.defaultCharset());
}

public OnSubscribeInputStreamToLines(InputStream is, Charset ch) {
this.is = is;
this.ch = ch;
}

@Override
protected BufferedReader onSubscribe(Subscriber<? super String> subscriber) {
return new BufferedReader(new InputStreamReader(is, ch));
}

@Override
protected void onTerminated(BufferedReader state) {
try {
state.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@Override
protected void next(SubscriptionState<String, BufferedReader> state) {
BufferedReader reader = state.state();
try {
String line = reader.readLine();
if (line == null) {
state.onCompleted();
}
else {
state.onNext(line);
}
} catch (IOException e) {
state.onError(e);
}
}
}
14 changes: 14 additions & 0 deletions src/main/java/rx/observables/StringObservable.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import rx.functions.Func1;
import rx.functions.Func2;
import rx.internal.operators.OnSubscribeInputStream;
import rx.internal.operators.OnSubscribeInputStreamToLines;
import rx.internal.operators.OnSubscribeReader;

import java.io.Closeable;
Expand Down Expand Up @@ -150,6 +151,19 @@ public static Observable<String> from(final Reader reader, final int size) {
return Observable.create(new OnSubscribeReader(reader, size));
}

/**
* Reads lines from a source {@link InputStream} and outputs {@link Observable} of {@code String}s. Supports backpressure.
*
* @param is
* Source {@link InputStream}
* @param ch
* Charset to be used to read the source {@link InputStream}
* @return the Observable containing the split lines of the source
*/
public static Observable<String> byLine(InputStream is, Charset ch) {
return Observable.create(new OnSubscribeInputStreamToLines(is, ch));
}

/**
* Decodes a stream of multibyte chunks into a stream of strings that works on infinite streams
* and where handles when a multibyte character spans two chunks.
Expand Down
0