【123期】一文搞定那些难缠的并发面试题
阅读本文大概需要 16 分钟。
1、Object 的 wait()和notify() 方法
调用 wait()方法后,当前线程会进入等待状态,直到其他线程调用notify()或notifyAll() 来唤醒。
调用 notify() 方法后,可以唤醒正在等待的单一线程。
2、并发特性 - 原子性、有序性、可见性
原子性:即一个操作或者多个操作 要么全部执行并且执行的过程不会被任何因素打断,要么就都不执行。
可见性:指当多个线程访问同一个变量时,一个线程修改了这个变量的值,其他线程能够立即看得到修改的值。
有序性:即程序执行的顺序按照代码的先后顺序执行,不进行指令重排列。
3、synchronized 实现原理?
普通同步方法,锁是当前实例对象
静态同步方法,锁是当前类的 class 对象
同步方法块,锁是括号里面的对象
4、volatile 的实现原理?
volatile可见性:对一个volatile 的读,总可以看到对这个变量最终的写。
volatile 原子性:volatile对单个读 / 写具有原子性(32 位 Long、Double),但是复合操作除外,例如i++ 。
JVM 底层采用“内存屏障”来实现 volatile 语义,防止指令重排序。
volatile 经常用于两个两个场景:状态标记变量、Double Check 。
5、Java 内存模型(JMM)
一方面,要为程序员提供足够强的内存可见性保证。
另一方面,对编译器和处理器的限制要尽可能地放松。JMM 对程序员屏蔽了 CPU 以及 OS 内存的使用问题,能够使程序在不同的 CPU 和 OS 内存上都能够达到预期的效果。
在并发编程模式中,势必会遇到上面三个概念:
原子性:一个操作或者多个操作要么全部执行要么全部不执行。
可见性:当多个线程同时访问一个共享变量时,如果其中某个线程更改了该共享变量,其他线程应该可以立刻看到这个改变。
有序性:程序的执行要按照代码的先后顺序执行。
通过 volatile、synchronized、final、concurrent 包等 实现。
6、有关队列 AQS 队列同步器
如果当前线程获取同步状态失败(锁)时,AQS 则会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入同步队列,同时会阻塞当前线程
当同步状态释放时,则会把节点中的线程唤醒,使其再次尝试获取同步状态。
AQS 内部维护的是** CLH 双向同步队列**
7、锁的特性
8、ReentrantLock 锁
ReentrantLock 实现 Lock 接口,基于内部的 Sync 实现。
Sync 实现 AQS ,提供了 FairSync 和 NonFairSync 两种实现。
Condition
9、ReentrantReadWriteLock
在同一时间,可以允许多个读线程同时访问。
但是,在写线程访问时,所有读线程和写线程都会被阻塞。
公平性:支持公平性和非公平性。
重入性:支持重入。读写锁最多支持 65535 个递归写入锁和 65535 个递归读取锁。
锁降级:遵循获取写锁,再获取读锁,最后释放写锁的次序,如此写锁能够降级成为读锁。
ReentrantReadWriteLock 实现 ReadWriteLock 接口,可重入的读写锁实现类。
10、Synchronized 和 Lock 的区别
Lock 是一个接口,而 synchronized 是 Java 中的关键字,synchronized 是内置的语言实现;
synchronized 在发生异常时,会自动释放线程占有的锁,因此不会导致死锁现象发生;而 Lock 在发生异常时,如果没有主动通过 unLock() 去释放锁,则很可能造成死锁现象,因此使用 Lock 时需要在 finally 块中释放锁;
Lock 可以让等待锁的线程响应中断,而 synchronized 却不行,使用 synchronized 时,- 等待的线程会一直等待下去,不能够响应中断;
通过 Lock 可以知道有没有成功获取锁,而 synchronized 却无法办到。
Lock 可以提高多个线程进行读操作的效率。
与 synchronized 相比,ReentrantLock 提供了更多,更加全面的功能,具备更强的扩展性。例如:时间锁等候,可中断锁等候,锁投票。
ReentrantLock 还提供了条件 Condition ,对线程的等待、唤醒操作更加详细和灵活,所以在多个条件变量和高度竞争锁的地方,ReentrantLock 更加适合(以后会阐述 Condition)。
ReentrantLock 提供了可轮询的锁请求。它会尝试着去获取锁,如果成功则继续,否则可以等到下次运行时处理,而 synchronized则一旦进入锁请求要么成功要么阻塞,所以相比synchronized 而言,ReentrantLock 会不容易产生死锁些。
ReentrantLock 支持更加灵活的同步代码块,但是使用 synchronized时,只能在同一个synchronized块结构中获取和释放。注意,ReentrantLock 的锁释放一定要在finally 中处理,否则可能会产生严重的后果。
ReentrantLock 支持中断处理,且性能较 synchronized 会好些。
11、Java 中线程同步的方式
sychronized 同步方法或代码块
volatile
Lock
ThreadLocal
阻塞队列(LinkedBlockingQueue)
使用原子变量(java.util.concurrent.atomic)
变量的不可变性
12、CAS 是一种什么样的同步机制?多线程下为什么不使用 int 而使用 AtomicInteger?
if (this.value == A) {
this.value = B
return true;
} else {
return false;
}
// AtomicInteger.java
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile int value;
Unsafe 是 CAS 的核心类,Java 无法直接访问底层操作系统,而是通过本地 native` 方法来访问。不过尽管如此,JVM 还是开了一个后门:Unsafe ,它提供了硬件级别的原子操作。
valueOffset 为变量值在内存中的偏移地址,Unsafe 就是通过偏移地址来得到数据的原值的。
value当前值,使用volatile 修饰,保证多线程环境下看见的是同一个。
// AtomicInteger.java
public final int addAndGet(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
}
// Unsafe.java
// compareAndSwapInt(var1, var2, var5, var5 + var4)其实换成 compareAndSwapInt(obj, offset, expect, update)比较清楚,意思就是如果 obj 内的 value 和 expect 相等,就证明没有其他线程改变过这个变量,那么就更新它为 update,如果这一步的 CAS 没有成功,那就采用自旋的方式继续进行 CAS 操作,取出乍一看这也是两个步骤了啊,其实在 JNI 里是借助于一个 CPU 指令完成的。所以还是原子操作。
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
// 该方法为本地方法,有四个参数,分别代表:对象、对象的地址、预期值、修改值
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
13、HashMap 是不是线程安全?如何体现?如何变得安全?
如何变得安全:
Hashtable:通过 synchronized 来保证线程安全的,独占锁,悲观策略。吞吐量较低,性能较为低下
SynchronizedHashMap :通过 Collections.synchronizedMap() 方法对 HashMap 进行包装,返回一个 SynchronizedHashMap 对象,在源码中 SynchronizedHashMap 也是用过 synchronized 来保证线程安全的。但是实现方式和 Hashtable 略有不同(前者是 synchronized 方法,后者是通过 synchronized 对互斥变量加锁实现)
ConcurrentHashMap:JUC 中的线程安全容器,高效并发。ConcurrentHashMap 的 key、value 都不允许为 null。
14、ConcurrentHashMap 的实现方式?
table 中存放 Node 节点数据,默认 Node 数据大小为 16,扩容大小总是 2^N。
为了保证可见性,Node 节点中的 val 和 next 节点都用 volatile 修饰。
当链表长度大于 8 时,会转换成红黑树,节点会被包装成 TreeNode放在TreeBin 中。
put():1. 计算键所对应的 hash 值;2. 如果哈希表还未初始化,调用 initTable() 初始化,否则在 table 中找到 index 位置,并通过 CAS 添加节点。如果链表节点数目超过 8,则将链表转换为红黑树。如果节点总数超过,则进行扩容操作。
get():无需加锁,直接根据 key 的 hash 值遍历 node。
15、CountDownLatch 和 CyclicBarrier 的区别?并发工具类
两者区别:
CountDownLatch 的作用是允许 1 或 N 个线程等待其他线程完成执行;而 CyclicBarrier 则是允许 N 个线程相互等待。
CountDownLatch 的计数器无法被重置;CyclicBarrier 的计数器可以被重置后使用,因此它被称为是循环的 barrier 。
如有必要,在许可可用前会阻塞每一个 acquire,然后再获取该许可。
每个 release 添加一个许可,从而可能释放一个正在阻塞的获取者。
16、怎么控制线程,尽可能减少上下文切换?
无锁并发编程。多线程竞争锁时,会引起上下文切换,所以多线程处理数据时,可以使用一些方法来避免使用锁。如将数据的ID按照Hash算法取模分段,不同的线程处理不同段的数据。
CAS算法。Java的Atomic包使用CAS算法来更新数据,而不需要加锁。
使用最少线程。避免创建不需要的线程,比如任务很少,但是创建了很多线程来处理,这样会造成大量线程都处于等待状态。
协程。在单线程里实现多任务的调度,并在单线程里维持多个任务间的切换。
17、什么是乐观锁和悲观锁?
18、阻塞队列
抛出异常:add(e) 、remove()、element()
返回特殊值:offer(e) 、pool()、peek()
阻塞:put(e) 、take()
ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
LinkedBlockingQueue :一个由链表结构组成的无界阻塞队列。
PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
DelayQueue:一个使用优先级队列实现的无界阻塞队列。
SynchronousQueue:一个不存储元素的阻塞队列。
LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
ArrayBlockingQueue,一个由数组实现的有界阻塞队列。该队列采用 FIFO 的原则对元素进行排序添加的。内部使用可重入锁 ReentrantLock + Condition 来完成多线程环境的并发操作。
19、线程池
RUNNING:接收并处理任务。
SHUTDOWN:不接收但处理现有任务。
STOP:不接收也不处理任务,同时终端当前处理的任务。
TIDYING:所有任务终止,线程池会变为 TIDYING 状态。当线程池变为 TIDYING 状态时,会执行钩子函数 terminated()。
TERMINATED:线程池彻底终止的状态。
内部变量** ctl **定义为 AtomicInteger ,记录了“线程池中的任务数量”和“线程池的状态”两个信息。共 32 位,其中高 3 位表示”线程池状态”,低 29 位表示”线程池中的任务数量”。
线程池创建参数
corePoolSize
maximumPoolSize
keepAliveTime
unit
workQueue
ArrayBlockingQueue:基于数组结构的有界阻塞队列,FIFO。
LinkedBlockingQueue:基于链表结构的有界阻塞队列,FIFO。
SynchronousQueue:不存储元素的阻塞队列,每个插入操作都必须等待一个移出操作,反之亦然。
PriorityBlockingQueue:具有优先界别的阻塞队列。
threadFactory
handler
线程池提供了四种拒绝策略:
AbortPolicy:直接抛出异常,默认策略;
CallerRunsPolicy:用调用者所在的线程来执行任务;
DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
DiscardPolicy:直接丢弃任务;
当然我们也可以实现自己的拒绝策略,例如记录日志等等,实现 RejectedExecutionHandler 接口即可。
当添加新的任务到线程池时:
线程数量未达到 corePoolSize,则新建一个线程(核心线程)执行任务
线程数量达到了 corePoolSize,则将任务移入队列等待
队列已满,新建线程(非核心线程)执行任务
队列已满,总线程数又达到了 maximumPoolSize,就会由 handler 的拒绝策略来处理
线程池可通过 Executor 框架来进行创建:
FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue());
}
SingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue()));
}
CachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue());
}
20、为什么要使用线程池?
创建/销毁线程伴随着系统开销,过于频繁的创建/销毁线程,会很大程度上影响处理效率。线程池缓存线程,可用已有的闲置线程来执行新任务(keepAliveTime)
线程并发数量过多,抢占系统资源从而导致阻塞。运用线程池能有效的控制线程最大并发数,避免以上的问题。
对线程进行一些简单的管理(延时执行、定时循环执行的策略等)
21、生产者消费者问题
public class ProducerConsumer {
public static void main(String[] args) {
ProducerConsumer main = new ProducerConsumer();
Queuebuffer = new LinkedList<>();
int maxSize = 5;
new Thread(main.new Producer(buffer, maxSize), "Producer1").start();
new Thread(main.new Consumer(buffer, maxSize), "Comsumer1").start();
new Thread(main.new Consumer(buffer, maxSize), "Comsumer2").start();
}
class Producer implements Runnable {
private Queuequeue;
private int maxSize;
Producer(Queuequeue, int maxSize) {
this.queue = queue;
this.maxSize = maxSize;
}
@Override
public void run() {
while (true) {
synchronized (queue) {
while (queue.size() == maxSize) {
try {
System.out.println("Queue is full");
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Random random = new Random();
int i = random.nextInt();
System.out.println(Thread.currentThread().getName() + " Producing value : " + i);
queue.add(i);
queue.notifyAll();
}
}
}
}
class Consumer implements Runnable {
private Queuequeue;
private int maxSize;
public Consumer(Queuequeue, int maxSize) {
super();
this.queue = queue;
this.maxSize = maxSize;
}
@Override
public void run() {
while (true) {
synchronized (queue) {
while (queue.isEmpty()) {
try {
System.out.println("Queue is empty");
queue.wait();
} catch (Exception ex) {
ex.printStackTrace();
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " Consuming value : " + queue.remove());
queue.notifyAll();
}
}
}
}
}
推荐阅读:
微信扫描二维码,关注我的公众号
朕已阅