diff --git a/README.md b/README.md
index 8d6bb47..916e80b 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,6 @@
## learn-java
example of learn java
+algorithm has moved to [learn-algorithm](https://github.com/wcong/learn-algorithm)
+
diff --git a/pom.xml b/pom.xml
index b51b888..5f4edf1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,7 +22,7 @@
junit
junit
- 4.12
+ 4.13.1
test
@@ -206,6 +206,11 @@
jackson-annotations
2.3.3
+
+ io.reactivex.rxjava2
+ rxjava
+ 2.1.6
+
org.freemarker
freemarker
diff --git a/src/main/java/org/wcong/test/algorithm/dp/LongestCommonSubsequence.java b/src/main/java/org/wcong/test/algorithm/dp/LongestCommonSubSequence.java
similarity index 100%
rename from src/main/java/org/wcong/test/algorithm/dp/LongestCommonSubsequence.java
rename to src/main/java/org/wcong/test/algorithm/dp/LongestCommonSubSequence.java
diff --git a/src/main/java/org/wcong/test/rxjava/MyPublisher.java b/src/main/java/org/wcong/test/rxjava/MyPublisher.java
new file mode 100644
index 0000000..307c6cd
--- /dev/null
+++ b/src/main/java/org/wcong/test/rxjava/MyPublisher.java
@@ -0,0 +1,120 @@
+package org.wcong.test.rxjava;
+
+import io.reactivex.Flowable;
+import io.reactivex.FlowableSubscriber;
+import io.reactivex.Scheduler;
+import io.reactivex.functions.Consumer;
+import io.reactivex.schedulers.Schedulers;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class MyPublisher {
+
+ private static CountDownLatch countDownLatch = new CountDownLatch(4);
+
+ public static void main(String[] args) throws InterruptedException {
+ new ParallelFlowableRange(1, 100, Schedulers.computation()).subscribe(new Consumer() {
+ @Override
+ public void accept(Integer integer) throws Exception {
+ System.out.println(Thread.currentThread().getName() + ":" + integer);
+ }
+ });
+ countDownLatch.await();
+ }
+
+
+ public static class ParallelFlowableRange extends Flowable {
+
+ private Scheduler scheduler;
+
+ private int start;
+ private int end;
+
+ public ParallelFlowableRange(int start, int end, Scheduler scheduler) {
+ this.scheduler = scheduler;
+ this.start = start;
+ this.end = end;
+ }
+
+ @Override
+ protected void subscribeActual(Subscriber super Integer> subscriber) {
+ AtomicInteger atomicInteger = new AtomicInteger(start);
+ for (int i = 0; i < 4; i++) {
+ MySubscriber mySubscriber = new MySubscriber(subscriber, Integer.MAX_VALUE, scheduler.createWorker(), atomicInteger, end);
+ mySubscriber.onSubscribe(mySubscriber);
+ }
+ }
+ }
+
+ public static class MySubscriber extends AtomicInteger implements FlowableSubscriber, Subscription, Runnable {
+
+ private int prefetch;
+ private Scheduler.Worker worker;
+ private Subscriber super Integer> actual;
+ private boolean cancel;
+ private AtomicInteger atomicInteger;
+ private int end;
+
+ public MySubscriber(Subscriber super Integer> actual, int prefetch, Scheduler.Worker worker, AtomicInteger atomicInteger, int end) {
+ this.actual = actual;
+ this.prefetch = prefetch;
+ this.worker = worker;
+ this.atomicInteger = atomicInteger;
+ this.end = end;
+ }
+
+ @Override
+ public void onSubscribe(Subscription subscription) {
+ subscription.request(prefetch);
+ }
+
+ @Override
+ public void onNext(Integer integer) {
+ actual.onNext(integer);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ actual.onError(throwable);
+ }
+
+ @Override
+ public void onComplete() {
+ countDownLatch.countDown();
+ actual.onComplete();
+ }
+
+ @Override
+ public void request(long l) {
+ worker.schedule(this);
+ }
+
+ @Override
+ public void cancel() {
+ cancel = true;
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ if (cancel) {
+ break;
+ }
+ int num = atomicInteger.getAndIncrement();
+ if (num < end) {
+ try {
+ onNext(num);
+ } catch (Exception e) {
+ onError(e);
+ }
+ } else {
+ onComplete();
+ break;
+ }
+ }
+ }
+ }
+}
diff --git a/src/main/java/org/wcong/test/rxjava/package-info.java b/src/main/java/org/wcong/test/rxjava/package-info.java
new file mode 100644
index 0000000..7b13049
--- /dev/null
+++ b/src/main/java/org/wcong/test/rxjava/package-info.java
@@ -0,0 +1,3 @@
+package org.wcong.test.rxjava;
+
+// about rxjava
\ No newline at end of file