avatar

线程并发类

通过wait,notify管理并发

1
2
3
4
1.两个方法都需要放置到synchronized的作用域中
2.一旦执行wait方法,会释放synchronized所关联的锁,进入阻塞状态,无法再次主动地到可执行状态
3.一旦执行notify方法,会通知因调用wait方法而等待的线程,如有多个线程等待,则会任意挑选一个线程来唤醒
4.notifyAll会唤醒因wait而进入到阻塞状态的线程,但他们都未得到锁,因此会竞争锁,得到锁的继续执行,在锁被释放后,其他线程会继续竞争,依次类推

以生产者消费者问题观察wait和notify

基于Object类的wait、notify和notifyAll的方法,只能建立一个阻塞队列

代码展示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
public class ThreadConcurrencyTest {

/* 表示生产者线程的编号 */
private int i;
/* 表示待消费与已消费 */
private boolean b = true;

public synchronized void product(int i) {

if (!b) {

try {
/* 未消费等待 */
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

this.i = i;

/* 标记为已生产 */
b = false;

/* 通知消费者已生产,可以消费 */
notify();

}

public synchronized Integer consume() {

if (b) {

try {
/* 未生产等待 */
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

/* 标记为已消费 */
b = true;

/* 通知需要生产 */
notify();

return this.i;

}

/* 生产者线程 */
public static class Producer extends Thread {

private final ThreadConcurrencyTest threadConcurrencyTest;

Producer(ThreadConcurrencyTest threadConcurrencyTest) {
this.threadConcurrencyTest = threadConcurrencyTest;
}

@Override
public void run() {
for (int j = 0; j <= 5; j++) {

threadConcurrencyTest.product(j);

System.out.println("生产者: " + j);

}
}
}

/* 消费者线程 */
public static class Consumer extends Thread {

private final ThreadConcurrencyTest threadConcurrencyTest;

Consumer(ThreadConcurrencyTest threadConcurrencyTest) {
this.threadConcurrencyTest = threadConcurrencyTest;
}

@Override
public void run() {

Integer consume;

do {

consume = threadConcurrencyTest.consume();

System.out.println("消费者: " + consume);

/* 消费者数量 */
} while (consume != 9);

}
}

public static void main(String[] args) {

ThreadConcurrencyTest threadConcurrencyTest = new ThreadConcurrencyTest();

new Producer(threadConcurrencyTest).start();

new Consumer(threadConcurrencyTest).start();

}
}

打印展示

当生产者与消费者不相等时,程序将无法停止

以生产者消费者问题观察wait和notify

通过Condition实现线程间的通讯

通过Condition类,可以在不同的线程里创建多个阻塞队列

代码展示

该代码是死循环

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import java.util.LinkedList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ThreadConcurrencyTest {

private final Lock lock;

private final Condition notFull;

private final Condition notEmpty;

private final int maxSize;

private final LinkedList<String> list;

public ThreadConcurrencyTest(int maxSize) {
lock = new ReentrantLock();
notFull = lock.newCondition();
notEmpty = lock.newCondition();
this.maxSize = maxSize;
list = new LinkedList<>();
}

/* 生产 */
public void producer() {

lock.lock();

try {

/* 达到生产计划 */
while (list.size() == maxSize) {

System.out.println(Thread.currentThread().getName() + " 生产停止");

/* 阻塞生产线程 */
notFull.await();

}

list.add("Java");

System.out.println(Thread.currentThread().getName() + " 已生产数量: " + list.size());

Thread.sleep(1000);

/* 唤醒消费线程 */
notEmpty.signalAll();

} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public void Consumer() {

lock.lock();

try {

while (list.size() == 0) {

System.out.println(Thread.currentThread().getName() + " 消费停止");

/* 阻塞消费 */
notEmpty.await();

}

System.out.println("消费" + list.poll() + "产品一个");

System.out.println(Thread.currentThread().getName() + "剩余产品数量: " + list.size());

Thread.sleep(1000);

/* 唤醒生产线程 */
notFull.signalAll();

} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public static class ProducerThread implements Runnable {

private final ThreadConcurrencyTest threadConcurrencyTest;

public ProducerThread(ThreadConcurrencyTest threadConcurrencyTest) {
this.threadConcurrencyTest = threadConcurrencyTest;
}

@Override
public void run() {
while (true) {
threadConcurrencyTest.producer();
}
}
}

public static class ConsumerThread implements Runnable {

private final ThreadConcurrencyTest threadConcurrencyTest;

public ConsumerThread(ThreadConcurrencyTest threadConcurrencyTest) {
this.threadConcurrencyTest = threadConcurrencyTest;
}

@Override
public void run() {
while (true) {
threadConcurrencyTest.Consumer();
}
}
}

public static void main(String[] args) {

ThreadConcurrencyTest threadConcurrencyTest = new ThreadConcurrencyTest(5);

ProducerThread producerThread = new ProducerThread(threadConcurrencyTest);

ConsumerThread consumerThread = new ConsumerThread(threadConcurrencyTest);

for (int i = 0; i < 5; i++) new Thread(producerThread).start();

for (int i = 0; i < 5; i++) new Thread(consumerThread).start();

}
}

打印展示

Condition对象是通过Lock对象生成的,并且可以创建多个Condition对象

通过Condition实现线程间的通讯

通过Semaphore管理多线程竞争

1
2
Semaphore类是个计数信号量
Semaphore会将超出permits可用资源数目的资源存放到阻塞队列中,进入阻塞队列后,当发现有可用的资源时,被阻塞的资源会被唤醒

代码展示

Semaphore构造函数

Semaphore构造函数,permits参数表示初始化可用的资源数目,fair表示是否是公平锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import java.util.concurrent.Semaphore;

public class ThreadConcurrencyTest {

public static class ConnectionProvide {

/* 连接数据库 */
public void provide() {

try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

public static class HandLeUserThread extends Thread {

private final Semaphore semaphore;

private final String threadName;

private final ConnectionProvide connectionProvide;

public HandLeUserThread(String threadName, Semaphore semaphore, ConnectionProvide connectionProvide) {
this.threadName = threadName;
this.semaphore = semaphore;
this.connectionProvide = connectionProvide;
}

@Override
public void run() {

if (semaphore.availablePermits() > 0) System.out.println(threadName + " 开始连接应用");
else System.out.println(threadName + " 无可连接应用");

try {

/* 申请资源 */
semaphore.acquire();

connectionProvide.provide();

System.out.println(threadName + " 获得连接");

} catch (InterruptedException e) {
e.printStackTrace();
} finally {
/* 释放资源 */
semaphore.release();
}
}
}

public static void main(String[] args) {

ConnectionProvide connectionProvide = new ConnectionProvide();

Semaphore semaphore = new Semaphore(2, true);

for (int i = 0; i < 5; i++) new HandLeUserThread(String.valueOf(i), semaphore, connectionProvide).start();

}
}

打印展示

通过Semaphore管理多线程竞争

CountDownLatch同步计数器

1
CountDownLatch有一个正数计数器,countDown方法会对计数器做减操作,知道所有计数器都归0(或中断、超时),await线程才会继续,否则会一直阻塞,这样能够保证CountDownLatch所标记的前置任务都完成后,主任务在执行

代码展示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import java.util.List;
import java.util.ArrayList;
import java.util.concurrent.*;

public class ThreadConcurrencyTest {

public static class Task implements Callable<String> {

private final CountDownLatch countDownLatch;

private final String taskName;

public Task(CountDownLatch countDownLatch, String taskName) {
this.countDownLatch = countDownLatch;
this.taskName = taskName;
}

@Override
public String call() throws Exception {

try {
return taskName + "返回";
} finally {

countDownLatch.countDown();

System.out.println(taskName + " --- " + countDownLatch.getCount());

}
}
}

public static void main(String[] args) {

/* 定义线程池 */
ExecutorService executorService = Executors.newCachedThreadPool();

List<String> list = new ArrayList<>();

CountDownLatch countDownLatch = new CountDownLatch(3);

Future<String> future1 = executorService.submit(new Task(countDownLatch, "Task1"));
Future<String> future2 = executorService.submit(new Task(countDownLatch, "Task2"));
Future<String> future3 = executorService.submit(new Task(countDownLatch, "Task3"));


try {

countDownLatch.await(4000, TimeUnit.MICROSECONDS);

System.out.println("count已为0");

list.add(future1.get());
list.add(future2.get());
list.add(future3.get());

} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
executorService.shutdown();
}

list.forEach(System.out::println);

}
}

打印展示

CountDownLatch同步计数器

CompletableFuture

1
2
3
4
5
1.基于JDK1.8的lambad表达式
2.在supplyAsync方法中定义要执行的任务
3.通过thenAccept方法消费之前线程的结果
4.通过thenCombine方法整合多个线程的返回结果
5.通过applyToEither方法使用先返回的结果

代码展示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.function.BiFunction;
import java.util.concurrent.CompletableFuture;

public class ThreadConcurrencyTest {

public static void main(String[] args) {

/* 消费之前线程的结果 */
thenAcceptTest();

System.out.println("----------");

/* 整合多个线程的返回结果 */
thenCombineTest();

System.out.println("----------");

/* 使用先返回的结果 */
applyToEitherTest();

}

public static void thenAcceptTest() {

/* lambad表达式 */
CompletableFuture.supplyAsync(() -> "one two three")
.thenAccept(s -> System.out.println("thenAccept测试结果: 123 " + s));

/* 非lambad表达式 */
CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
return "one two three";
}
}).thenAccept(new Consumer<String>() {
@Override
public void accept(String s) {
System.out.println("thenAccept测试结果: 123 " + s);
}
});

}

public static void thenCombineTest() {

/* lambad表达式 */
String s = CompletableFuture.supplyAsync(() -> {

try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}

return "part1";

}).thenCombine(CompletableFuture.supplyAsync(() -> {

try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}

return "part2";

}), (result1, result2) -> "thenCombine测试结果: " + result1 + " --- " + result2).join();

System.out.println(s);

/* 非lambad表达式 */
String j = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {

try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}

return "part1";

}
}).thenCombine(CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {

try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}

return "part2";

}
}), new BiFunction<String, String, String>() {
@Override
public String apply(String result1, String result2) {
return "thenCombine测试结果: " + result1 + " --- " + result2;
}
}).join();

System.out.println(j);

}

public static void applyToEitherTest() {

/* lambad表达式 */
String s = CompletableFuture.supplyAsync(() -> {

try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}

return "return 1";

}).applyToEither(CompletableFuture.supplyAsync(() -> {

try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}

return "return 2";

}), result -> "applyToEither方法测试结果: " + result).join();

System.out.println(s);

/* 非lambad表达式 */
String j = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {

try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}

return "return 1";

}
}).applyToEither(CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {

try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}

return "return 2";

}
}), new Function<String, String>() {
@Override
public String apply(String s) {
return "applyToEither方法测试结果: " + s;
}
}).join();

System.out.println(j);

}
}

打印展示

CompletableFuture

文章作者: 123
文章链接: https://gao5805123.github.io/123/2021/05/25/%E7%BA%BF%E7%A8%8B%E5%B9%B6%E5%8F%91%E7%B1%BB/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 123
打赏
  • 微信
    微信
  • 支付宝
    支付宝