diff --git a/src/main/java/rx/internal/operators/OnSubscribeInputStreamToLines.java b/src/main/java/rx/internal/operators/OnSubscribeInputStreamToLines.java new file mode 100644 index 0000000..e8585ac --- /dev/null +++ b/src/main/java/rx/internal/operators/OnSubscribeInputStreamToLines.java @@ -0,0 +1,54 @@ +package rx.internal.operators; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.Charset; + +import rx.Subscriber; +import rx.observables.AbstractOnSubscribe; + +public final class OnSubscribeInputStreamToLines extends AbstractOnSubscribe { + private final InputStream is; + private final Charset ch; + + public OnSubscribeInputStreamToLines(InputStream is) { + this(is, Charset.defaultCharset()); + } + + public OnSubscribeInputStreamToLines(InputStream is, Charset ch) { + this.is = is; + this.ch = ch; + } + + @Override + protected BufferedReader onSubscribe(Subscriber subscriber) { + return new BufferedReader(new InputStreamReader(is, ch)); + } + + @Override + protected void onTerminated(BufferedReader state) { + try { + state.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + protected void next(SubscriptionState state) { + BufferedReader reader = state.state(); + try { + String line = reader.readLine(); + if (line == null) { + state.onCompleted(); + } + else { + state.onNext(line); + } + } catch (IOException e) { + state.onError(e); + } + } +} diff --git a/src/main/java/rx/observables/StringObservable.java b/src/main/java/rx/observables/StringObservable.java index f474252..d9aac80 100644 --- a/src/main/java/rx/observables/StringObservable.java +++ b/src/main/java/rx/observables/StringObservable.java @@ -24,6 +24,7 @@ import rx.functions.Func1; import rx.functions.Func2; import rx.internal.operators.OnSubscribeInputStream; +import rx.internal.operators.OnSubscribeInputStreamToLines; import rx.internal.operators.OnSubscribeReader; import java.io.Closeable; @@ -150,6 +151,19 @@ public static Observable from(final Reader reader, final int size) { return Observable.create(new OnSubscribeReader(reader, size)); } + /** + * Reads lines from a source {@link InputStream} and outputs {@link Observable} of {@code String}s. Supports backpressure. + * + * @param is + * Source {@link InputStream} + * @param ch + * Charset to be used to read the source {@link InputStream} + * @return the Observable containing the split lines of the source + */ + public static Observable byLine(InputStream is, Charset ch) { + return Observable.create(new OnSubscribeInputStreamToLines(is, ch)); + } + /** * Decodes a stream of multibyte chunks into a stream of strings that works on infinite streams * and where handles when a multibyte character spans two chunks.