diff --git a/README.md b/README.md index 8d6bb47..916e80b 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,6 @@ ## learn-java example of learn java +algorithm has moved to [learn-algorithm](https://github.com/wcong/learn-algorithm) + diff --git a/pom.xml b/pom.xml index b51b888..5f4edf1 100644 --- a/pom.xml +++ b/pom.xml @@ -22,7 +22,7 @@ junit junit - 4.12 + 4.13.1 test @@ -206,6 +206,11 @@ jackson-annotations 2.3.3 + + io.reactivex.rxjava2 + rxjava + 2.1.6 + org.freemarker freemarker diff --git a/src/main/java/org/wcong/test/algorithm/dp/LongestCommonSubsequence.java b/src/main/java/org/wcong/test/algorithm/dp/LongestCommonSubSequence.java similarity index 100% rename from src/main/java/org/wcong/test/algorithm/dp/LongestCommonSubsequence.java rename to src/main/java/org/wcong/test/algorithm/dp/LongestCommonSubSequence.java diff --git a/src/main/java/org/wcong/test/rxjava/MyPublisher.java b/src/main/java/org/wcong/test/rxjava/MyPublisher.java new file mode 100644 index 0000000..307c6cd --- /dev/null +++ b/src/main/java/org/wcong/test/rxjava/MyPublisher.java @@ -0,0 +1,120 @@ +package org.wcong.test.rxjava; + +import io.reactivex.Flowable; +import io.reactivex.FlowableSubscriber; +import io.reactivex.Scheduler; +import io.reactivex.functions.Consumer; +import io.reactivex.schedulers.Schedulers; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +public class MyPublisher { + + private static CountDownLatch countDownLatch = new CountDownLatch(4); + + public static void main(String[] args) throws InterruptedException { + new ParallelFlowableRange(1, 100, Schedulers.computation()).subscribe(new Consumer() { + @Override + public void accept(Integer integer) throws Exception { + System.out.println(Thread.currentThread().getName() + ":" + integer); + } + }); + countDownLatch.await(); + } + + + public static class ParallelFlowableRange extends Flowable { + + private Scheduler scheduler; + + private int start; + private int end; + + public ParallelFlowableRange(int start, int end, Scheduler scheduler) { + this.scheduler = scheduler; + this.start = start; + this.end = end; + } + + @Override + protected void subscribeActual(Subscriber subscriber) { + AtomicInteger atomicInteger = new AtomicInteger(start); + for (int i = 0; i < 4; i++) { + MySubscriber mySubscriber = new MySubscriber(subscriber, Integer.MAX_VALUE, scheduler.createWorker(), atomicInteger, end); + mySubscriber.onSubscribe(mySubscriber); + } + } + } + + public static class MySubscriber extends AtomicInteger implements FlowableSubscriber, Subscription, Runnable { + + private int prefetch; + private Scheduler.Worker worker; + private Subscriber actual; + private boolean cancel; + private AtomicInteger atomicInteger; + private int end; + + public MySubscriber(Subscriber actual, int prefetch, Scheduler.Worker worker, AtomicInteger atomicInteger, int end) { + this.actual = actual; + this.prefetch = prefetch; + this.worker = worker; + this.atomicInteger = atomicInteger; + this.end = end; + } + + @Override + public void onSubscribe(Subscription subscription) { + subscription.request(prefetch); + } + + @Override + public void onNext(Integer integer) { + actual.onNext(integer); + } + + @Override + public void onError(Throwable throwable) { + actual.onError(throwable); + } + + @Override + public void onComplete() { + countDownLatch.countDown(); + actual.onComplete(); + } + + @Override + public void request(long l) { + worker.schedule(this); + } + + @Override + public void cancel() { + cancel = true; + } + + @Override + public void run() { + while (true) { + if (cancel) { + break; + } + int num = atomicInteger.getAndIncrement(); + if (num < end) { + try { + onNext(num); + } catch (Exception e) { + onError(e); + } + } else { + onComplete(); + break; + } + } + } + } +} diff --git a/src/main/java/org/wcong/test/rxjava/package-info.java b/src/main/java/org/wcong/test/rxjava/package-info.java new file mode 100644 index 0000000..7b13049 --- /dev/null +++ b/src/main/java/org/wcong/test/rxjava/package-info.java @@ -0,0 +1,3 @@ +package org.wcong.test.rxjava; + +// about rxjava \ No newline at end of file