ThreadLocal 不香了?ScopedValue才是王道?
共 21569字,需浏览 44分钟
·
2024-07-28 10:15
阅读本文大概需要 15 分钟。
来自:网络,侵删
remove()
会造成严重的内存泄露问题。在JDK 20 Early-Access Build 28版本中便针对ThreadLocal类重新设计了一个ScopedValue类。
ThreadLocal
基本概念
应用案例
@Service
public class ShoppingCartService {
private ThreadLocal<ShoppingCart> cartHolder = new ThreadLocal<>();
public ShoppingCart getCurrentCart() {
ShoppingCart cart = cartHolder.get();
if (cart == null) {
cart = new ShoppingCart();
cartHolder.set(cart);
}
return cart;
}
public void checkout() {
// 获取当前购物车
ShoppingCart cart = getCurrentCart();
// 执行结账操作
// 清除当前线程中购物车的信息,防止内存泄露
cartHolder.remove();
}
}
// 购物车类
class ShoppingCart {
private List<Product> products = new ArrayList<>();
public void addProduct(Product product) {
products.add(product);
}
public List<Product> getProducts() {
return products;
}
}
ShoppingCartService
是一个 Spring Bean,用来管理购物车信息。在这个 Bean 里,使用了 ThreadLocal<ShoppingCart>
来保存每个线程的购物车信息。getCurrentCart
方法首先会从 ThreadLocal
中获取购物车信息,如果当前线程没有对应的购物车信息,那么就创建一个新的购物车,并保存到 ThreadLocal
中。checkout
方法用来执行结账操作,结账完成后,需要通过cartHolder.remove();
清除当前线程中的购物车信息,以防止内存泄露。这样,即使在多线程环境下,每个线程都有自己独立的购物车信息,互不影响。这就是 ThreadLocal
在解决 Spring Bean 线程安全问题上的一个应用场景。
@Aspect
@Component
public class UserConsistencyAspect {
// 每个UserVo启用线程隔离,在进入切面后开始创建,在业务逻辑中用完就被GC回收
private static final ThreadLocal<UserVo> userHolder = new ThreadLocal<>();
@Pointcut("@annotation(org.nozomi.common.annotation.GetUser)")
public void userAuthPoint() {}
@Around("userAuthPoint()")
public Object injectUserFromRequest(ProceedingJoinPoint joinPoint) throws Throwable {
Authentication authentication = SecurityContextHolder.getContext().getAuthentication();
UserVo operator = (UserVo) authentication.getPrincipal();
if (operator == null) {
return Response.fail("用户不存在");
}
userHolder.set(operator);
return joinPoint.proceed();
}
/**
* 取出当前线程中的UserVo对象,这些UserVo是跟随http创建的线程而隔离的
*
* @return 当前线程的UserVo
*/
public static UserVo getUser() {
return userHolder.get();
}
}
UserConsistencyAspect .getUser()
方法就可以获取到这个http session中的User对象了。
@Service
public class ProductService {
private final ThreadLocal<Session> sessionThreadLocal = new ThreadLocal<>();
public Product getProductById(String id) {
Session session = getSession();
return session.get(Product.class, id);
}
public void updateProduct(Product product) {
Session session = getSession();
session.update(product);
}
private Session getSession() {
Session session = sessionThreadLocal.get();
if (session == null) {
session = sessionFactory.openSession();
sessionThreadLocal.set(session);
}
return session;
}
public void closeSession() {
Session session = sessionThreadLocal.get();
if (session != null) {
session.close();
sessionThreadLocal.remove();
}
}
}
getSession()
方法时,都会从ThreadLocal中获取到属于自己的Session。但是事实上这些session的处理已经在mybatis或hibernate中都已经通过ThreadLocal处理好了不需要开发者再在业务中对session进行隔离。这里的例子主要是为了解释 ThreadLocal 是如何工作的,并不是实际开发中推荐的做法。
StructuredTaskScope
Callable
的形式向它提交任务,我们将得到一个future
返回,并且这个callable
将在由作用域Scope
为我们创建的虚线程种执行。这很像Executor
。但二者之间也有很大的区别。
public static Weather readWeather() throws Exception {
// try-with-resource
try(var scope = new StructuredTaskScope<Weather>()) {
Future<Weather> future = scope.fork(Weather::readWeatherFrom);
scope.join();
return future.resultNow();
}
}
try-with-resource
模式。通过fork()
方法fork一个Callable类型的任务,fork()
方法返回一个Future对象,我们调用join()方法阻塞调用,它将阻塞当前线程,直到所有提交(frok)给StructuredTaskScope的任务都完成。最后调用Future的resultNow()
获取结果并返回。resultNow()
将抛出异常,如果我们在Future完成前调用它,所以我们要在join()
方法中调用并将其返回。
ScopedValue
基本概念
-
ThreadLocal变量是可变的,任何运行在当前线程中的代码都可以修改该变量的值,很容易产生一些难以调试的bug。 -
ThreadLocal变量的生命周期会很长。当使用ThreadLocal变量的 set
方法,为当前线程设置了值之后,这个值在线程的整个生命周期中都会保留,直到调用remove
方法来删除。但是绝大部分开发人员不会主动调用remove
来进行删除,这可能造成内存泄漏。 -
ThreadLocal变量可以被继承。如果一个子线程从父线程中继承ThreadLocal变量,那么该子线程需要独立存储父线程中的全部ThreadLocal变量,这会产生比较大的内存开销。
基本用法
jdk.incubator.concurrent
包中的ScopedValue
类来表示。使用ScopedValue的第一步是创建ScopedValue
对象,通过静态方法newInstance
来完成,ScopedValue对象一般声明为static final
。由于ScopedValue是孵化功能,要想使用需要在项目的第一级包目录的同级目录中创建一个java类module-info.java
来将其引入模块中:
module dioxide.cn.module {
requires jdk.incubator.concurrent;
}
VM Option
中启用预览功能--enable-preview
。下一步是指定ScopedValue
对象的值和作用域,通过静态方法where
来完成。where
方法有 3 个参数:
-
ScopedValue
对象 -
ScopedValue
对象所绑定的值 -
Runnable
或Callable
对象,表示ScopedValue
对象的作用域
Runnable
或Callable
对象执行过程中,其中的代码可以用ScopedValue
对象的get
方法获取到where
方法调用时绑定的值。这个作用域是动态的,取决于Runnable
或Callable
对象所调用的方法,以及这些方法所调用的其他方法。当Runnable
或Callable
对象执行完成之后,ScopedValue
对象会失去绑定,不能再通过get
方法获取值。在当前作用域中,ScopedValue
对象的值是不可变的,除非再次调用where
方法绑定新的值。这个时候会创建一个嵌套的作用域,新的值仅在嵌套的作用域中有效。使用作用域值有以下几个优势:
-
提高数据安全性:由于作用域值只能在当前范围内访问,因此可以避免数据泄露或被恶意修改。 -
提高数据效率:由于作用域值是不可变的,并且可以在线程之间共享,因此可以减少数据复制或同步的开销。 -
提高代码清晰度:由于作用域值只能在当前范围内访问,因此可以减少参数传递或全局变量的使用。
public class Main {
// 声明了一个静态的、最终的 ScopedValue<String> 实例
// ScopedValue 是一个支持在特定范围内(如任务或线程)中传递值的类
// 它的使用类似于 ThreadLocal,但更适合于结构化并发
private static final ScopedValue<String> VALUE = ScopedValue.newInstance();
public static void main(String[] args) throws Exception {
System.out.println(Arrays.toString(stringScope()));
}
public static Object[] stringScope() throws Exception {
return ScopedValue.where(VALUE, "value", () -> {
// 使用 try-with-resource 来绑定结构化并发的作用域
// 用于自动管理资源的生命周期,这是一个结构化任务范围
// 在这个范围内创建的所有子任务都将被视为范围的一部分
// 如果范围中的任何任务失败,所有其他任务都将被取消
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// 使用了 scope.fork 来创建两个并行的任务
// 每个任务都在执行上下文中获取 VALUE 的值,并对其进行操作
Future<String> user = scope.fork(VALUE::get);
Future<Integer> order = scope.fork(() -> VALUE.get().length());
// join() 方法等待所有范围内的任务完成
// throwIfFailed() 方法会检查所有任务的结果,如果任何任务失败,则会抛出异常
scope.join().throwIfFailed();
// 在所有任务完成后,使用 resultNow() 方法获取每个任务的结果,并将结果放入一个对象数组中
return new Object[]{user.resultNow(), order.resultNow()};
}
});
}
}
ScopedValue
和结构化并发来创建并执行多个并行任务,并安全地传递和操作任务上下文中的值。
源码分析
A value that is set once and is then available for reading for a bounded period of execution by a thread. A ScopedValue allows for safely and efficiently sharing data for a bounded period of execution without passing the data as method arguments. ScopedValue defines the where(ScopedValue, Object, Runnable) method to set the value of a ScopedValue for the bouned period of execution by a thread of the runnable's run method. The unfolding execution of the methods executed by run defines a dynamic scope. The scoped value is bound while executing in the dynamic scope, it reverts to being unbound when the run method completes (normally or with an exception). Code executing in the dynamic scope uses the ScopedValue get method to read its value. Like a thread-local variable, a scoped value has multiple incarnations, one per thread. The particular incarnation that is used depends on which thread calls its methods.
ScopedValue
是一个对象,它被设置一次后,在执行期间由一个线程有限期地读取。ScopedValue
允许在有限的执行期间内在不将数据作为方法参数传递的情况下安全、有效地共享数据。ScopedValue
定义了 where(ScopedValue, Object, Runnable)
方法,这个方法在一个线程执行 runnable 的 run 方法的有限执行期间内设置 ScopedValue
的值。由 run 执行的方法展开执行定义了一个动态作用域。在动态作用域中执行时,作用域值是绑定的,当 run 方法完成时(正常或异常),它恢复到未绑定状态。在动态作用域中执行的代码使用 ScopedValue
的 get 方法来读取其值。与线程局部变量类似,作用域值有多个化身,每个线程一个。使用哪个化身取决于哪个线程调用其方法。ScopedValue
的一个典型用法是在 final 和 static 字段中声明。字段的可访问性将决定哪些组件可以绑定或读取其值。ScopedValue中有3个内部类,分别是Snapshot、Carrier、Cache,他们在ScopedValue中起着至关重要的角色。
Snapshot
An immutable map from ScopedValue to values. Unless otherwise specified, passing a null argument to a constructor or method in this class will cause a NullPointerException
to be thrown.
NullPointerException
异常。这个类的主要用途是为ScopedValue实例创建一个不可变的映射,这样在运行时,无论其它代码如何修改原始的ScopedValue实例,Snapshot中的值都不会发生变化。它为了提供一个安全的方式来在多线程环境下共享值。
Carrier
A mapping of scoped values, as keys, to values. A Carrier is used to accumlate mappings so that an operation (a Runnable
orCallable
) can be executed with all scoped values in the mapping bound to values. The following example runs an operation with k1 bound (or rebound) to v1, and k2 bound (or rebound) to v2.ScopedValue.where(k1, v1).where(k2, v2).run(() -> ... );
A Carrier is immutable and thread-safe. The where method returns a new Carrier object, it does not mutate an existing mapping. Unless otherwise specified, passing a null argument to a method in this class will cause aNullPointerException
to be thrown.
Runnable
或Callable
),在该操作中,映射中的所有scoped values都绑定到值。Carrier是不可变的,并且是线程安全的。where
方法返回一个新的Carrier对象,不会改变现有的映射。这是用于在ScopedValue实例和对应值之间创建和保持映射关系的工具,使得这些映射关系可以在执行操作时被一并应用。
Cache
A small fixed-size key-value cache. When a scoped value's get() method is invoked, we record the result of the lookup in this per-thread cache for fast access in future.
get()
方法时,我们在这个每线程缓存中记录查找的结果,以便在将来快速访问。这个类的主要作用是优化性能。通过缓存get()
方法的结果,可以避免在多次获取同一个ScopedValue的值时进行重复的查找操作。只有当ScopedValue的值被更改时,才需要更新缓存。
where()
where()
方法是ScopedValue类的核心方法与入口,它接收三个参数。当操作完成时(正常或出现异常),ScopedValue将在当前线程中恢复为未绑定状态,或恢复为先前绑定时的先前值。
graph TB
A("ScopedValue.where(key, value, op)")
A --> B("ScopedValue.Carrier.of(key, value)")
B --> C("ScopedValue.Carrier.where(key, value, prev)")
C --> D("返回ScopedValue.Carrier对象")
op
已经创建了一个StructuredTaskScope但没有关闭它,那么退出op会导致在动态范围内创建的每个StructuredTaskScope被关闭。这可能需要阻塞,直到所有子线程都完成了它们的子任务。关闭是按照创建它们的相反顺序完成的。
ScopedValue.where(key, value, op);
等价于使用ScopedValue.where(key, value).call(op);
public static <T, R> R where(ScopedValue<T> key,
T value,
Callable<? extends R> op) throws Exception {
return where(key, value).call(op);
}
Carrier.of(key, value);
方法
/*
* 返回由单个绑定组成的新集合
*/
static <T> Carrier of(ScopedValue<T> key, T value) {
return where(key, value, null);
}
/**
* 向该map添加绑定,返回一个新的 Carrier 实例
*/
private static final <T> Carrier where(ScopedValue<T> key, T value,
Carrier prev) {
return new Carrier(key, value, prev);
}
call()
graph TB
D("ScopedValue.Carrier")
D --> E("ScopedValue.Carrier.call(op)")
E -->|分支1| F("ScopedValue.Cache.invalidate()")
E -->|分支2| G("ScopedValue.Carrier.runWith(newSnapshot, op)")
G --> H("ScopedValueContainer.call(op)")
H --> I("ScopedValueContainer.callWithoutScope(op)")
I --> J("Callable.call()")
小结
推荐阅读:
程序员在线工具站:cxytools.com 推荐一个我自己写的工具站:http://cxytools.com,专为程序员设计,包括时间日期、JSON处理、SQL格式化、随机字符串生成、UUID生成、随机数生成、文本Hash...等功能,提升开发效率。
⬇戳阅读原文直达! 朕已阅