Java并发控制机制详解
Java资料站
共 26739字,需浏览 54分钟
· 2021-05-28
点击上方蓝色字体,选择“标星公众号”
优质文章,第一时间送达
作者 | ypp91zr
来源 | urlify.cn/nqe2ei
Java内存模型
volatile变量–多线程间可见
同步关键字synchronized
public synchronized void method(){}
public void method(){
synchronized(this){
// do something …
}
}
public void method(Object o){
// before
synchronized(o){
// do something ...
}
// after
}
public synchronized static void method(){}
synchronized(obj){
while(<?>){
obj.wait();
// 收到通知后,继续执行。
}
}
public class BlockQueue{
private List list = new ArrayList();
public synchronized Object pop() throws InterruptedException{
while (list.size()==0){
this.wait();
}
if (list.size()>0){
return list.remove(0);
} else{
return null;
}
}
public synchronized Object put(Object obj){
list.add(obj);
this.notify();
}
}
Reentrantlock重入锁
try {
if (lock.tryLock(5, TimeUnit.SECONDS)) { //如果已经被lock,尝试等待5s,看是否可以获得锁,如果5s后仍然无法获得锁则返回false继续执行
// lock.lockInterruptibly();可以响应中断事件
try {
//操作
} finally {
lock.unlock();
}
}
} catch (InterruptedException e) {
e.printStackTrace(); //当前线程被中断时(interrupt),会抛InterruptedException
}
ReadWriteLock读写锁
private ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private Lock readLock = readWriteLock.readLock();
private Lock writeLock = readWriteLock.writeLock();
public Object handleRead() throws InterruptedException {
try {
readLock.lock();
Thread.sleep(1000);
return value;
}finally{
readLock.unlock();
}
}
public Object handleRead() throws InterruptedException {
try {
writeLock.lock();
Thread.sleep(1000);
return value;
}finally{
writeLock.unlock();
}
}
Condition对象
public class ArrayBlockingQueue extends AbstractQueue
implements BlockingQueue, java.io.Serializable {
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition(); // 生成与Lock绑定的Condition
notFull = lock.newCondition();
}
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
insert(e);
} finally {
lock.unlock();
}
}
private void insert(E x) {
items[putIndex] = x;
putIndex = inc(putIndex);
++count;
notEmpty.signal(); // 通知
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) // 如果队列为空
notEmpty.await(); // 则消费者队列要等待一个非空的信号
return extract();
} finally {
lock.unlock();
}
}
private E extract() {
final Object[] items = this.items;
E x = this.<E>cast(items[takeIndex]);
items[takeIndex] = null;
takeIndex = inc(takeIndex);
--count;
notFull.signal(); // 通知put() 线程队列已有空闲空间
return x;
}
// other code
}
Semaphore信号量
public Semaphore(int permits) {}
public Semaphore(int permits, boolean fair){} // 可以指定是否公平
public void acquire() throws InterruptedException {} //尝试获得一个准入的许可。若无法获得,则线程会等待,知道有线程释放一个许可或者当前线程被中断。
public void acquireUninterruptibly(){} // 类似于acquire(),但是不会响应中断。
public boolean tryAcquire(){} // 尝试获取,如果成功则为true,否则false。这个方法不会等待,立即返回。
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {} // 尝试等待多长时间
public void release() //用于在现场访问资源结束后,释放一个许可,以使其他等待许可的线程可以进行资源访问。
public class Pool {
private static final int MAX_AVAILABLE = 100;
private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
public Object getItem() throws InterruptedException {
available.acquire();
// 申请一个许可
// 同时只能有100个线程进入取得可用项,
// 超过100个则需要等待
return getNextAvailableItem();
}
public void putItem(Object x) {
// 将给定项放回池内,标记为未被使用
if (markAsUnused(x)) {
available.release();
// 新增了一个可用项,释放一个许可,请求资源的线程被激活一个
}
}
// 仅作示例参考,非真实数据
protected Object[] items = new Object[MAX_AVAILABLE]; // 用于对象池复用对象
protected boolean[] used = new boolean[MAX_AVAILABLE]; // 标记作用
protected synchronized Object getNextAvailableItem() {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (!used[i]) {
used[i] = true;
return items[i];
}
}
return null;
}
protected synchronized boolean markAsUnused(Object item) {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (item == items[i]) {
if (used[i]) {
used[i] = false;
return true;
} else {
return false;
}
}
}
return false;
}
}
ThreadLocal线程局部变量
public class TestNum {
// 通过匿名内部类覆盖ThreadLocal的initialValue()方法,指定初始值
private static ThreadLocal seqNum = new ThreadLocal() {
public Integer initialValue() {
return 0;
}
};
// 获取下一个序列值
public int getNextNum() {
seqNum.set(seqNum.get() + 1);
return seqNum.get();
}public static void main(String[] args) {
TestNum sn = new TestNum();
//3个线程共享sn,各自产生序列号
TestClient t1 = new TestClient(sn);
TestClient t2 = new TestClient(sn);
TestClient t3 = new TestClient(sn);
t1.start();
t2.start();
t3.start();
}
private static class TestClient extends Thread {
private TestNum sn;
public TestClient(TestNum sn) {
this.sn = sn;
}
public void run() {
for (int i = 0; i < 3; i++) {
// 每个线程打出3个序列值
System.out.println("thread[" + Thread.currentThread().getName() + "] --> sn["
+ sn.getNextNum() + "]");
}
}
}
}
thread[Thread-0] –> sn[1]
thread[Thread-1] –> sn[1]
thread[Thread-2] –> sn[1]
thread[Thread-1] –> sn[2]
thread[Thread-0] –> sn[2]
thread[Thread-1] –> sn[3]
thread[Thread-2] –> sn[2]
thread[Thread-0] –> sn[3]
thread[Thread-2] –> sn[3]
锁的性能和优化
public synchronized void syncMehod(){
beforeMethod();
mutexMethod();
afterMethod();
}
public void syncMehod(){
beforeMethod();
synchronized(this){
mutexMethod();
}
afterMethod();
}
public class LinkedBlockingQueue extends AbstractQueue
implements BlockingQueue, java.io.Serializable {
/* Lock held by take, poll, etc /
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly(); // 不能有两个线程同时读取数据
try {
while (count.get() == 0) { // 如果当前没有可用数据,一直等待put()的通知
notEmpty.await();
}
x = dequeue(); // 从头部移除一项
c = count.getAndDecrement(); // size减1
if (c > 1)
notEmpty.signal(); // 通知其他take()操作
} finally {
takeLock.unlock(); // 释放锁
}
if (c == capacity)
signalNotFull(); // 通知put()操作,已有空余空间
return x;
}
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly(); // 不能有两个线程同时put数据
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
while (count.get() == capacity) { // 队列满了 则等待
notFull.await();
}
enqueue(node); // 加入队列
c = count.getAndIncrement();// size加1
if (c + 1 < capacity)
notFull.signal(); // 如果有足够空间,通知其他线程
} finally {
putLock.unlock();// 释放锁
}
if (c == 0)
signalNotEmpty();// 插入成功后,通知take()操作读取数据
}
// other code
}
public void syncMehod(){
synchronized(lock){
method1();
}
synchronized(lock){
method2();
}
}
public void syncMehod(){
synchronized(lock){
method1();
method2();
}
}
无锁的并行计算
public class TestAtomic {
private static final int MAX_THREADS = 3;
private static final int TASK_COUNT = 3;
private static final int TARGET_COUNT = 100 * 10000;
private AtomicInteger acount = new AtomicInteger(0);
private int count = 0;
synchronized int inc() {
return ++count;
}
synchronized int getCount() {
return count;
}
public class SyncThread implements Runnable {
String name;
long startTime;
TestAtomic out;
public SyncThread(TestAtomic o, long startTime) {
this.out = o;
this.startTime = startTime;
}
@Override
public void run() {
int v = out.inc();
while (v < TARGET_COUNT) {
v = out.inc();
}
long endTime = System.currentTimeMillis();
System.out.println("SyncThread spend:" + (endTime - startTime) + "ms" + ", v=" + v);
}
}
public class AtomicThread implements Runnable {
String name;
long startTime;
public AtomicThread(long startTime) {
this.startTime = startTime;
}
@Override
public void run() {
int v = acount.incrementAndGet();
while (v < TARGET_COUNT) {
v = acount.incrementAndGet();
}
long endTime = System.currentTimeMillis();
System.out.println("AtomicThread spend:" + (endTime - startTime) + "ms" + ", v=" + v);
}
}
@Test
public void testSync() throws InterruptedException {
ExecutorService exe = Executors.newFixedThreadPool(MAX_THREADS);
long startTime = System.currentTimeMillis();
SyncThread sync = new SyncThread(this, startTime);
for (int i = 0; i < TASK_COUNT; i++) {
exe.submit(sync);
}
Thread.sleep(10000);
}
@Test
public void testAtomic() throws InterruptedException {
ExecutorService exe = Executors.newFixedThreadPool(MAX_THREADS);
long startTime = System.currentTimeMillis();
AtomicThread atomic = new AtomicThread(startTime);
for (int i = 0; i < TASK_COUNT; i++) {
exe.submit(atomic);
}
Thread.sleep(10000);
}
}
testSync():
SyncThread spend:201ms, v=1000002
SyncThread spend:201ms, v=1000000
SyncThread spend:201ms, v=1000001
testAtomic():
AtomicThread spend:43ms, v=1000000
AtomicThread spend:44ms, v=1000001
AtomicThread spend:46ms, v=1000002
结束语
评论
豆瓣9.7,这部Java神作第3版重磅上市!
文末赠书Java 程序员们开年就有重磅好消息,《Effective Java 中文版(原书第 3 版)》要上市啦!该书的第1版出版于 2001 年,当时就在业界流传开来,受到广泛赞誉。时至今日,已热销近20年,本书第 3 版已是 Java 程序员的必读神书,被誉为“Java 四大名著之一”,甚至连
编码之外
0
光纤详解:光纤跳线如何分类,多向单模转换?
本文来自“光纤详解:光纤跳线如何分类,多向单模转换?”,光纤跳线作为光网络布线最基础的元件之一,被广泛应用于光纤链路的搭建中。如今,光纤制造商根据应用场景的不同推出众多类型的光纤跳线,如MPO/LC/SC/FC/ST光纤跳线,单工/双工光纤跳线,单模/多模光纤跳线等,它们之间各有特色,且不可替代。本
架构师技术联盟
0
让扩散模型听话的小秘籍?CAN:通过操控权重来控制条件生成模型,图像生成效率大升级!
↑ 点击蓝字 关注极市平台作者丨科技猛兽编辑丨极市平台极市导读 本文提出的 CAN 模型 (Condition-Aware Neural Network) 是一种对图像生成模型添加控制的方法。CAN 可以通过动态操纵神经网络的权重来控制图像生成过程。作者在 ImageNet 图像
极市平台
0
大量 Java 开源项目停更...
点击关注公众号,Java 干货及时推送↓推荐阅读:投了 100 多份简历后…出品 | OSC开源社区(ID:oschina2013)Sonatype 发布了最新的一份《软件供应链状况》报告,深入探讨了如何在充满选择的世界中定义更好的软件,并探讨人工智能 (AI) 对软件开发的深远
Java技术栈
0
一站式解决方案:基于 Arthas 实现服务发现和权限控制
来源:juejin.cn/post/7281849496983994383👉 欢迎加入小哈的星球 ,你将获得: 专属的项目实战 / Java 学习路线 / 一对一提问 / 学习打卡 / 赠书福利全栈前后端分离博客项目 2.0 版本完结啦, 演示链接
小哈学Java
0
Java 神作,必读!
Java 程序员们开年就有重磅好消息,《Effective Java 中文版(原书第 3 版)》要上市啦!该书的第1版出版于 2001 年,当时就在业界流传开来,受到广泛赞誉。时至今日,已热销近20年,本书第 3 版已是 Java 程序员的必读神书,被誉为“Java 四大名著之一”,甚至连 Java
小哈学Java
0
GPT的风也吹到了CV,详解自回归视觉模型的先驱! ImageGPT:使用图像序列训练图像 GPT模型
作者丨科技猛兽编辑丨极市平台导读 在 CIFAR-10 上,iGPT 使用 linear probing 实现了 96.3% 的精度,优于有监督的 Wide ResNet,并通过完全微调实现了 99.0% 的精度,匹配顶级监督预训练模型。本文目录1 自回归视觉模型的先驱 ImageGPT:
机器学习初学者
0
【深度学习】图解自注意力机制(Self-Attention)
一、注意力机制和自注意力机制的区别Attention机制与Self-Attention机制的区别传统的Attention机制发生在Target的元素和Source中的所有元素之间。简单讲就是说Attention机制中的权重的计算需要Target来参与。即在Encoder-Decoder 模型中,At
机器学习初学者
0