From 64b6ff5fa0fd84916166467833c46b10d3f4328d Mon Sep 17 00:00:00 2001 From: Crystark Date: Tue, 15 Sep 2015 11:46:06 +0200 Subject: [PATCH 1/2] Added a way to read an InputStream by lines --- .../OnSubscribeInputStreamToLines.java | 54 + .../java/rx/observables/StringObservable.java | 1213 +++++++++-------- 2 files changed, 667 insertions(+), 600 deletions(-) create mode 100644 src/main/java/rx/internal/operators/OnSubscribeInputStreamToLines.java diff --git a/src/main/java/rx/internal/operators/OnSubscribeInputStreamToLines.java b/src/main/java/rx/internal/operators/OnSubscribeInputStreamToLines.java new file mode 100644 index 0000000..e8585ac --- /dev/null +++ b/src/main/java/rx/internal/operators/OnSubscribeInputStreamToLines.java @@ -0,0 +1,54 @@ +package rx.internal.operators; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.Charset; + +import rx.Subscriber; +import rx.observables.AbstractOnSubscribe; + +public final class OnSubscribeInputStreamToLines extends AbstractOnSubscribe { + private final InputStream is; + private final Charset ch; + + public OnSubscribeInputStreamToLines(InputStream is) { + this(is, Charset.defaultCharset()); + } + + public OnSubscribeInputStreamToLines(InputStream is, Charset ch) { + this.is = is; + this.ch = ch; + } + + @Override + protected BufferedReader onSubscribe(Subscriber subscriber) { + return new BufferedReader(new InputStreamReader(is, ch)); + } + + @Override + protected void onTerminated(BufferedReader state) { + try { + state.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + protected void next(SubscriptionState state) { + BufferedReader reader = state.state(); + try { + String line = reader.readLine(); + if (line == null) { + state.onCompleted(); + } + else { + state.onNext(line); + } + } catch (IOException e) { + state.onError(e); + } + } +} diff --git a/src/main/java/rx/observables/StringObservable.java b/src/main/java/rx/observables/StringObservable.java index f474252..2927066 100644 --- a/src/main/java/rx/observables/StringObservable.java +++ b/src/main/java/rx/observables/StringObservable.java @@ -1,600 +1,613 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.observables; - -import rx.Observable; -import rx.Observable.Operator; -import rx.Producer; -import rx.Subscriber; -import rx.functions.Action1; -import rx.functions.Func0; -import rx.functions.Func1; -import rx.functions.Func2; -import rx.internal.operators.OnSubscribeInputStream; -import rx.internal.operators.OnSubscribeReader; - -import java.io.Closeable; -import java.io.IOException; -import java.io.InputStream; -import java.io.Reader; -import java.nio.ByteBuffer; -import java.nio.CharBuffer; -import java.nio.charset.CharacterCodingException; -import java.nio.charset.Charset; -import java.nio.charset.CharsetDecoder; -import java.nio.charset.CharsetEncoder; -import java.nio.charset.CoderResult; -import java.nio.charset.CodingErrorAction; -import java.util.Arrays; -import java.util.concurrent.Callable; -import java.util.regex.Pattern; - -public class StringObservable { - /** - * Reads bytes from a source {@link InputStream} and outputs {@link Observable} of - * {@code byte[]}s. Supports backpressure. - *

- * - * - * @param i - * Source {@link InputStream} - * @return the Observable containing read byte arrays from the input - */ - public static Observable from(final InputStream i) { - return from(i, 8 * 1024); - } - - /** - * Func0 that allows throwing an {@link IOException}s commonly thrown during IO operations. - * @see StringObservable#using(UnsafeFunc0, Func1) - * - * @param - */ - public static interface UnsafeFunc0 extends Callable { - public R call() throws Exception; - } - - /** - * Helps in creating an Observable that automatically calls {@link Closeable#close()} on completion, error or unsubscribe. - * - *

-     * StringObservable.using(() -> new FileReader(file), (reader) -> StringObservable.from(reader))
-     * 
- * - * @param resourceFactory - * Generates a new {@link Closeable} resource for each new subscription to the returned Observable - * @param observableFactory - * Converts the {@link Closeable} resource into a {@link Observable} with {@link #from(InputStream)} or {@link #from(Reader)} - * @return - * An {@link Observable} that automatically closes the resource when done. - */ - public static Observable using(final UnsafeFunc0 resourceFactory, - final Func1> observableFactory) { - return Observable.using(new Func0() { - @Override - public S call() { - try { - return resourceFactory.call(); - } catch (Throwable e) { - throw new RuntimeException(e); - } - } - }, observableFactory, new Action1() { - @Override - public void call(S resource) { - try { - resource.close(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - }, true); - } - - /** - * Reads bytes from a source {@link InputStream} and outputs {@link Observable} of - * {@code byte[]}s. Supports backpressure. - *

- * - * - * @param is - * Source {@link InputStream} - * @param size - * internal buffer size - * @return the Observable containing read byte arrays from the input - */ - public static Observable from(final InputStream is, final int size) { - return Observable.create(new OnSubscribeInputStream(is, size)); - } - - /** - * Reads characters from a source {@link Reader} and outputs {@link Observable} of - * {@link String}s. Supports backpressure. - *

- * - * - * @param i - * Source {@link Reader} - * @return the Observable of Strings read from the source - */ - public static Observable from(final Reader i) { - return from(i, 8 * 1024); - } - - /** - * Reads characters from a source {@link Reader} and outputs {@link Observable} of - * {@link String}s. Supports backpressure. - *

- * - * - * @param i - * Source {@link Reader} - * @param size - * internal buffer size - * @return the Observable of Strings read from the source - */ - public static Observable from(final Reader reader, final int size) { - return Observable.create(new OnSubscribeReader(reader, size)); - } - - /** - * Decodes a stream of multibyte chunks into a stream of strings that works on infinite streams - * and where handles when a multibyte character spans two chunks. - *

- * - * - * @param src - * @param charsetName - * @return the Observable returning a stream of decoded strings - */ - public static Observable decode(Observable src, String charsetName) { - return decode(src, Charset.forName(charsetName)); - } - - /** - * Decodes a stream of multibyte chunks into a stream of strings that works on infinite streams - * and where handles when a multibyte character spans two chunks. - *

- * - * - * @param src - * @param charset - * @return the Observable returning a stream of decoded strings - */ - public static Observable decode(Observable src, Charset charset) { - return decode(src, charset.newDecoder().onMalformedInput(CodingErrorAction.REPLACE).onUnmappableCharacter(CodingErrorAction.REPLACE)); - } - - /** - * Decodes a stream of multibyte chunks into a stream of strings that works on infinite streams - * and handles when a multibyte character spans two chunks. - * This method allows for more control over how malformed and unmappable characters are handled. - *

- * - * - * @param src - * @param charsetDecoder - * @return the Observable returning a stream of decoded strings - */ - public static Observable decode(final Observable src, final CharsetDecoder charsetDecoder) { - return src.lift(new Operator() { - @Override - public Subscriber call(final Subscriber o) { - return new Subscriber(o) { - private ByteBuffer leftOver = null; - - @Override - public void onCompleted() { - if (process(null, leftOver, true)) - o.onCompleted(); - } - - @Override - public void onError(Throwable e) { - if (process(null, leftOver, true)) - o.onError(e); - } - - @Override - public void onNext(byte[] bytes) { - process(bytes, leftOver, false); - } - - public boolean process(byte[] next, ByteBuffer last, boolean endOfInput) { - if (o.isUnsubscribed()) - return false; - - ByteBuffer bb; - if (last != null) { - if (next != null) { - // merge leftover in front of the next bytes - bb = ByteBuffer.allocate(last.remaining() + next.length); - bb.put(last); - bb.put(next); - bb.flip(); - } - else { // next == null - bb = last; - } - } - else { // last == null - if (next != null) { - bb = ByteBuffer.wrap(next); - } - else { // next == null - return true; - } - } - - CharBuffer cb = CharBuffer.allocate((int) (bb.limit() * charsetDecoder.averageCharsPerByte())); - CoderResult cr = charsetDecoder.decode(bb, cb, endOfInput); - cb.flip(); - - if (cr.isError()) { - try { - cr.throwException(); - } - catch (CharacterCodingException e) { - o.onError(e); - return false; - } - } - - if (bb.remaining() > 0) { - leftOver = bb; - } - else { - leftOver = null; - } - - String string = cb.toString(); - if (!string.isEmpty()) - o.onNext(string); - - return true; - } - }; - } - }); - } - - /** - * Encodes a possibly infinite stream of strings into an Observable of byte arrays. - *

- * - * - * @param src - * @param charsetName - * @return the Observable with a stream of encoded byte arrays - */ - public static Observable encode(Observable src, String charsetName) { - return encode(src, Charset.forName(charsetName)); - } - - /** - * Encodes a possibly infinite stream of strings into an Observable of byte arrays. - *

- * - * - * @param src - * @param charset - * @return the Observable with a stream of encoded byte arrays - */ - public static Observable encode(Observable src, Charset charset) { - return encode(src, charset.newEncoder().onMalformedInput(CodingErrorAction.REPLACE).onUnmappableCharacter(CodingErrorAction.REPLACE)); - } - - /** - * Encodes a possibly infinite stream of strings into an Observable of byte arrays. - * This method allows for more control over how malformed and unmappable characters are handled. - *

- * - * - * @param src - * @param charsetEncoder - * @return the Observable with a stream of encoded byte arrays - */ - public static Observable encode(Observable src, final CharsetEncoder charsetEncoder) { - return src.map(new Func1() { - @Override - public byte[] call(String str) { - CharBuffer cb = CharBuffer.wrap(str); - ByteBuffer bb; - try { - bb = charsetEncoder.encode(cb); - } catch (CharacterCodingException e) { - throw new RuntimeException(e); - } - return Arrays.copyOfRange(bb.array(), bb.position(), bb.limit()); - } - }); - } - - /** - * Gather up all of the strings in to one string to be able to use it as one message. Don't use - * this on infinite streams. - *

- * - * - * @param src - * @return the Observable returing all strings concatenated as a single string - */ - public static Observable stringConcat(Observable src) { - return toString(src.reduce(new StringBuilder(), new Func2() { - @Override - public StringBuilder call(StringBuilder a, String b) { - return a.append(b); - } - })); - } - - /** - * Maps {@link Observable}<{@link Object}> to {@link Observable}<{@link String}> by using {@link String#valueOf(Object)} - * @param src - * @return An {@link Observable} of only {@link String}s. - */ - public static Observable toString(Observable src) { - return src.map(new Func1() { - @Override - public String call(Object obj) { - return String.valueOf(obj); - } - }); - } - - /** - * Rechunks the strings based on a regex pattern and works on infinite stream. - * - *

-     * split(["boo:an", "d:foo"], ":") --> ["boo", "and", "foo"]
-     * split(["boo:an", "d:foo"], "o") --> ["b", "", ":and:f", "", ""]
-     * 
- * - * See {@link Pattern} - *

- * - * - * @param src - * the source that should be use for the split - * @param regex - * a string that build regular expression modifier - * @return the Observable streaming the split values - */ - - public static Observable split(final Observable src, String regex) { - Pattern pattern = Pattern.compile(regex); - return StringObservable.split(src,pattern); - } - - /** - * Rechunks the strings based on a regex pattern and works on infinite stream. - * - *

-     * split(["boo:an", "d:foo"], ":") --> ["boo", "and", "foo"]
-     * split(["boo:an", "d:foo"], "o") --> ["b", "", ":and:f", "", ""]
-     * 
- * - * See {@link Pattern} - *

- * - * - * @param src - * the source that should be use for the split - * @param pattern - * pre compiled regular expression pattern for the split functionality - * @return the Observable streaming the split values - */ - public static Observable split(final Observable src, final Pattern pattern) { - - return src.lift(new Operator() { - @Override - public Subscriber call(final Subscriber o) { - return new Subscriber(o) { - private String leftOver = null; - - @Override - public void onCompleted() { - if (leftOver!=null) - output(leftOver); - if (!o.isUnsubscribed()) - o.onCompleted(); - } - - @Override - public void onError(Throwable e) { - if (leftOver!=null) - output(leftOver); - if (!o.isUnsubscribed()) - o.onError(e); - } - - @Override - public void onNext(String segment) { - String[] parts = pattern.split(segment, -1); - - if (leftOver != null) - parts[0] = leftOver + parts[0]; - for (int i = 0; i < parts.length - 1; i++) { - String part = parts[i]; - output(part); - } - leftOver = parts[parts.length - 1]; - } - - private int emptyPartCount = 0; - - /** - * when limit == 0 trailing empty parts are not emitted. - * - * @param part - */ - private void output(String part) { - if (part.isEmpty()) { - emptyPartCount++; - } - else { - for (; emptyPartCount > 0; emptyPartCount--) - if (!o.isUnsubscribed()) - o.onNext(""); - if (!o.isUnsubscribed()) - o.onNext(part); - } - } - }; - } - }); - } - - /** - * Concatenates the sequence of values by adding a separator - * between them and emitting the result once the source completes. - *

- * - *

- * The conversion from the value type to String is performed via - * {@link java.lang.String#valueOf(java.lang.Object)} calls. - *

- * For example: - * - *

-     * Observable<Object> source = Observable.from("a", 1, "c");
-     * Observable<String> result = join(source, ", ");
-     * 
- * - * will yield a single element equal to "a, 1, c". - * - * @param source - * the source sequence of CharSequence values - * @param separator - * the separator to a - * @return an Observable which emits a single String value having the concatenated - * values of the source observable with the separator between elements - */ - public static Observable join(final Observable source, final CharSequence separator) { - return source.lift(new Operator() { - @Override - public Subscriber call(final Subscriber child) { - final JoinParentSubscriber parent = new JoinParentSubscriber(child, separator); - child.add(parent); - child.setProducer(new Producer() { - @Override - public void request(long n) { - if (n > 0) { - parent.requestAll(); - } - }}); - return parent; - } - }); - } - - private static final class JoinParentSubscriber extends Subscriber { - - private final Subscriber child; - private final CharSequence separator; - private boolean mayAddSeparator; - private StringBuilder b = new StringBuilder(); - - JoinParentSubscriber(Subscriber child, CharSequence separator) { - this.child = child; - this.separator = separator; - } - - void requestAll() { - request(Long.MAX_VALUE); - } - - @Override - public void onStart() { - request(0); - } - - @Override - public void onCompleted() { - String str = b.toString(); - b = null; - if (!child.isUnsubscribed()) - child.onNext(str); - if (!child.isUnsubscribed()) - child.onCompleted(); - } - - @Override - public void onError(Throwable e) { - b = null; - if (!child.isUnsubscribed()) - child.onError(e); - } - - @Override - public void onNext(String t) { - if (mayAddSeparator) { - b.append(separator); - } - mayAddSeparator = true; - b.append(t); - } - - } - - /** - * Splits the {@link Observable} of Strings by lines and numbers them (zero based index) - *

- * - * - * @param source - * @return the Observable conaining the split lines of the source - */ - public static Observable byLine(Observable source) { - return split(source, System.getProperty("line.separator")); - } - - /** - * Converts a String into an Observable that emits the chars in the String. - *

- * - * - * @param str - * the source String - * @return an Observable that emits each char in the source String - * @see RxJava wiki: from - */ - public static Observable byCharacter(Observable source) { - return source.lift(new Operator() { - @Override - public Subscriber call(final Subscriber subscriber) { - return new Subscriber(subscriber) { - @Override - public void onCompleted() { - subscriber.onCompleted(); - } - - @Override - public void onError(Throwable e) { - subscriber.onError(e); - } - - @Override - public void onNext(String str) { - for (char c : str.toCharArray()) { - subscriber.onNext(Character.toString(c)); - } - } - }; - } - }); - } -} +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.observables; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.Reader; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.CharsetEncoder; +import java.nio.charset.CoderResult; +import java.nio.charset.CodingErrorAction; +import java.util.Arrays; +import java.util.concurrent.Callable; +import java.util.regex.Pattern; + +import rx.Observable; +import rx.Observable.Operator; +import rx.Producer; +import rx.Subscriber; +import rx.functions.Action1; +import rx.functions.Func0; +import rx.functions.Func1; +import rx.functions.Func2; +import rx.internal.operators.OnSubscribeInputStream; +import rx.internal.operators.OnSubscribeInputStreamToLines; +import rx.internal.operators.OnSubscribeReader; + +public class StringObservable { + /** + * Reads bytes from a source {@link InputStream} and outputs {@link Observable} of {@code byte[]}s. Supports backpressure. + *

+ * + * + * @param i + * Source {@link InputStream} + * @return the Observable containing read byte arrays from the input + */ + public static Observable from(final InputStream i) { + return from(i, 8 * 1024); + } + + /** + * Func0 that allows throwing an {@link IOException}s commonly thrown during IO operations. + * + * @see StringObservable#using(UnsafeFunc0, Func1) + * + * @param + */ + public static interface UnsafeFunc0 extends Callable { + @Override + public R call() throws Exception; + } + + /** + * Helps in creating an Observable that automatically calls {@link Closeable#close()} on completion, error or unsubscribe. + * + *

+	 * StringObservable.using(() -> new FileReader(file), (reader) -> StringObservable.from(reader))
+	 * 
+ * + * @param resourceFactory + * Generates a new {@link Closeable} resource for each new subscription to the returned Observable + * @param observableFactory + * Converts the {@link Closeable} resource into a {@link Observable} with {@link #from(InputStream)} or {@link #from(Reader)} + * @return + * An {@link Observable} that automatically closes the resource when done. + */ + public static Observable using(final UnsafeFunc0 resourceFactory, + final Func1> observableFactory) { + return Observable.using(new Func0() { + @Override + public S call() { + try { + return resourceFactory.call(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + }, observableFactory, new Action1() { + @Override + public void call(S resource) { + try { + resource.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + }, true); + } + + /** + * Reads bytes from a source {@link InputStream} and outputs {@link Observable} of {@code byte[]}s. Supports backpressure. + *

+ * + * + * @param is + * Source {@link InputStream} + * @param size + * internal buffer size + * @return the Observable containing read byte arrays from the input + */ + public static Observable from(final InputStream is, final int size) { + return Observable.create(new OnSubscribeInputStream(is, size)); + } + + /** + * Reads lines from a source {@link InputStream} and outputs {@link Observable} of {@code String}s. Supports backpressure. + * + * @param is + * Source {@link InputStream} + * @param ch + * Charset to be used to read the source {@link InputStream} + * @return the Observable containing the split lines of the source + */ + public static Observable byLine(InputStream is, Charset ch) { + return Observable.create(new OnSubscribeInputStreamToLines(is, ch)); + } + + /** + * Reads characters from a source {@link Reader} and outputs {@link Observable} of {@link String}s. Supports backpressure. + *

+ * + * + * @param i + * Source {@link Reader} + * @return the Observable of Strings read from the source + */ + public static Observable from(final Reader i) { + return from(i, 8 * 1024); + } + + /** + * Reads characters from a source {@link Reader} and outputs {@link Observable} of {@link String}s. Supports backpressure. + *

+ * + * + * @param i + * Source {@link Reader} + * @param size + * internal buffer size + * @return the Observable of Strings read from the source + */ + public static Observable from(final Reader reader, final int size) { + return Observable.create(new OnSubscribeReader(reader, size)); + } + + /** + * Decodes a stream of multibyte chunks into a stream of strings that works on infinite streams + * and where handles when a multibyte character spans two chunks. + *

+ * + * + * @param src + * @param charsetName + * @return the Observable returning a stream of decoded strings + */ + public static Observable decode(Observable src, String charsetName) { + return decode(src, Charset.forName(charsetName)); + } + + /** + * Decodes a stream of multibyte chunks into a stream of strings that works on infinite streams + * and where handles when a multibyte character spans two chunks. + *

+ * + * + * @param src + * @param charset + * @return the Observable returning a stream of decoded strings + */ + public static Observable decode(Observable src, Charset charset) { + return decode(src, charset.newDecoder().onMalformedInput(CodingErrorAction.REPLACE).onUnmappableCharacter(CodingErrorAction.REPLACE)); + } + + /** + * Decodes a stream of multibyte chunks into a stream of strings that works on infinite streams + * and handles when a multibyte character spans two chunks. + * This method allows for more control over how malformed and unmappable characters are handled. + *

+ * + * + * @param src + * @param charsetDecoder + * @return the Observable returning a stream of decoded strings + */ + public static Observable decode(final Observable src, final CharsetDecoder charsetDecoder) { + return src.lift(new Operator() { + @Override + public Subscriber call(final Subscriber o) { + return new Subscriber(o) { + private ByteBuffer leftOver = null; + + @Override + public void onCompleted() { + if (process(null, leftOver, true)) + o.onCompleted(); + } + + @Override + public void onError(Throwable e) { + if (process(null, leftOver, true)) + o.onError(e); + } + + @Override + public void onNext(byte[] bytes) { + process(bytes, leftOver, false); + } + + public boolean process(byte[] next, ByteBuffer last, boolean endOfInput) { + if (o.isUnsubscribed()) + return false; + + ByteBuffer bb; + if (last != null) { + if (next != null) { + // merge leftover in front of the next bytes + bb = ByteBuffer.allocate(last.remaining() + next.length); + bb.put(last); + bb.put(next); + bb.flip(); + } + else { // next == null + bb = last; + } + } + else { // last == null + if (next != null) { + bb = ByteBuffer.wrap(next); + } + else { // next == null + return true; + } + } + + CharBuffer cb = CharBuffer.allocate((int) (bb.limit() * charsetDecoder.averageCharsPerByte())); + CoderResult cr = charsetDecoder.decode(bb, cb, endOfInput); + cb.flip(); + + if (cr.isError()) { + try { + cr.throwException(); + } + catch (CharacterCodingException e) { + o.onError(e); + return false; + } + } + + if (bb.remaining() > 0) { + leftOver = bb; + } + else { + leftOver = null; + } + + String string = cb.toString(); + if (!string.isEmpty()) + o.onNext(string); + + return true; + } + }; + } + }); + } + + /** + * Encodes a possibly infinite stream of strings into an Observable of byte arrays. + *

+ * + * + * @param src + * @param charsetName + * @return the Observable with a stream of encoded byte arrays + */ + public static Observable encode(Observable src, String charsetName) { + return encode(src, Charset.forName(charsetName)); + } + + /** + * Encodes a possibly infinite stream of strings into an Observable of byte arrays. + *

+ * + * + * @param src + * @param charset + * @return the Observable with a stream of encoded byte arrays + */ + public static Observable encode(Observable src, Charset charset) { + return encode(src, charset.newEncoder().onMalformedInput(CodingErrorAction.REPLACE).onUnmappableCharacter(CodingErrorAction.REPLACE)); + } + + /** + * Encodes a possibly infinite stream of strings into an Observable of byte arrays. + * This method allows for more control over how malformed and unmappable characters are handled. + *

+ * + * + * @param src + * @param charsetEncoder + * @return the Observable with a stream of encoded byte arrays + */ + public static Observable encode(Observable src, final CharsetEncoder charsetEncoder) { + return src.map(new Func1() { + @Override + public byte[] call(String str) { + CharBuffer cb = CharBuffer.wrap(str); + ByteBuffer bb; + try { + bb = charsetEncoder.encode(cb); + } catch (CharacterCodingException e) { + throw new RuntimeException(e); + } + return Arrays.copyOfRange(bb.array(), bb.position(), bb.limit()); + } + }); + } + + /** + * Gather up all of the strings in to one string to be able to use it as one message. Don't use + * this on infinite streams. + *

+ * + * + * @param src + * @return the Observable returing all strings concatenated as a single string + */ + public static Observable stringConcat(Observable src) { + return toString(src.reduce(new StringBuilder(), new Func2() { + @Override + public StringBuilder call(StringBuilder a, String b) { + return a.append(b); + } + })); + } + + /** + * Maps {@link Observable}<{@link Object}> to {@link Observable}<{@link String}> by using {@link String#valueOf(Object)} + * + * @param src + * @return An {@link Observable} of only {@link String}s. + */ + public static Observable toString(Observable src) { + return src.map(new Func1() { + @Override + public String call(Object obj) { + return String.valueOf(obj); + } + }); + } + + /** + * Rechunks the strings based on a regex pattern and works on infinite stream. + * + *

+	 * split(["boo:an", "d:foo"], ":") --> ["boo", "and", "foo"]
+	 * split(["boo:an", "d:foo"], "o") --> ["b", "", ":and:f", "", ""]
+	 * 
+ * + * See {@link Pattern} + *

+ * + * + * @param src + * the source that should be use for the split + * @param regex + * a string that build regular expression modifier + * @return the Observable streaming the split values + */ + + public static Observable split(final Observable src, String regex) { + Pattern pattern = Pattern.compile(regex); + return StringObservable.split(src, pattern); + } + + /** + * Rechunks the strings based on a regex pattern and works on infinite stream. + * + *

+	 * split(["boo:an", "d:foo"], ":") --> ["boo", "and", "foo"]
+	 * split(["boo:an", "d:foo"], "o") --> ["b", "", ":and:f", "", ""]
+	 * 
+ * + * See {@link Pattern} + *

+ * + * + * @param src + * the source that should be use for the split + * @param pattern + * pre compiled regular expression pattern for the split functionality + * @return the Observable streaming the split values + */ + public static Observable split(final Observable src, final Pattern pattern) { + + return src.lift(new Operator() { + @Override + public Subscriber call(final Subscriber o) { + return new Subscriber(o) { + private String leftOver = null; + + @Override + public void onCompleted() { + if (leftOver != null) + output(leftOver); + if (!o.isUnsubscribed()) + o.onCompleted(); + } + + @Override + public void onError(Throwable e) { + if (leftOver != null) + output(leftOver); + if (!o.isUnsubscribed()) + o.onError(e); + } + + @Override + public void onNext(String segment) { + String[] parts = pattern.split(segment, -1); + + if (leftOver != null) + parts[0] = leftOver + parts[0]; + for (int i = 0; i < parts.length - 1; i++) { + String part = parts[i]; + output(part); + } + leftOver = parts[parts.length - 1]; + } + + private int emptyPartCount = 0; + + /** + * when limit == 0 trailing empty parts are not emitted. + * + * @param part + */ + private void output(String part) { + if (part.isEmpty()) { + emptyPartCount++; + } + else { + for (; emptyPartCount > 0; emptyPartCount--) + if (!o.isUnsubscribed()) + o.onNext(""); + if (!o.isUnsubscribed()) + o.onNext(part); + } + } + }; + } + }); + } + + /** + * Concatenates the sequence of values by adding a separator + * between them and emitting the result once the source completes. + *

+ * + *

+ * The conversion from the value type to String is performed via {@link java.lang.String#valueOf(java.lang.Object)} calls. + *

+ * For example: + * + *

+	 * Observable<Object>	source	= Observable.from("a", 1, "c");
+	 * 															Observable<String>	result	= join(source, ", ");
+	 * 
+ * + * will yield a single element equal to "a, 1, c". + * + * @param source + * the source sequence of CharSequence values + * @param separator + * the separator to a + * @return an Observable which emits a single String value having the concatenated + * values of the source observable with the separator between elements + */ + public static Observable join(final Observable source, final CharSequence separator) { + return source.lift(new Operator() { + @Override + public Subscriber call(final Subscriber child) { + final JoinParentSubscriber parent = new JoinParentSubscriber(child, separator); + child.add(parent); + child.setProducer(new Producer() { + @Override + public void request(long n) { + if (n > 0) { + parent.requestAll(); + } + } + }); + return parent; + } + }); + } + + private static final class JoinParentSubscriber extends Subscriber { + + private final Subscriber child; + private final CharSequence separator; + private boolean mayAddSeparator; + private StringBuilder b = new StringBuilder(); + + JoinParentSubscriber(Subscriber child, CharSequence separator) { + this.child = child; + this.separator = separator; + } + + void requestAll() { + request(Long.MAX_VALUE); + } + + @Override + public void onStart() { + request(0); + } + + @Override + public void onCompleted() { + String str = b.toString(); + b = null; + if (!child.isUnsubscribed()) + child.onNext(str); + if (!child.isUnsubscribed()) + child.onCompleted(); + } + + @Override + public void onError(Throwable e) { + b = null; + if (!child.isUnsubscribed()) + child.onError(e); + } + + @Override + public void onNext(String t) { + if (mayAddSeparator) { + b.append(separator); + } + mayAddSeparator = true; + b.append(t); + } + + } + + /** + * Splits the {@link Observable} of Strings by lines and numbers them (zero based index) + *

+ * + * + * @param source + * @return the Observable conaining the split lines of the source + */ + public static Observable byLine(Observable source) { + return split(source, System.getProperty("line.separator")); + } + + /** + * Converts a String into an Observable that emits the chars in the String. + *

+ * + * + * @param str + * the source String + * @return an Observable that emits each char in the source String + * @see RxJava wiki: from + */ + public static Observable byCharacter(Observable source) { + return source.lift(new Operator() { + @Override + public Subscriber call(final Subscriber subscriber) { + return new Subscriber(subscriber) { + @Override + public void onCompleted() { + subscriber.onCompleted(); + } + + @Override + public void onError(Throwable e) { + subscriber.onError(e); + } + + @Override + public void onNext(String str) { + for (char c : str.toCharArray()) { + subscriber.onNext(Character.toString(c)); + } + } + }; + } + }); + } +} From a6813fb605552a9e01a026c464951211313a808a Mon Sep 17 00:00:00 2001 From: Crystark Date: Tue, 15 Sep 2015 11:50:26 +0200 Subject: [PATCH 2/2] Fixed code style --- .../java/rx/observables/StringObservable.java | 1227 +++++++++-------- 1 file changed, 614 insertions(+), 613 deletions(-) diff --git a/src/main/java/rx/observables/StringObservable.java b/src/main/java/rx/observables/StringObservable.java index 2927066..d9aac80 100644 --- a/src/main/java/rx/observables/StringObservable.java +++ b/src/main/java/rx/observables/StringObservable.java @@ -1,613 +1,614 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.observables; - -import java.io.Closeable; -import java.io.IOException; -import java.io.InputStream; -import java.io.Reader; -import java.nio.ByteBuffer; -import java.nio.CharBuffer; -import java.nio.charset.CharacterCodingException; -import java.nio.charset.Charset; -import java.nio.charset.CharsetDecoder; -import java.nio.charset.CharsetEncoder; -import java.nio.charset.CoderResult; -import java.nio.charset.CodingErrorAction; -import java.util.Arrays; -import java.util.concurrent.Callable; -import java.util.regex.Pattern; - -import rx.Observable; -import rx.Observable.Operator; -import rx.Producer; -import rx.Subscriber; -import rx.functions.Action1; -import rx.functions.Func0; -import rx.functions.Func1; -import rx.functions.Func2; -import rx.internal.operators.OnSubscribeInputStream; -import rx.internal.operators.OnSubscribeInputStreamToLines; -import rx.internal.operators.OnSubscribeReader; - -public class StringObservable { - /** - * Reads bytes from a source {@link InputStream} and outputs {@link Observable} of {@code byte[]}s. Supports backpressure. - *

- * - * - * @param i - * Source {@link InputStream} - * @return the Observable containing read byte arrays from the input - */ - public static Observable from(final InputStream i) { - return from(i, 8 * 1024); - } - - /** - * Func0 that allows throwing an {@link IOException}s commonly thrown during IO operations. - * - * @see StringObservable#using(UnsafeFunc0, Func1) - * - * @param - */ - public static interface UnsafeFunc0 extends Callable { - @Override - public R call() throws Exception; - } - - /** - * Helps in creating an Observable that automatically calls {@link Closeable#close()} on completion, error or unsubscribe. - * - *

-	 * StringObservable.using(() -> new FileReader(file), (reader) -> StringObservable.from(reader))
-	 * 
- * - * @param resourceFactory - * Generates a new {@link Closeable} resource for each new subscription to the returned Observable - * @param observableFactory - * Converts the {@link Closeable} resource into a {@link Observable} with {@link #from(InputStream)} or {@link #from(Reader)} - * @return - * An {@link Observable} that automatically closes the resource when done. - */ - public static Observable using(final UnsafeFunc0 resourceFactory, - final Func1> observableFactory) { - return Observable.using(new Func0() { - @Override - public S call() { - try { - return resourceFactory.call(); - } catch (Throwable e) { - throw new RuntimeException(e); - } - } - }, observableFactory, new Action1() { - @Override - public void call(S resource) { - try { - resource.close(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - }, true); - } - - /** - * Reads bytes from a source {@link InputStream} and outputs {@link Observable} of {@code byte[]}s. Supports backpressure. - *

- * - * - * @param is - * Source {@link InputStream} - * @param size - * internal buffer size - * @return the Observable containing read byte arrays from the input - */ - public static Observable from(final InputStream is, final int size) { - return Observable.create(new OnSubscribeInputStream(is, size)); - } - - /** - * Reads lines from a source {@link InputStream} and outputs {@link Observable} of {@code String}s. Supports backpressure. - * - * @param is - * Source {@link InputStream} - * @param ch - * Charset to be used to read the source {@link InputStream} - * @return the Observable containing the split lines of the source - */ - public static Observable byLine(InputStream is, Charset ch) { - return Observable.create(new OnSubscribeInputStreamToLines(is, ch)); - } - - /** - * Reads characters from a source {@link Reader} and outputs {@link Observable} of {@link String}s. Supports backpressure. - *

- * - * - * @param i - * Source {@link Reader} - * @return the Observable of Strings read from the source - */ - public static Observable from(final Reader i) { - return from(i, 8 * 1024); - } - - /** - * Reads characters from a source {@link Reader} and outputs {@link Observable} of {@link String}s. Supports backpressure. - *

- * - * - * @param i - * Source {@link Reader} - * @param size - * internal buffer size - * @return the Observable of Strings read from the source - */ - public static Observable from(final Reader reader, final int size) { - return Observable.create(new OnSubscribeReader(reader, size)); - } - - /** - * Decodes a stream of multibyte chunks into a stream of strings that works on infinite streams - * and where handles when a multibyte character spans two chunks. - *

- * - * - * @param src - * @param charsetName - * @return the Observable returning a stream of decoded strings - */ - public static Observable decode(Observable src, String charsetName) { - return decode(src, Charset.forName(charsetName)); - } - - /** - * Decodes a stream of multibyte chunks into a stream of strings that works on infinite streams - * and where handles when a multibyte character spans two chunks. - *

- * - * - * @param src - * @param charset - * @return the Observable returning a stream of decoded strings - */ - public static Observable decode(Observable src, Charset charset) { - return decode(src, charset.newDecoder().onMalformedInput(CodingErrorAction.REPLACE).onUnmappableCharacter(CodingErrorAction.REPLACE)); - } - - /** - * Decodes a stream of multibyte chunks into a stream of strings that works on infinite streams - * and handles when a multibyte character spans two chunks. - * This method allows for more control over how malformed and unmappable characters are handled. - *

- * - * - * @param src - * @param charsetDecoder - * @return the Observable returning a stream of decoded strings - */ - public static Observable decode(final Observable src, final CharsetDecoder charsetDecoder) { - return src.lift(new Operator() { - @Override - public Subscriber call(final Subscriber o) { - return new Subscriber(o) { - private ByteBuffer leftOver = null; - - @Override - public void onCompleted() { - if (process(null, leftOver, true)) - o.onCompleted(); - } - - @Override - public void onError(Throwable e) { - if (process(null, leftOver, true)) - o.onError(e); - } - - @Override - public void onNext(byte[] bytes) { - process(bytes, leftOver, false); - } - - public boolean process(byte[] next, ByteBuffer last, boolean endOfInput) { - if (o.isUnsubscribed()) - return false; - - ByteBuffer bb; - if (last != null) { - if (next != null) { - // merge leftover in front of the next bytes - bb = ByteBuffer.allocate(last.remaining() + next.length); - bb.put(last); - bb.put(next); - bb.flip(); - } - else { // next == null - bb = last; - } - } - else { // last == null - if (next != null) { - bb = ByteBuffer.wrap(next); - } - else { // next == null - return true; - } - } - - CharBuffer cb = CharBuffer.allocate((int) (bb.limit() * charsetDecoder.averageCharsPerByte())); - CoderResult cr = charsetDecoder.decode(bb, cb, endOfInput); - cb.flip(); - - if (cr.isError()) { - try { - cr.throwException(); - } - catch (CharacterCodingException e) { - o.onError(e); - return false; - } - } - - if (bb.remaining() > 0) { - leftOver = bb; - } - else { - leftOver = null; - } - - String string = cb.toString(); - if (!string.isEmpty()) - o.onNext(string); - - return true; - } - }; - } - }); - } - - /** - * Encodes a possibly infinite stream of strings into an Observable of byte arrays. - *

- * - * - * @param src - * @param charsetName - * @return the Observable with a stream of encoded byte arrays - */ - public static Observable encode(Observable src, String charsetName) { - return encode(src, Charset.forName(charsetName)); - } - - /** - * Encodes a possibly infinite stream of strings into an Observable of byte arrays. - *

- * - * - * @param src - * @param charset - * @return the Observable with a stream of encoded byte arrays - */ - public static Observable encode(Observable src, Charset charset) { - return encode(src, charset.newEncoder().onMalformedInput(CodingErrorAction.REPLACE).onUnmappableCharacter(CodingErrorAction.REPLACE)); - } - - /** - * Encodes a possibly infinite stream of strings into an Observable of byte arrays. - * This method allows for more control over how malformed and unmappable characters are handled. - *

- * - * - * @param src - * @param charsetEncoder - * @return the Observable with a stream of encoded byte arrays - */ - public static Observable encode(Observable src, final CharsetEncoder charsetEncoder) { - return src.map(new Func1() { - @Override - public byte[] call(String str) { - CharBuffer cb = CharBuffer.wrap(str); - ByteBuffer bb; - try { - bb = charsetEncoder.encode(cb); - } catch (CharacterCodingException e) { - throw new RuntimeException(e); - } - return Arrays.copyOfRange(bb.array(), bb.position(), bb.limit()); - } - }); - } - - /** - * Gather up all of the strings in to one string to be able to use it as one message. Don't use - * this on infinite streams. - *

- * - * - * @param src - * @return the Observable returing all strings concatenated as a single string - */ - public static Observable stringConcat(Observable src) { - return toString(src.reduce(new StringBuilder(), new Func2() { - @Override - public StringBuilder call(StringBuilder a, String b) { - return a.append(b); - } - })); - } - - /** - * Maps {@link Observable}<{@link Object}> to {@link Observable}<{@link String}> by using {@link String#valueOf(Object)} - * - * @param src - * @return An {@link Observable} of only {@link String}s. - */ - public static Observable toString(Observable src) { - return src.map(new Func1() { - @Override - public String call(Object obj) { - return String.valueOf(obj); - } - }); - } - - /** - * Rechunks the strings based on a regex pattern and works on infinite stream. - * - *

-	 * split(["boo:an", "d:foo"], ":") --> ["boo", "and", "foo"]
-	 * split(["boo:an", "d:foo"], "o") --> ["b", "", ":and:f", "", ""]
-	 * 
- * - * See {@link Pattern} - *

- * - * - * @param src - * the source that should be use for the split - * @param regex - * a string that build regular expression modifier - * @return the Observable streaming the split values - */ - - public static Observable split(final Observable src, String regex) { - Pattern pattern = Pattern.compile(regex); - return StringObservable.split(src, pattern); - } - - /** - * Rechunks the strings based on a regex pattern and works on infinite stream. - * - *

-	 * split(["boo:an", "d:foo"], ":") --> ["boo", "and", "foo"]
-	 * split(["boo:an", "d:foo"], "o") --> ["b", "", ":and:f", "", ""]
-	 * 
- * - * See {@link Pattern} - *

- * - * - * @param src - * the source that should be use for the split - * @param pattern - * pre compiled regular expression pattern for the split functionality - * @return the Observable streaming the split values - */ - public static Observable split(final Observable src, final Pattern pattern) { - - return src.lift(new Operator() { - @Override - public Subscriber call(final Subscriber o) { - return new Subscriber(o) { - private String leftOver = null; - - @Override - public void onCompleted() { - if (leftOver != null) - output(leftOver); - if (!o.isUnsubscribed()) - o.onCompleted(); - } - - @Override - public void onError(Throwable e) { - if (leftOver != null) - output(leftOver); - if (!o.isUnsubscribed()) - o.onError(e); - } - - @Override - public void onNext(String segment) { - String[] parts = pattern.split(segment, -1); - - if (leftOver != null) - parts[0] = leftOver + parts[0]; - for (int i = 0; i < parts.length - 1; i++) { - String part = parts[i]; - output(part); - } - leftOver = parts[parts.length - 1]; - } - - private int emptyPartCount = 0; - - /** - * when limit == 0 trailing empty parts are not emitted. - * - * @param part - */ - private void output(String part) { - if (part.isEmpty()) { - emptyPartCount++; - } - else { - for (; emptyPartCount > 0; emptyPartCount--) - if (!o.isUnsubscribed()) - o.onNext(""); - if (!o.isUnsubscribed()) - o.onNext(part); - } - } - }; - } - }); - } - - /** - * Concatenates the sequence of values by adding a separator - * between them and emitting the result once the source completes. - *

- * - *

- * The conversion from the value type to String is performed via {@link java.lang.String#valueOf(java.lang.Object)} calls. - *

- * For example: - * - *

-	 * Observable<Object>	source	= Observable.from("a", 1, "c");
-	 * 															Observable<String>	result	= join(source, ", ");
-	 * 
- * - * will yield a single element equal to "a, 1, c". - * - * @param source - * the source sequence of CharSequence values - * @param separator - * the separator to a - * @return an Observable which emits a single String value having the concatenated - * values of the source observable with the separator between elements - */ - public static Observable join(final Observable source, final CharSequence separator) { - return source.lift(new Operator() { - @Override - public Subscriber call(final Subscriber child) { - final JoinParentSubscriber parent = new JoinParentSubscriber(child, separator); - child.add(parent); - child.setProducer(new Producer() { - @Override - public void request(long n) { - if (n > 0) { - parent.requestAll(); - } - } - }); - return parent; - } - }); - } - - private static final class JoinParentSubscriber extends Subscriber { - - private final Subscriber child; - private final CharSequence separator; - private boolean mayAddSeparator; - private StringBuilder b = new StringBuilder(); - - JoinParentSubscriber(Subscriber child, CharSequence separator) { - this.child = child; - this.separator = separator; - } - - void requestAll() { - request(Long.MAX_VALUE); - } - - @Override - public void onStart() { - request(0); - } - - @Override - public void onCompleted() { - String str = b.toString(); - b = null; - if (!child.isUnsubscribed()) - child.onNext(str); - if (!child.isUnsubscribed()) - child.onCompleted(); - } - - @Override - public void onError(Throwable e) { - b = null; - if (!child.isUnsubscribed()) - child.onError(e); - } - - @Override - public void onNext(String t) { - if (mayAddSeparator) { - b.append(separator); - } - mayAddSeparator = true; - b.append(t); - } - - } - - /** - * Splits the {@link Observable} of Strings by lines and numbers them (zero based index) - *

- * - * - * @param source - * @return the Observable conaining the split lines of the source - */ - public static Observable byLine(Observable source) { - return split(source, System.getProperty("line.separator")); - } - - /** - * Converts a String into an Observable that emits the chars in the String. - *

- * - * - * @param str - * the source String - * @return an Observable that emits each char in the source String - * @see RxJava wiki: from - */ - public static Observable byCharacter(Observable source) { - return source.lift(new Operator() { - @Override - public Subscriber call(final Subscriber subscriber) { - return new Subscriber(subscriber) { - @Override - public void onCompleted() { - subscriber.onCompleted(); - } - - @Override - public void onError(Throwable e) { - subscriber.onError(e); - } - - @Override - public void onNext(String str) { - for (char c : str.toCharArray()) { - subscriber.onNext(Character.toString(c)); - } - } - }; - } - }); - } -} +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.observables; + +import rx.Observable; +import rx.Observable.Operator; +import rx.Producer; +import rx.Subscriber; +import rx.functions.Action1; +import rx.functions.Func0; +import rx.functions.Func1; +import rx.functions.Func2; +import rx.internal.operators.OnSubscribeInputStream; +import rx.internal.operators.OnSubscribeInputStreamToLines; +import rx.internal.operators.OnSubscribeReader; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.Reader; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.CharsetEncoder; +import java.nio.charset.CoderResult; +import java.nio.charset.CodingErrorAction; +import java.util.Arrays; +import java.util.concurrent.Callable; +import java.util.regex.Pattern; + +public class StringObservable { + /** + * Reads bytes from a source {@link InputStream} and outputs {@link Observable} of + * {@code byte[]}s. Supports backpressure. + *

+ * + * + * @param i + * Source {@link InputStream} + * @return the Observable containing read byte arrays from the input + */ + public static Observable from(final InputStream i) { + return from(i, 8 * 1024); + } + + /** + * Func0 that allows throwing an {@link IOException}s commonly thrown during IO operations. + * @see StringObservable#using(UnsafeFunc0, Func1) + * + * @param + */ + public static interface UnsafeFunc0 extends Callable { + public R call() throws Exception; + } + + /** + * Helps in creating an Observable that automatically calls {@link Closeable#close()} on completion, error or unsubscribe. + * + *

+     * StringObservable.using(() -> new FileReader(file), (reader) -> StringObservable.from(reader))
+     * 
+ * + * @param resourceFactory + * Generates a new {@link Closeable} resource for each new subscription to the returned Observable + * @param observableFactory + * Converts the {@link Closeable} resource into a {@link Observable} with {@link #from(InputStream)} or {@link #from(Reader)} + * @return + * An {@link Observable} that automatically closes the resource when done. + */ + public static Observable using(final UnsafeFunc0 resourceFactory, + final Func1> observableFactory) { + return Observable.using(new Func0() { + @Override + public S call() { + try { + return resourceFactory.call(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + }, observableFactory, new Action1() { + @Override + public void call(S resource) { + try { + resource.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + }, true); + } + + /** + * Reads bytes from a source {@link InputStream} and outputs {@link Observable} of + * {@code byte[]}s. Supports backpressure. + *

+ * + * + * @param is + * Source {@link InputStream} + * @param size + * internal buffer size + * @return the Observable containing read byte arrays from the input + */ + public static Observable from(final InputStream is, final int size) { + return Observable.create(new OnSubscribeInputStream(is, size)); + } + + /** + * Reads characters from a source {@link Reader} and outputs {@link Observable} of + * {@link String}s. Supports backpressure. + *

+ * + * + * @param i + * Source {@link Reader} + * @return the Observable of Strings read from the source + */ + public static Observable from(final Reader i) { + return from(i, 8 * 1024); + } + + /** + * Reads characters from a source {@link Reader} and outputs {@link Observable} of + * {@link String}s. Supports backpressure. + *

+ * + * + * @param i + * Source {@link Reader} + * @param size + * internal buffer size + * @return the Observable of Strings read from the source + */ + public static Observable from(final Reader reader, final int size) { + return Observable.create(new OnSubscribeReader(reader, size)); + } + + /** + * Reads lines from a source {@link InputStream} and outputs {@link Observable} of {@code String}s. Supports backpressure. + * + * @param is + * Source {@link InputStream} + * @param ch + * Charset to be used to read the source {@link InputStream} + * @return the Observable containing the split lines of the source + */ + public static Observable byLine(InputStream is, Charset ch) { + return Observable.create(new OnSubscribeInputStreamToLines(is, ch)); + } + + /** + * Decodes a stream of multibyte chunks into a stream of strings that works on infinite streams + * and where handles when a multibyte character spans two chunks. + *

+ * + * + * @param src + * @param charsetName + * @return the Observable returning a stream of decoded strings + */ + public static Observable decode(Observable src, String charsetName) { + return decode(src, Charset.forName(charsetName)); + } + + /** + * Decodes a stream of multibyte chunks into a stream of strings that works on infinite streams + * and where handles when a multibyte character spans two chunks. + *

+ * + * + * @param src + * @param charset + * @return the Observable returning a stream of decoded strings + */ + public static Observable decode(Observable src, Charset charset) { + return decode(src, charset.newDecoder().onMalformedInput(CodingErrorAction.REPLACE).onUnmappableCharacter(CodingErrorAction.REPLACE)); + } + + /** + * Decodes a stream of multibyte chunks into a stream of strings that works on infinite streams + * and handles when a multibyte character spans two chunks. + * This method allows for more control over how malformed and unmappable characters are handled. + *

+ * + * + * @param src + * @param charsetDecoder + * @return the Observable returning a stream of decoded strings + */ + public static Observable decode(final Observable src, final CharsetDecoder charsetDecoder) { + return src.lift(new Operator() { + @Override + public Subscriber call(final Subscriber o) { + return new Subscriber(o) { + private ByteBuffer leftOver = null; + + @Override + public void onCompleted() { + if (process(null, leftOver, true)) + o.onCompleted(); + } + + @Override + public void onError(Throwable e) { + if (process(null, leftOver, true)) + o.onError(e); + } + + @Override + public void onNext(byte[] bytes) { + process(bytes, leftOver, false); + } + + public boolean process(byte[] next, ByteBuffer last, boolean endOfInput) { + if (o.isUnsubscribed()) + return false; + + ByteBuffer bb; + if (last != null) { + if (next != null) { + // merge leftover in front of the next bytes + bb = ByteBuffer.allocate(last.remaining() + next.length); + bb.put(last); + bb.put(next); + bb.flip(); + } + else { // next == null + bb = last; + } + } + else { // last == null + if (next != null) { + bb = ByteBuffer.wrap(next); + } + else { // next == null + return true; + } + } + + CharBuffer cb = CharBuffer.allocate((int) (bb.limit() * charsetDecoder.averageCharsPerByte())); + CoderResult cr = charsetDecoder.decode(bb, cb, endOfInput); + cb.flip(); + + if (cr.isError()) { + try { + cr.throwException(); + } + catch (CharacterCodingException e) { + o.onError(e); + return false; + } + } + + if (bb.remaining() > 0) { + leftOver = bb; + } + else { + leftOver = null; + } + + String string = cb.toString(); + if (!string.isEmpty()) + o.onNext(string); + + return true; + } + }; + } + }); + } + + /** + * Encodes a possibly infinite stream of strings into an Observable of byte arrays. + *

+ * + * + * @param src + * @param charsetName + * @return the Observable with a stream of encoded byte arrays + */ + public static Observable encode(Observable src, String charsetName) { + return encode(src, Charset.forName(charsetName)); + } + + /** + * Encodes a possibly infinite stream of strings into an Observable of byte arrays. + *

+ * + * + * @param src + * @param charset + * @return the Observable with a stream of encoded byte arrays + */ + public static Observable encode(Observable src, Charset charset) { + return encode(src, charset.newEncoder().onMalformedInput(CodingErrorAction.REPLACE).onUnmappableCharacter(CodingErrorAction.REPLACE)); + } + + /** + * Encodes a possibly infinite stream of strings into an Observable of byte arrays. + * This method allows for more control over how malformed and unmappable characters are handled. + *

+ * + * + * @param src + * @param charsetEncoder + * @return the Observable with a stream of encoded byte arrays + */ + public static Observable encode(Observable src, final CharsetEncoder charsetEncoder) { + return src.map(new Func1() { + @Override + public byte[] call(String str) { + CharBuffer cb = CharBuffer.wrap(str); + ByteBuffer bb; + try { + bb = charsetEncoder.encode(cb); + } catch (CharacterCodingException e) { + throw new RuntimeException(e); + } + return Arrays.copyOfRange(bb.array(), bb.position(), bb.limit()); + } + }); + } + + /** + * Gather up all of the strings in to one string to be able to use it as one message. Don't use + * this on infinite streams. + *

+ * + * + * @param src + * @return the Observable returing all strings concatenated as a single string + */ + public static Observable stringConcat(Observable src) { + return toString(src.reduce(new StringBuilder(), new Func2() { + @Override + public StringBuilder call(StringBuilder a, String b) { + return a.append(b); + } + })); + } + + /** + * Maps {@link Observable}<{@link Object}> to {@link Observable}<{@link String}> by using {@link String#valueOf(Object)} + * @param src + * @return An {@link Observable} of only {@link String}s. + */ + public static Observable toString(Observable src) { + return src.map(new Func1() { + @Override + public String call(Object obj) { + return String.valueOf(obj); + } + }); + } + + /** + * Rechunks the strings based on a regex pattern and works on infinite stream. + * + *

+     * split(["boo:an", "d:foo"], ":") --> ["boo", "and", "foo"]
+     * split(["boo:an", "d:foo"], "o") --> ["b", "", ":and:f", "", ""]
+     * 
+ * + * See {@link Pattern} + *

+ * + * + * @param src + * the source that should be use for the split + * @param regex + * a string that build regular expression modifier + * @return the Observable streaming the split values + */ + + public static Observable split(final Observable src, String regex) { + Pattern pattern = Pattern.compile(regex); + return StringObservable.split(src,pattern); + } + + /** + * Rechunks the strings based on a regex pattern and works on infinite stream. + * + *

+     * split(["boo:an", "d:foo"], ":") --> ["boo", "and", "foo"]
+     * split(["boo:an", "d:foo"], "o") --> ["b", "", ":and:f", "", ""]
+     * 
+ * + * See {@link Pattern} + *

+ * + * + * @param src + * the source that should be use for the split + * @param pattern + * pre compiled regular expression pattern for the split functionality + * @return the Observable streaming the split values + */ + public static Observable split(final Observable src, final Pattern pattern) { + + return src.lift(new Operator() { + @Override + public Subscriber call(final Subscriber o) { + return new Subscriber(o) { + private String leftOver = null; + + @Override + public void onCompleted() { + if (leftOver!=null) + output(leftOver); + if (!o.isUnsubscribed()) + o.onCompleted(); + } + + @Override + public void onError(Throwable e) { + if (leftOver!=null) + output(leftOver); + if (!o.isUnsubscribed()) + o.onError(e); + } + + @Override + public void onNext(String segment) { + String[] parts = pattern.split(segment, -1); + + if (leftOver != null) + parts[0] = leftOver + parts[0]; + for (int i = 0; i < parts.length - 1; i++) { + String part = parts[i]; + output(part); + } + leftOver = parts[parts.length - 1]; + } + + private int emptyPartCount = 0; + + /** + * when limit == 0 trailing empty parts are not emitted. + * + * @param part + */ + private void output(String part) { + if (part.isEmpty()) { + emptyPartCount++; + } + else { + for (; emptyPartCount > 0; emptyPartCount--) + if (!o.isUnsubscribed()) + o.onNext(""); + if (!o.isUnsubscribed()) + o.onNext(part); + } + } + }; + } + }); + } + + /** + * Concatenates the sequence of values by adding a separator + * between them and emitting the result once the source completes. + *

+ * + *

+ * The conversion from the value type to String is performed via + * {@link java.lang.String#valueOf(java.lang.Object)} calls. + *

+ * For example: + * + *

+     * Observable<Object> source = Observable.from("a", 1, "c");
+     * Observable<String> result = join(source, ", ");
+     * 
+ * + * will yield a single element equal to "a, 1, c". + * + * @param source + * the source sequence of CharSequence values + * @param separator + * the separator to a + * @return an Observable which emits a single String value having the concatenated + * values of the source observable with the separator between elements + */ + public static Observable join(final Observable source, final CharSequence separator) { + return source.lift(new Operator() { + @Override + public Subscriber call(final Subscriber child) { + final JoinParentSubscriber parent = new JoinParentSubscriber(child, separator); + child.add(parent); + child.setProducer(new Producer() { + @Override + public void request(long n) { + if (n > 0) { + parent.requestAll(); + } + }}); + return parent; + } + }); + } + + private static final class JoinParentSubscriber extends Subscriber { + + private final Subscriber child; + private final CharSequence separator; + private boolean mayAddSeparator; + private StringBuilder b = new StringBuilder(); + + JoinParentSubscriber(Subscriber child, CharSequence separator) { + this.child = child; + this.separator = separator; + } + + void requestAll() { + request(Long.MAX_VALUE); + } + + @Override + public void onStart() { + request(0); + } + + @Override + public void onCompleted() { + String str = b.toString(); + b = null; + if (!child.isUnsubscribed()) + child.onNext(str); + if (!child.isUnsubscribed()) + child.onCompleted(); + } + + @Override + public void onError(Throwable e) { + b = null; + if (!child.isUnsubscribed()) + child.onError(e); + } + + @Override + public void onNext(String t) { + if (mayAddSeparator) { + b.append(separator); + } + mayAddSeparator = true; + b.append(t); + } + + } + + /** + * Splits the {@link Observable} of Strings by lines and numbers them (zero based index) + *

+ * + * + * @param source + * @return the Observable conaining the split lines of the source + */ + public static Observable byLine(Observable source) { + return split(source, System.getProperty("line.separator")); + } + + /** + * Converts a String into an Observable that emits the chars in the String. + *

+ * + * + * @param str + * the source String + * @return an Observable that emits each char in the source String + * @see RxJava wiki: from + */ + public static Observable byCharacter(Observable source) { + return source.lift(new Operator() { + @Override + public Subscriber call(final Subscriber subscriber) { + return new Subscriber(subscriber) { + @Override + public void onCompleted() { + subscriber.onCompleted(); + } + + @Override + public void onError(Throwable e) { + subscriber.onError(e); + } + + @Override + public void onNext(String str) { + for (char c : str.toCharArray()) { + subscriber.onNext(Character.toString(c)); + } + } + }; + } + }); + } +}