通过wait,notify管理并发
1 2 3 4
| 1.两个方法都需要放置到synchronized的作用域中 2.一旦执行wait方法,会释放synchronized所关联的锁,进入等待状态,线程无法主动回到可执行状态,只能等待被唤醒 3.一旦执行notify方法,会通知因调用wait方法而等待的线程,如有多个线程等待,则会任意挑选一个线程来唤醒 4.notifyAll会唤醒所有因wait而进入等待状态的线程,但它们都未得到锁,因此会竞争锁,得到锁的继续执行,在锁被释放后,其他线程会继续竞争,依此类推
|
以生产者消费者问题观察wait和notify
基于Object类的wait、notify和notifyAll的方法,只能建立一个阻塞队列
代码展示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
/**
 * Single-slot producer/consumer hand-off built on the intrinsic monitor
 * ({@code synchronized} with {@code wait}/{@code notifyAll}).
 *
 * <p>The slot holds at most one value. {@link #product(int)} blocks while the
 * slot is occupied and {@link #consume()} blocks while it is empty.
 */
public class ThreadConcurrencyTest {

    /** The value currently stored in the slot. */
    private int value;

    /** {@code true} while the slot holds no unconsumed value. */
    private boolean empty = true;

    /**
     * Stores {@code i} in the slot, waiting until the previous value has been consumed.
     *
     * <p>If the calling thread is interrupted while waiting, the wait is resumed and the
     * interrupt status is restored before returning, so the value is never written over
     * an unconsumed one.
     *
     * @param i the value to hand to the consumer
     */
    public synchronized void product(int i) {
        boolean interrupted = false;
        // Re-check the condition in a loop: wait() may return spuriously, or another
        // producer may have refilled the slot before this thread re-acquired the monitor.
        while (!empty) {
            try {
                wait();
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
        value = i;
        empty = false;
        // notifyAll: producers and consumers share one wait set, so notify() could wake
        // a thread of the same role and leave everybody waiting.
        notifyAll();
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Takes the value out of the slot, waiting until one is available.
     *
     * <p>If the calling thread is interrupted while waiting, the wait is resumed and the
     * interrupt status is restored before returning.
     *
     * @return the value most recently stored by {@link #product(int)}
     */
    public synchronized Integer consume() {
        boolean interrupted = false;
        // Loop for the same reason as in product(): guard against spurious/stolen wakeups.
        while (empty) {
            try {
                wait();
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
        empty = true;
        notifyAll();
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        return value;
    }

    /** Thread that produces the values 0 through 5, printing each one after handing it over. */
    public static class Producer extends Thread {
        private final ThreadConcurrencyTest threadConcurrencyTest;

        Producer(ThreadConcurrencyTest threadConcurrencyTest) {
            this.threadConcurrencyTest = threadConcurrencyTest;
        }

        @Override
        public void run() {
            for (int j = 0; j <= 5; j++) {
                threadConcurrencyTest.product(j);
                System.out.println("生产者: " + j);
            }
        }
    }

    /** Thread that keeps consuming and printing values until it receives the value 9. */
    public static class Consumer extends Thread {
        private final ThreadConcurrencyTest threadConcurrencyTest;

        Consumer(ThreadConcurrencyTest threadConcurrencyTest) {
            this.threadConcurrencyTest = threadConcurrencyTest;
        }

        @Override
        public void run() {
            Integer consume;
            // NOTE: Producer only emits 0..5, so 9 never arrives and this thread ends up
            // blocked in consume() forever. The accompanying article presents this
            // non-termination as the point of the example, so the bound is kept as is.
            do {
                consume = threadConcurrencyTest.consume();
                System.out.println("消费者: " + consume);
            } while (consume != 9);
        }
    }

    /**
     * Starts one producer and one consumer sharing the same slot.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ThreadConcurrencyTest threadConcurrencyTest = new ThreadConcurrencyTest();
        new Producer(threadConcurrencyTest).start();
        new Consumer(threadConcurrencyTest).start();
    }
}
|
打印展示
当生产者生产的数据与消费者的结束条件不一致时(例如生产者只生产0~5,而消费者要等到取得9才结束),消费者会一直阻塞在wait上,程序将无法停止
通过Condition实现线程间的通讯
通过Condition类,可以在不同的线程里创建多个阻塞队列
代码展示
该代码是死循环
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
| import java.util.LinkedList; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock;
/**
 * Bounded producer/consumer buffer coordinated with one {@link Lock} and two
 * {@link Condition}s: {@code notFull} for waiting producers and {@code notEmpty}
 * for waiting consumers.
 */
public class ThreadConcurrencyTest {

    /** Pause after each produce/consume step, in milliseconds, to keep the demo output readable. */
    private static final long PAUSE_MILLIS = 1000L;

    private final Lock lock;
    private final Condition notFull;
    private final Condition notEmpty;
    private final int maxSize;
    private final LinkedList<String> list;

    /**
     * Creates an empty buffer.
     *
     * @param maxSize number of items the buffer holds before producers must wait
     */
    public ThreadConcurrencyTest(int maxSize) {
        lock = new ReentrantLock();
        notFull = lock.newCondition();
        notEmpty = lock.newCondition();
        this.maxSize = maxSize;
        list = new LinkedList<>();
    }

    /**
     * Adds one item to the buffer, waiting while the buffer is full, then pauses briefly.
     *
     * <p>If the thread is interrupted while waiting for space, nothing is added and the
     * interrupt status is restored.
     */
    public void producer() {
        lock.lock();
        try {
            while (list.size() == maxSize) {
                System.out.println(Thread.currentThread().getName() + " 生产停止");
                notFull.await();
            }
            list.add("Java");
            System.out.println(Thread.currentThread().getName() + " 已生产数量: " + list.size());
            // Signal before pausing so an interrupt during the pause cannot skip the signal.
            notEmpty.signalAll();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        } finally {
            lock.unlock();
        }
        // Pause only after releasing the lock so other threads are not blocked meanwhile.
        pause();
    }

    /**
     * Removes one item from the buffer, waiting while the buffer is empty, then pauses briefly.
     *
     * <p>If the thread is interrupted while waiting for an item, nothing is removed and the
     * interrupt status is restored. (The capitalised method name is kept for compatibility
     * with existing callers.)
     */
    public void Consumer() {
        lock.lock();
        try {
            while (list.size() == 0) {
                System.out.println(Thread.currentThread().getName() + " 消费停止");
                notEmpty.await();
            }
            System.out.println("消费" + list.poll() + "产品一个");
            System.out.println(Thread.currentThread().getName() + "剩余产品数量: " + list.size());
            // Signal before pausing so an interrupt during the pause cannot skip the signal.
            notFull.signalAll();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        } finally {
            lock.unlock();
        }
        pause();
    }

    /** Sleeps for {@link #PAUSE_MILLIS}; an interrupt ends the pause and is re-asserted. */
    private static void pause() {
        try {
            Thread.sleep(PAUSE_MILLIS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Task that keeps producing until its thread is interrupted. */
    public static class ProducerThread implements Runnable {
        private final ThreadConcurrencyTest threadConcurrencyTest;

        public ProducerThread(ThreadConcurrencyTest threadConcurrencyTest) {
            this.threadConcurrencyTest = threadConcurrencyTest;
        }

        @Override
        public void run() {
            // Runs forever unless interrupted; checking the flag avoids a busy loop
            // once producer() has restored the interrupt status.
            while (!Thread.currentThread().isInterrupted()) {
                threadConcurrencyTest.producer();
            }
        }
    }

    /** Task that keeps consuming until its thread is interrupted. */
    public static class ConsumerThread implements Runnable {
        private final ThreadConcurrencyTest threadConcurrencyTest;

        public ConsumerThread(ThreadConcurrencyTest threadConcurrencyTest) {
            this.threadConcurrencyTest = threadConcurrencyTest;
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                threadConcurrencyTest.Consumer();
            }
        }
    }

    /**
     * Starts five producer threads and five consumer threads on a buffer of capacity five.
     * Nothing interrupts the threads, so the program runs until it is killed.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ThreadConcurrencyTest threadConcurrencyTest = new ThreadConcurrencyTest(5);
        ProducerThread producerThread = new ProducerThread(threadConcurrencyTest);
        ConsumerThread consumerThread = new ConsumerThread(threadConcurrencyTest);
        for (int i = 0; i < 5; i++) {
            new Thread(producerThread).start();
        }
        for (int i = 0; i < 5; i++) {
            new Thread(consumerThread).start();
        }
    }
}
|
打印展示
Condition对象是通过Lock对象生成的,并且可以创建多个Condition对象
通过Semaphore管理多线程竞争
1 2
| Semaphore类是个计数信号量 当申请许可的线程数超出permits可用资源数目时,Semaphore会将多出的线程存放到阻塞队列中,进入阻塞队列后,当发现有可用的资源时,被阻塞的线程会被唤醒
|
代码展示
Semaphore构造函数,permits参数表示初始化可用的资源数目,fair表示是否是公平锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| import java.util.concurrent.Semaphore;
/**
 * Demonstrates limiting concurrent access with a {@link Semaphore}: five threads
 * compete for two permits.
 */
public class ThreadConcurrencyTest {

    /** Simulates handing out a connection by sleeping for one second. */
    public static class ConnectionProvide {

        /** Blocks for one second; an interrupt ends the wait early and is re-asserted. */
        public void provide() {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Thread that acquires a permit, uses the connection provider, and releases the permit. */
    public static class HandLeUserThread extends Thread {
        private final Semaphore semaphore;
        private final String threadName;
        private final ConnectionProvide connectionProvide;

        /**
         * @param threadName        label used in the printed messages
         * @param semaphore         permits guarding the connection provider
         * @param connectionProvide the guarded resource
         */
        public HandLeUserThread(String threadName, Semaphore semaphore, ConnectionProvide connectionProvide) {
            this.threadName = threadName;
            this.semaphore = semaphore;
            this.connectionProvide = connectionProvide;
        }

        @Override
        public void run() {
            // Informational only: the permit count may change before acquire() below.
            if (semaphore.availablePermits() > 0) {
                System.out.println(threadName + " 开始连接应用");
            } else {
                System.out.println(threadName + " 无可连接应用");
            }

            // Acquire outside the try/finally: if acquire() is interrupted no permit is
            // held, and releasing one anyway would inflate the permit count.
            try {
                semaphore.acquire();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            try {
                connectionProvide.provide();
                System.out.println(threadName + " 获得连接");
            } finally {
                semaphore.release();
            }
        }
    }

    /**
     * Starts five user threads sharing a fair semaphore with two permits.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ConnectionProvide connectionProvide = new ConnectionProvide();
        Semaphore semaphore = new Semaphore(2, true);
        for (int i = 0; i < 5; i++) {
            new HandLeUserThread(String.valueOf(i), semaphore, connectionProvide).start();
        }
    }
}
|
打印展示
CountDownLatch同步计数器
1
| CountDownLatch有一个正数计数器,countDown方法会对计数器做减操作,直到计数器归0(或中断、超时),await线程才会继续,否则会一直阻塞,这样能够保证CountDownLatch所标记的前置任务都完成后,主任务再执行
|
代码展示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
| import java.util.List; import java.util.ArrayList; import java.util.concurrent.*;
/**
 * Demonstrates {@link CountDownLatch}: the main thread waits until three submitted
 * tasks have each counted the latch down, then collects their results.
 */
public class ThreadConcurrencyTest {

    /** Task that returns a label and counts the shared latch down once when it finishes. */
    public static class Task implements Callable<String> {
        private final CountDownLatch countDownLatch;
        private final String taskName;

        /**
         * @param countDownLatch latch to count down when the task finishes
         * @param taskName       label used in the result and in the printed progress line
         */
        public Task(CountDownLatch countDownLatch, String taskName) {
            this.countDownLatch = countDownLatch;
            this.taskName = taskName;
        }

        /**
         * @return the task name followed by "返回"
         */
        @Override
        public String call() throws Exception {
            try {
                return taskName + "返回";
            } finally {
                // Count down in finally so the latch is released even if the body throws.
                countDownLatch.countDown();
                System.out.println(taskName + " --- " + countDownLatch.getCount());
            }
        }
    }

    /**
     * Submits three tasks, waits up to four seconds for the latch to reach zero, then
     * prints every task result.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        List<String> list = new ArrayList<>();
        CountDownLatch countDownLatch = new CountDownLatch(3);
        Future<String> future1 = executorService.submit(new Task(countDownLatch, "Task1"));
        Future<String> future2 = executorService.submit(new Task(countDownLatch, "Task2"));
        Future<String> future3 = executorService.submit(new Task(countDownLatch, "Task3"));
        try {
            // 4000 ms, not 4000 us: the original unit gave the tasks only 4 ms.
            boolean reachedZero = countDownLatch.await(4000, TimeUnit.MILLISECONDS);
            if (reachedZero) {
                System.out.println("count已为0");
            } else {
                // await() returns false on timeout; do not claim the count reached zero.
                System.out.println("等待超时, count剩余: " + countDownLatch.getCount());
            }
            // get() still blocks until each task is done, so results are complete either way.
            list.add(future1.get());
            list.add(future2.get());
            list.add(future3.get());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            executorService.shutdown();
        }
        list.forEach(System.out::println);
    }
}
|
打印展示
CompletableFuture
1 2 3 4 5
| 1.基于JDK1.8的lambda表达式 2.在supplyAsync方法中定义要执行的任务 3.通过thenAccept方法消费之前线程的结果 4.通过thenCombine方法整合多个线程的返回结果 5.通过applyToEither方法使用先返回的结果
|
代码展示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176
| import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; import java.util.function.BiFunction; import java.util.concurrent.CompletableFuture;
/**
 * Demonstrates three {@link CompletableFuture} combinators: {@code thenAccept},
 * {@code thenCombine} and {@code applyToEither}. Each demo is written twice, once with
 * lambdas and once with the equivalent anonymous classes.
 */
public class ThreadConcurrencyTest {

    /**
     * Runs the three demos in order, separated by a divider line.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        thenAcceptTest();
        System.out.println("----------");
        thenCombineTest();
        System.out.println("----------");
        applyToEitherTest();
    }

    /**
     * Supplies a string asynchronously and prints it via {@code thenAccept}.
     *
     * <p>Each pipeline is joined so its output is printed before this method returns,
     * rather than racing with whatever the caller prints next.
     */
    public static void thenAcceptTest() {
        CompletableFuture.supplyAsync(() -> "one two three")
                .thenAccept(s -> System.out.println("thenAccept测试结果: 123 " + s))
                .join();

        CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                return "one two three";
            }
        }).thenAccept(new Consumer<String>() {
            @Override
            public void accept(String s) {
                System.out.println("thenAccept测试结果: 123 " + s);
            }
        }).join();
    }

    /** Runs two two-second tasks in parallel and prints their results merged by {@code thenCombine}. */
    public static void thenCombineTest() {
        String s = CompletableFuture.supplyAsync(() -> {
            pause(2000);
            return "part1";
        }).thenCombine(CompletableFuture.supplyAsync(() -> {
            pause(2000);
            return "part2";
        }), (result1, result2) -> "thenCombine测试结果: " + result1 + " --- " + result2).join();
        System.out.println(s);

        String j = CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                pause(2000);
                return "part1";
            }
        }).thenCombine(CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                pause(2000);
                return "part2";
            }
        }), new BiFunction<String, String, String>() {
            @Override
            public String apply(String result1, String result2) {
                return "thenCombine测试结果: " + result1 + " --- " + result2;
            }
        }).join();
        System.out.println(j);
    }

    /**
     * Races a one-second task against a two-second task and prints whichever result
     * {@code applyToEither} receives first.
     */
    public static void applyToEitherTest() {
        String s = CompletableFuture.supplyAsync(() -> {
            pause(1000);
            return "return 1";
        }).applyToEither(CompletableFuture.supplyAsync(() -> {
            pause(2000);
            return "return 2";
        }), result -> "applyToEither方法测试结果: " + result).join();
        System.out.println(s);

        String j = CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                pause(1000);
                return "return 1";
            }
        }).applyToEither(CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                pause(2000);
                return "return 2";
            }
        }), new Function<String, String>() {
            @Override
            public String apply(String s) {
                return "applyToEither方法测试结果: " + s;
            }
        }).join();
        System.out.println(j);
    }

    /**
     * Sleeps for the given time to simulate work; an interrupt ends the sleep early and
     * is re-asserted on the current thread.
     *
     * @param millis how long to sleep, in milliseconds
     */
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
|
打印展示