From b9016e583ccdaad758d40045921f91d4bbacde24 Mon Sep 17 00:00:00 2001
From: zhaoziwen <516160271@qq.com>
Date: Fri, 9 Sep 2016 16:54:10 +0800
Subject: [PATCH 1/3] consumer producer
---
concurrent/concurrent.iml | 1 +
.../me.zzw.app.concurrent/TCyclicBarrier.java | 78 +++++
.../main/me.zzw.app.concurrent/TFurture.java | 78 +++++
.../me.zzw.app.concurrent/TSemaphore.java | 71 +++++
.../TcountDownLatch.java | 63 ++++
.../TcountDownLatch2.java | 55 ++++
.../main/me.zzw.app.concurrent/Tphaser.java | 38 +++
.../TrecursiveAction.java | 59 ++++
.../me.zzw.app.concurrent/TthreadFactory.java | 13 +
.../consumerAndProductor/Main.java | 128 ++++++++
.../consumerAndProductor/Test.java | 273 ++++++++++++++++++
11 files changed, 857 insertions(+)
create mode 100644 concurrent/src/main/me.zzw.app.concurrent/TCyclicBarrier.java
create mode 100644 concurrent/src/main/me.zzw.app.concurrent/TFurture.java
create mode 100644 concurrent/src/main/me.zzw.app.concurrent/TSemaphore.java
create mode 100644 concurrent/src/main/me.zzw.app.concurrent/TcountDownLatch.java
create mode 100644 concurrent/src/main/me.zzw.app.concurrent/TcountDownLatch2.java
create mode 100644 concurrent/src/main/me.zzw.app.concurrent/Tphaser.java
create mode 100644 concurrent/src/main/me.zzw.app.concurrent/TrecursiveAction.java
create mode 100644 concurrent/src/main/me.zzw.app.concurrent/TthreadFactory.java
create mode 100644 concurrent/src/main/me.zzw.app.concurrent/consumerAndProductor/Main.java
create mode 100644 concurrent/src/main/me.zzw.app.concurrent/consumerAndProductor/Test.java
diff --git a/concurrent/concurrent.iml b/concurrent/concurrent.iml
index d734bf0..21e3f17 100644
--- a/concurrent/concurrent.iml
+++ b/concurrent/concurrent.iml
@@ -4,6 +4,7 @@
+
diff --git a/concurrent/src/main/me.zzw.app.concurrent/TCyclicBarrier.java b/concurrent/src/main/me.zzw.app.concurrent/TCyclicBarrier.java
new file mode 100644
index 0000000..b2292fc
--- /dev/null
+++ b/concurrent/src/main/me.zzw.app.concurrent/TCyclicBarrier.java
@@ -0,0 +1,78 @@
+package main.me.zzw.app.concurrent;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+
+/**
+ * Created by infosea on 2016-09-09.
+ */
+class Solver {
+ final int N;
+ final float[][] data;
+ final CyclicBarrier barrier;
+
+ class Worker implements Runnable {
+ int myRow;
+
+ public Worker(int myRow) {
+ this.myRow = myRow;
+ }
+
+ @Override
+ public void run() {
+ // while( !done()){
+ processRow(myRow);
+
+ try{
+ barrier.await();
+ }catch (InterruptedException e){
+ return;
+ }catch (BrokenBarrierException e){
+ return;
+ }
+ // }
+ }
+
+ private boolean done() {
+ return true;
+ }
+
+ private void processRow(int myRow) {
+ System.out.println("is processing row " + myRow);
+ }
+ }
+
+ public Solver(float[][] matrix) {
+ this.data = matrix;
+ this.N = matrix.length;
+ Runnable barrierAction = () ->mergeRows(data);
+ barrier = new CyclicBarrier(N, barrierAction);
+
+ List threads = new ArrayList<>(N);
+ for (int i =0; i< N ;i++){
+ Thread thread = new Thread(new Worker(i));
+ threads.add(thread);
+ thread.start();
+ }
+
+ // wait until done
+ for ( Thread thread : threads)
+ try {
+ thread.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private void mergeRows(float[][] data) {
+ System.out.println( "has finished " + data);
+ }
+}
+public class TCyclicBarrier {
+ public static void main(String[] args){
+ float[][] matrix = new float[][]{{1.2f,1,3f},{2.2f,2.3f}};
+ Solver solver = new Solver(matrix);
+ }
+}
diff --git a/concurrent/src/main/me.zzw.app.concurrent/TFurture.java b/concurrent/src/main/me.zzw.app.concurrent/TFurture.java
new file mode 100644
index 0000000..bab8625
--- /dev/null
+++ b/concurrent/src/main/me.zzw.app.concurrent/TFurture.java
@@ -0,0 +1,78 @@
+package main.me.zzw.app.concurrent;
+
+import jdk.nashorn.internal.codegen.CompilerConstants;
+
+import java.io.File;
+import java.util.concurrent.*;
+
+/**
+ * Created by infosea on 2016-09-09.
+ */
+interface ArchiveSearcher {
+ String search(String target);
+}
+
+class App {
+ ExecutorService executor = Executors.newFixedThreadPool(5);
+ ArchiveSearcher searcher = new ArchiveSearcher() {
+ private String[] strings ={ "a","b","c", "a", "e", "f", "g"};
+ @Override
+ public String search(String target) {
+ String strs = "";
+ for(String s : strings){
+ if(s.equals(target)){
+ strs +=s;
+ }
+ }
+ return strs;
+ }
+ };
+ void showSearch(final String target) throws InterruptedException {
+ Future future = executor.submit(() -> searcher.search(target));
+ displayOtherThings();
+ try {
+ displayText(future.get());
+ }catch (ExecutionException ex) {cleanup(); return;}
+ }
+
+ void showSearch2(final String target) throws InterruptedException {
+ FutureTask future =
+ new FutureTask( () -> searcher.search(target));
+ executor.execute(future);
+ displayOtherThings();
+ try {
+ displayText(future.get());
+ }catch (ExecutionException ex) {cleanup(); return;}
+ }
+
+ void shutdown(){
+ executor.shutdownNow();
+ }
+
+ private void displayOtherThings() {
+ System.out.println(" display other things");
+ }
+
+ private void displayText(String text){
+ System.out.println(text);
+ }
+
+ private void cleanup(){
+ executor.shutdown();
+ }
+
+}
+public class TFurture {
+ public static void main(String[] args){
+ App app = new App();
+ try {
+ app.showSearch2("a");
+ app.showSearch2("b");
+ app.showSearch2("c");
+ app.showSearch2("e");
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ app.shutdown();
+ }
+}
diff --git a/concurrent/src/main/me.zzw.app.concurrent/TSemaphore.java b/concurrent/src/main/me.zzw.app.concurrent/TSemaphore.java
new file mode 100644
index 0000000..03f8291
--- /dev/null
+++ b/concurrent/src/main/me.zzw.app.concurrent/TSemaphore.java
@@ -0,0 +1,71 @@
+package main.me.zzw.app.concurrent;
+
+import java.util.Objects;
+import java.util.concurrent.Semaphore;
+
+/**
+ * Created by infosea on 2016-09-09.
+ */
+class Pool{
+ private static final int MAX_VAILABLE = 100;
+ private final Semaphore available = new Semaphore(MAX_VAILABLE, true);
+
+ public Object getItem() throws InterruptedException {
+ available.acquire();
+ return getNextAvailableItem();
+ }
+
+ public void putItem(Object x) {
+ if (markAsUnused(x))
+ available.release();
+ }
+
+ // Not a particularly efficient data structure; just for demo
+
+ protected Object[] items = new Integer[MAX_VAILABLE];
+
+ public Pool(){
+ for (int i=0;i tasks){
+ final Phaser phaser = new Phaser(1); // "1" to register self
+ // create and start threads
+ for (final Runnable task : tasks){
+ phaser.register();
+ new Thread() {
+ @Override
+ public void run() {
+ phaser.arriveAndAwaitAdvance(); // await all creation
+ task.run();
+ }
+ }.start();
+ }
+
+ // allow thread to start and deregister self
+ phaser.arriveAndDeregister();
+ }
+ public static void main(String[] args){
+ final int tasksSize = 5;
+ List tasks = new ArrayList<>(tasksSize);
+ for(int i= 0; i< tasksSize; i++){
+ tasks.add(() -> System.out.println("task is running"));
+ }
+ Tphaser tphaser = new Tphaser();
+ tphaser.runTasks(tasks);
+ }
+
+}
diff --git a/concurrent/src/main/me.zzw.app.concurrent/TrecursiveAction.java b/concurrent/src/main/me.zzw.app.concurrent/TrecursiveAction.java
new file mode 100644
index 0000000..d7637c4
--- /dev/null
+++ b/concurrent/src/main/me.zzw.app.concurrent/TrecursiveAction.java
@@ -0,0 +1,59 @@
+package main.me.zzw.app.concurrent;
+
+import java.util.Arrays;
+import java.util.concurrent.RecursiveAction;
+
+/**
+ * Created by infosea on 2016-09-09.
+ */
+
+public class TrecursiveAction {
+ static class SortTask extends RecursiveAction{
+ final long[] array; final int lo, hi;
+
+ public SortTask(long[] array, int lo, int hi) {
+ this.array = array;
+ this.lo = lo;
+ this.hi = hi;
+ }
+
+ public SortTask(long[] array) {
+ this(array,0,array.length);
+ }
+
+ @Override
+ protected void compute() {
+ if (hi - lo < THRESHOLD)
+ sortSquentially(lo, hi);
+ else{
+ int mid = (lo + hi) >>> 1;
+ invokeAll(new SortTask(array, lo, mid),
+ new SortTask(array, mid, hi));
+ merge(lo, mid, hi);
+ }
+ }
+ // implementation details follow;
+ static final int THRESHOLD = 1000;
+ void sortSquentially(int lo, int hi) {
+ Arrays.sort(array, lo, hi);
+ }
+ void merge(int lo, int mid, int hi) {
+ long[] buf = Arrays.copyOfRange(array, lo, mid);
+ for(int i = 0, j = lo, k = mid; i < buf.length; j++)
+ array[j] = (k == hi || buf[i] < array[k]) ?
+ buf[i++] : array[k++];
+ }
+ }
+
+ public static void main(String[] args){
+ long[] sortingLongs = new long[100000] ;
+ for(int i = 0; i< 100000; i++){
+ sortingLongs[i] = Long.valueOf(i).longValue();
+ }
+ TrecursiveAction.SortTask sortTask = new TrecursiveAction.SortTask(sortingLongs);
+ sortTask.compute();
+ for(int i = 0; i < sortTask.array.length; i++){
+ System.out.println(sortTask.array[i]);
+ }
+ }
+}
diff --git a/concurrent/src/main/me.zzw.app.concurrent/TthreadFactory.java b/concurrent/src/main/me.zzw.app.concurrent/TthreadFactory.java
new file mode 100644
index 0000000..7b080b0
--- /dev/null
+++ b/concurrent/src/main/me.zzw.app.concurrent/TthreadFactory.java
@@ -0,0 +1,13 @@
+package main.me.zzw.app.concurrent;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * Created by infosea on 2016-09-09.
+ */
+public class TthreadFactory {
+ public static void main(String[] args){
+ ThreadFactory factory = Executors.defaultThreadFactory();
+ }
+}
diff --git a/concurrent/src/main/me.zzw.app.concurrent/consumerAndProductor/Main.java b/concurrent/src/main/me.zzw.app.concurrent/consumerAndProductor/Main.java
new file mode 100644
index 0000000..cba7219
--- /dev/null
+++ b/concurrent/src/main/me.zzw.app.concurrent/consumerAndProductor/Main.java
@@ -0,0 +1,128 @@
+package main.me.zzw.app.concurrent.consumerAndProductor;
+
+import java.util.Stack;
+
+/**
+ * Created by infosea on 2016-09-09.
+ */
+class Consumer extends Thread{
+ Base base;
+
+ public Consumer(Base base) {
+ this.base = base;
+ }
+
+ public void consume(){
+ base.pop();
+
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ consume();
+ }
+ }
+}
+
+class Producer extends Thread{
+ public static int item = 0;
+ Base base;
+
+ public Producer(Base base) {
+ this.base = base;
+ }
+
+ public void product(){
+ while(true) {
+ base.push(new Item("item" + ++item));
+ }
+ }
+
+ @Override
+ public void run() {
+ product();
+ }
+}
+
+class Item {
+ private String name;
+
+ public Item(String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+}
+
+class Base {
+ private Stack- items;
+
+ public Base(Stack
- items) {
+ this.items = items;
+ }
+
+ public Stack
- getItems() {
+ return items;
+ }
+
+ public void pop() {
+ synchronized (items) {
+ if (items.size() !=0) {
+ Item item = items.pop();
+ System.out.println("consume item : " + item.getName());
+ items.notifyAll();
+
+ } else {
+ try {
+ System.out.println(items.size());
+ items.wait();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ }
+
+ public void push(Item item){
+ synchronized (items) {
+ if(items.size() ==0) {
+ items.push(item);
+ System.out.println("product item : item" + item);
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ items.notifyAll();
+ }else{
+ try {
+ System.out.println(items.size());
+ items.wait();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+}
+
+public class Main {
+ public static void main(String[] args){
+
+ Base base = new Base(new Stack
- ());
+ Consumer consumer1 = new Consumer(base);
+ Producer producer1 = new Producer(base);
+ consumer1.start();
+ producer1.start();
+ Consumer consumer2 = new Consumer(base);
+ Producer producer2 = new Producer(base);
+ consumer2.start();
+ producer2.start();
+ }
+
+}
diff --git a/concurrent/src/main/me.zzw.app.concurrent/consumerAndProductor/Test.java b/concurrent/src/main/me.zzw.app.concurrent/consumerAndProductor/Test.java
new file mode 100644
index 0000000..ff8d8c7
--- /dev/null
+++ b/concurrent/src/main/me.zzw.app.concurrent/consumerAndProductor/Test.java
@@ -0,0 +1,273 @@
+package main.me.zzw.app.concurrent.consumerAndProductor;
+
+/**
+ * Created by infosea on 2016-09-09.
+ */
+import java.util.LinkedList;
+
+/**
+ * 仓库类Storage实现缓冲区
+ *
+ * Email:530025983@qq.com
+ *
+ * @author MONKEY.D.MENG 2011-03-15
+ *
+ */
+ class Storage
+{
+ // 仓库最大存储量
+ private final int MAX_SIZE = 100;
+
+ // 仓库存储的载体
+ private LinkedList