Java并发控制机制详解
点击上方蓝色字体,选择“标星公众号”
优质文章,第一时间送达
作者 | ypp91zr
来源 | urlify.cn/nqe2ei
Java内存模型
volatile变量–多线程间可见
同步关键字synchronized
/** Form 1: {@code synchronized} as a modifier on an instance method, so the whole body runs under the monitor of {@code this}. */
public synchronized void method(){}
/** Form 2: an explicit {@code synchronized(this)} block wrapping the body instead of the method modifier. */
public void method(){
synchronized(this){
// do something …
}
}
/**
 * Form 3: synchronizes on a caller-supplied object, so only the statements
 * inside the block hold the monitor of {@code o}.
 *
 * @param o object whose monitor guards the critical section
 */
public void method(Object o){
// before: runs without holding o's monitor
synchronized(o){
// do something ...
}
// after: the monitor has already been released here
}
/** Form 4: {@code synchronized} on a static method; the lock is the monitor of the enclosing class object rather than an instance. */
public synchronized static void method(){}
// Template for waiting on a condition: wait() is called while holding obj's
// monitor and inside a loop, so the condition (the <?> placeholder) is
// re-checked every time the thread wakes up.
synchronized(obj){
while(<?>){
obj.wait();
// Execution resumes here after a notification has been received.
}
}
/**
 * Minimal unbounded blocking queue built on the intrinsic lock of the
 * instance together with {@code wait()} / {@code notify()}.
 */
public class BlockQueue {
    /** Backing storage; only touched inside synchronized methods. */
    private final List<Object> list = new ArrayList<Object>();

    /**
     * Removes and returns the oldest element, blocking while the queue is empty.
     *
     * @return the element at the head of the queue
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public synchronized Object pop() throws InterruptedException {
        // Loop rather than "if": the condition must be re-checked after every wake-up.
        while (list.isEmpty()) {
            this.wait();
        }
        // The monitor is held here and the loop has exited, so the list is non-empty.
        return list.remove(0);
    }

    /**
     * Appends an element and wakes one thread blocked in {@link #pop()}.
     *
     * @param obj element to enqueue
     * @return the element that was enqueued
     */
    public synchronized Object put(Object obj) {
        list.add(obj);
        this.notify();
        return obj;
    }
}
Reentrantlock重入锁
try {
    // If the lock is already held, wait up to 5 seconds for it. When it still
    // cannot be acquired, tryLock returns false and execution simply continues.
    if (lock.tryLock(5, TimeUnit.SECONDS)) {
        // lock.lockInterruptibly(); is an alternative that also responds to interruption
        try {
            // guarded operation goes here
        } finally {
            lock.unlock();
        }
    }
} catch (InterruptedException e) {
    // Thrown when the current thread is interrupted while waiting for the lock.
    e.printStackTrace();
    // Restore the interrupt flag so code further up the stack can still observe it.
    Thread.currentThread().interrupt();
}
ReadWriteLock读写锁
// One ReentrantReadWriteLock supplies both locks below.
private ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
// Read lock view obtained from readWriteLock.
private Lock readLock = readWriteLock.readLock();
// Write lock view obtained from readWriteLock.
private Lock writeLock = readWriteLock.writeLock();
/**
 * Reads {@code value} while holding the read lock.
 *
 * @return the current value
 * @throws InterruptedException if the thread is interrupted during the sleep
 */
public Object handleRead() throws InterruptedException {
    // Acquire before entering try: if lock() itself failed, the finally block
    // must not call unlock() on a lock this thread never obtained.
    readLock.lock();
    try {
        Thread.sleep(1000); // stands in for a slow read
        return value;
    } finally {
        readLock.unlock();
    }
}
/**
 * Same operation as the read-lock variant, but performed under the write lock.
 * Named {@code handleWrite} because a second {@code handleRead()} with an
 * identical signature cannot coexist in one class.
 *
 * @return the current value
 * @throws InterruptedException if the thread is interrupted during the sleep
 */
public Object handleWrite() throws InterruptedException {
    // Acquire before entering try: if lock() itself failed, the finally block
    // must not call unlock() on a lock this thread never obtained.
    writeLock.lock();
    try {
        Thread.sleep(1000); // stands in for a slow operation
        return value;
    } finally {
        writeLock.unlock();
    }
}
Condition对象
/**
 * Excerpt of an array-backed bounded blocking queue, showing how two
 * {@link Condition} objects created from one {@link ReentrantLock} coordinate
 * producers and consumers. Members not shown here (items, count, putIndex,
 * takeIndex, inc, cast, checkNotNull) belong to the omitted part of the class.
 *
 * @param <E> element type
 */
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    /** Main lock guarding all access */
    final ReentrantLock lock;
    /** Condition for waiting takes */
    private final Condition notEmpty;
    /** Condition for waiting puts */
    private final Condition notFull;

    /**
     * @param capacity size of the backing array; must be positive
     * @param fair     passed to the {@link ReentrantLock} constructor
     * @throws IllegalArgumentException if {@code capacity <= 0}
     */
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition(); // create a Condition bound to the lock
        notFull = lock.newCondition();
    }

    /**
     * Inserts the element, waiting on {@code notFull} while the array is full.
     *
     * @throws InterruptedException if interrupted while acquiring the lock or waiting
     */
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            insert(e);
        } finally {
            lock.unlock();
        }
    }

    /** Stores x at putIndex, advances the index and count, then signals notEmpty. Called from put() with the lock held. */
    private void insert(E x) {
        items[putIndex] = x;
        putIndex = inc(putIndex);
        ++count;
        notEmpty.signal(); // notify a waiting consumer
    }

    /**
     * Removes and returns an element, waiting on {@code notEmpty} while the queue is empty.
     *
     * @throws InterruptedException if interrupted while acquiring the lock or waiting
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0) // queue is empty
                notEmpty.await(); // the consumer waits for a "not empty" signal
            return extract();
        } finally {
            lock.unlock();
        }
    }

    /** Clears the slot at takeIndex, advances the index, decrements count and signals notFull. Called from take() with the lock held. */
    private E extract() {
        final Object[] items = this.items;
        E x = this.<E>cast(items[takeIndex]);
        items[takeIndex] = null;
        takeIndex = inc(takeIndex);
        --count;
        notFull.signal(); // tell a put() thread that a slot has become free
        return x;
    }
    // other code
}
Semaphore信号量
// Signature listing of the Semaphore API discussed in this section (bodies omitted).
public Semaphore(int permits) {}
public Semaphore(int permits, boolean fair){} // the second argument selects whether the semaphore is fair
public void acquire() throws InterruptedException {} // Tries to obtain one permit. If none is available the thread waits until another thread releases a permit or the current thread is interrupted.
public void acquireUninterruptibly(){} // Like acquire(), but does not respond to interruption.
public boolean tryAcquire(){} // Tries to obtain a permit: true on success, false otherwise. Never waits; returns immediately.
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {} // Tries to obtain a permit, waiting at most the given time.
public void release() // Called after the thread has finished accessing the resource: releases one permit so that another thread waiting for a permit can access the resource.
/**
 * Object pool whose checkout is throttled by a fair {@link Semaphore} holding
 * {@code MAX_AVAILABLE} permits. Illustrative only: the item array is never
 * populated with real objects.
 */
public class Pool {
    private static final int MAX_AVAILABLE = 100;

    /** Slots holding the pooled objects that get reused. */
    protected Object[] items = new Object[MAX_AVAILABLE];
    /** used[i] is true while items[i] is checked out. */
    protected boolean[] used = new boolean[MAX_AVAILABLE];

    private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);

    /**
     * Takes one permit, then hands out the first free slot. At most
     * MAX_AVAILABLE threads get past the acquire; the rest wait.
     *
     * @throws InterruptedException if interrupted while waiting for a permit
     */
    public Object getItem() throws InterruptedException {
        available.acquire();
        return getNextAvailableItem();
    }

    /**
     * Returns an item to the pool. A permit is released only when the item
     * was actually marked as checked out, which wakes one waiting requester.
     */
    public void putItem(Object x) {
        if (!markAsUnused(x)) {
            return;
        }
        available.release();
    }

    /** Claims the lowest-indexed free slot and returns its item, or null when every slot is in use. */
    protected synchronized Object getNextAvailableItem() {
        int slot = 0;
        while (slot < MAX_AVAILABLE && used[slot]) {
            slot++;
        }
        if (slot == MAX_AVAILABLE) {
            return null;
        }
        used[slot] = true;
        return items[slot];
    }

    /**
     * Looks up the first slot holding exactly this reference and clears its flag.
     *
     * @return true if that slot was in use, false if it was already free or
     *         no slot holds the item
     */
    protected synchronized boolean markAsUnused(Object item) {
        for (int slot = 0; slot < MAX_AVAILABLE; slot++) {
            if (items[slot] != item) {
                continue;
            }
            boolean wasUsed = used[slot];
            used[slot] = false;
            return wasUsed;
        }
        return false;
    }
}
ThreadLocal线程局部变量
/**
 * Demonstrates {@link ThreadLocal}: one {@code TestNum} instance is shared by
 * several threads, yet each thread advances its own independent sequence.
 */
public class TestNum {
    /** Per-thread counter; the anonymous subclass overrides initialValue() so each thread starts at 0. */
    private static final ThreadLocal<Integer> seqNum = new ThreadLocal<Integer>() {
        @Override
        protected Integer initialValue() {
            return 0;
        }
    };

    /**
     * Advances the calling thread's sequence by one.
     *
     * @return the new sequence value for the calling thread
     */
    public int getNextNum() {
        int next = seqNum.get() + 1;
        seqNum.set(next);
        return next;
    }

    /** Starts three threads that share one TestNum and each print their own sequence. */
    public static void main(String[] args) {
        TestNum sn = new TestNum();
        // three threads share sn, each producing its own sequence numbers
        TestClient t1 = new TestClient(sn);
        TestClient t2 = new TestClient(sn);
        TestClient t3 = new TestClient(sn);
        t1.start();
        t2.start();
        t3.start();
    }

    /** Worker thread that prints three sequence values obtained from the shared TestNum. */
    private static class TestClient extends Thread {
        private final TestNum sn;

        public TestClient(TestNum sn) {
            this.sn = sn;
        }

        @Override
        public void run() {
            for (int i = 0; i < 3; i++) {
                // each thread prints 3 sequence values
                System.out.println("thread[" + Thread.currentThread().getName() + "] --> sn["
                        + sn.getNextNum() + "]");
            }
        }
    }
}
thread[Thread-0] --> sn[1]
thread[Thread-1] --> sn[1]
thread[Thread-2] --> sn[1]
thread[Thread-1] --> sn[2]
thread[Thread-0] --> sn[2]
thread[Thread-1] --> sn[3]
thread[Thread-2] --> sn[2]
thread[Thread-0] --> sn[3]
thread[Thread-2] --> sn[3]
锁的性能和优化
/** Wide lock scope: the method-level {@code synchronized} keeps the monitor held across all three calls. */
public synchronized void syncMehod(){
beforeMethod();
mutexMethod();
afterMethod();
}
/** Narrowed lock scope: only {@code mutexMethod()} runs inside the synchronized block; the calls before and after run without the monitor. */
public void syncMehod(){
beforeMethod();
synchronized(this){
mutexMethod();
}
afterMethod();
}
/**
 * Excerpt of a linked blocking queue that uses two separate locks, one for
 * the take side and one for the put side, with the element count kept in an
 * {@link AtomicInteger}. Members not shown here (count, capacity, Node,
 * enqueue, dequeue, signalNotEmpty, signalNotFull) belong to the omitted part
 * of the class.
 *
 * @param <E> element type
 */
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();
    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();
    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();
    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

    /**
     * Removes and returns the head element, waiting while the queue is empty.
     *
     * @throws InterruptedException if interrupted while acquiring the lock or waiting
     */
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly(); // two threads may not take at the same time
        try {
            while (count.get() == 0) { // nothing available: keep waiting for a signal from put()
                notEmpty.await();
            }
            x = dequeue(); // remove one item from the head
            c = count.getAndDecrement(); // size minus 1; c is the value before the decrement
            if (c > 1)
                notEmpty.signal(); // more items remain: wake another take()
        } finally {
            takeLock.unlock(); // release the lock
        }
        if (c == capacity)
            signalNotFull(); // the queue was full before this take: tell put() there is room now
        return x;
    }

    /**
     * Appends the element at the tail, waiting while the queue is at capacity.
     *
     * @throws NullPointerException if {@code e} is null
     * @throws InterruptedException if interrupted while acquiring the lock or waiting
     */
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly(); // two threads may not put at the same time
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            while (count.get() == capacity) { // queue is full: wait
                notFull.await();
            }
            enqueue(node); // add to the queue
            c = count.getAndIncrement(); // size plus 1; c is the value before the increment
            if (c + 1 < capacity)
                notFull.signal(); // still room left: wake another put()
        } finally {
            putLock.unlock(); // release the lock
        }
        if (c == 0)
            signalNotEmpty(); // the queue was empty before this put: tell take() data is available
    }
    // other code
}
/** Fine-grained variant: the same {@code lock} is acquired and released twice in a row, once around each call. */
public void syncMehod(){
synchronized(lock){
method1();
}
synchronized(lock){
method2();
}
}
/** Coarsened variant: both calls share a single synchronized block, so {@code lock} is acquired only once. */
public void syncMehod(){
synchronized(lock){
method1();
method2();
}
}
无锁的并行计算
/**
 * Compares a {@code synchronized} counter with an {@link AtomicInteger}
 * counter: in each test, TASK_COUNT tasks on a fixed pool of MAX_THREADS
 * threads increment a shared counter until it reaches TARGET_COUNT and print
 * the elapsed time.
 */
public class TestAtomic {
    private static final int MAX_THREADS = 3;
    private static final int TASK_COUNT = 3;
    private static final int TARGET_COUNT = 100 * 10000;
    /** Upper bound, in seconds, on how long a test waits for its tasks to finish. */
    private static final long AWAIT_SECONDS = 10;

    /** Counter updated without locking. */
    private AtomicInteger acount = new AtomicInteger(0);
    /** Counter updated only through the synchronized methods below. */
    private int count = 0;

    /** Increments the lock-protected counter and returns the new value. */
    synchronized int inc() {
        return ++count;
    }

    /** Returns the current value of the lock-protected counter. */
    synchronized int getCount() {
        return count;
    }

    /** Task that keeps calling the synchronized {@code inc()} until TARGET_COUNT is reached. */
    public class SyncThread implements Runnable {
        String name;
        long startTime;
        TestAtomic out;

        public SyncThread(TestAtomic o, long startTime) {
            this.out = o;
            this.startTime = startTime;
        }

        @Override
        public void run() {
            int v = out.inc();
            while (v < TARGET_COUNT) {
                v = out.inc();
            }
            long endTime = System.currentTimeMillis();
            System.out.println("SyncThread spend:" + (endTime - startTime) + "ms" + ", v=" + v);
        }
    }

    /** Task that keeps calling {@code incrementAndGet()} on the atomic counter until TARGET_COUNT is reached. */
    public class AtomicThread implements Runnable {
        String name;
        long startTime;

        public AtomicThread(long startTime) {
            this.startTime = startTime;
        }

        @Override
        public void run() {
            int v = acount.incrementAndGet();
            while (v < TARGET_COUNT) {
                v = acount.incrementAndGet();
            }
            long endTime = System.currentTimeMillis();
            System.out.println("AtomicThread spend:" + (endTime - startTime) + "ms" + ", v=" + v);
        }
    }

    /** Runs the synchronized-counter tasks and waits for them to finish. */
    @Test
    public void testSync() throws InterruptedException {
        ExecutorService exe = Executors.newFixedThreadPool(MAX_THREADS);
        long startTime = System.currentTimeMillis();
        SyncThread sync = new SyncThread(this, startTime);
        for (int i = 0; i < TASK_COUNT; i++) {
            exe.submit(sync);
        }
        awaitAndShutdown(exe);
    }

    /** Runs the atomic-counter tasks and waits for them to finish. */
    @Test
    public void testAtomic() throws InterruptedException {
        ExecutorService exe = Executors.newFixedThreadPool(MAX_THREADS);
        long startTime = System.currentTimeMillis();
        AtomicThread atomic = new AtomicThread(startTime);
        for (int i = 0; i < TASK_COUNT; i++) {
            exe.submit(atomic);
        }
        awaitAndShutdown(exe);
    }

    /**
     * Stops accepting new tasks and waits for the submitted ones to complete,
     * instead of sleeping for a fixed time and leaving the pool threads alive.
     */
    private static void awaitAndShutdown(ExecutorService exe) throws InterruptedException {
        exe.shutdown();
        if (!exe.awaitTermination(AWAIT_SECONDS, java.util.concurrent.TimeUnit.SECONDS)) {
            // Tasks are still running after the time limit: cancel them.
            exe.shutdownNow();
        }
    }
}
testSync():
SyncThread spend:201ms, v=1000002
SyncThread spend:201ms, v=1000000
SyncThread spend:201ms, v=1000001
testAtomic():
AtomicThread spend:43ms, v=1000000
AtomicThread spend:44ms, v=1000001
AtomicThread spend:46ms, v=1000002
结束语
评论