请求合并的三种方式,大大提高接口性能!
阅读本文大概需要 8 分钟。
前言
Hystrix Collapser
hystrix
"hystrixAspect" class="com.netflix.hystrix.contrib.javanica.aop.aspectj.HystrixCommandAspect" /> 
collapser
我们在需要合并的方法上添加 @HystrixCollapser 注解,在定义好的合并方法上添加 @HystrixCommand 注解; single 方法只能传入一个参数,多参数情况下需要自己包装一个参数类,而 batch 方法需要 java.util.List;single 方法返回 java.util.concurrent.Future, batch 方法返回java.util.List,且要保证返回的结果数量和传入的参数数量一致。
public class HystrixCollapserSample { 
@HystrixCollapser(batchMethod = "batch")
public Futuresingle(String input) {
return null; // single方法不会被执行到
}
public Listbatch(List {inputs) 
return inputs.stream().map(it -> Boolean.TRUE).collect(Collectors.toList());
}
}
源码实现
在 spring-boot 内注册切面类的 bean,里面包含 @HystrixCollapser 注解切面; 在方法执行时检测到方法被 HystrixCollapser 注解后,spring 调用 methodsAnnotatedWithHystrixCommand方法来执行 hystrix 代理;hystrix 获取一个 collapser 实例(在当前 scope 内检测不到即创建); hystrix 将当前请求的参数提交给 collapser, 由 collapser 存储在一个 concurrentHashMap (RequestArgumentType -> CollapsedRequest)内,此方法会创建一个 Observable 对象,并返回一个 观察此对象的 Future 给业务线程;collpser 在创建时会创建一个 timer 线程,定时消费存储的请求,timer 会将多个请求构造成一个合并后的请求,调用 batch 执行后将结果顺序映射到输出参数,并通知 Future 任务已完成。 
需要注意,由于需要等待 timer 执行真正的请求操作,collapser 会导致所有的请求的 cost 都会增加约 timerInterval/2 ms; 
配置
collapserKey,这个可以不用配置,hystrix 会默认使用当前方法名; batchMethod,配置 batch 方法名,我们一般会将 single 方法和 batch 方法定义在同一个类内,直接填方法名即可; scope,最坑的配置项,也是逼我读源码的元凶, com.netflix.hystrix.HystrixCollapser.Scope枚举类,有 REQUEST, GLOBAL 两种选项,在 scope 为 REQUEST 时,hystrix 会为每个请求都创建一个 collapser, 此时你会发现 batch 方法执行时,传入的请求数总为1。而且 REQUEST 项还是默认项,不明白这样请求合并还有什么意义;collapserProperties, 在此选项内我们可以配置 hystrixCommand 的通用配置;
maxRequestsInBatch, 构造批量请求时,使用的单个请求的最大数量; timerDelayInMilliseconds, 此选项配置 collapser 的 timer 线程多久会合并一次请求; requestCache.enabled, 配置提交请求时是否缓存; 
@HystrixCollapser( 
batchMethod = "batch",
collapserKey = "single",
scope = com.netflix.hystrix.HystrixCollapser.Scope.GLOBAL,
collapserProperties = {
@HystrixProperty(name = "maxRequestsInBatch", value = "100"),
@HystrixProperty(name = "timerDelayInMilliseconds", value = "1000"),
@HystrixProperty(name = "requestCache.enabled", value = "true")
})
BatchCollapser
设计
是一种 Collection,类似于 ArrayList 或 Queue,可以存重复元素且有顺序; 在多线程环境中能安全地将里面的数据全取出来进行消费,而不用自己实现锁。 
java.util.concurrent 包内的 LinkedBlockingDeque 刚好符合要求,首先它实现了 BlockingDeque 接口,多线程环境下的存取操作是安全的;此外,它还提供 drainTo(Collection super E> c, int maxElements)方法,可以将容器内 maxElements 个元素安全地取出来,放到 Collection c 中。实现
public class BatchCollapser<E> implements InitializingBean { 
private static final Logger logger = LoggerFactory.getLogger(BatchCollapser.class);
private static volatile Mapinstance = Maps.newConcurrentMap(); 
private static final ScheduledExecutorService SCHEDULE_EXECUTOR = Executors.newScheduledThreadPool(1);
private volatile LinkedBlockingDequebatchContainer = new LinkedBlockingDeque<>(); 
private Handler, Boolean> cleaner;
private long interval;
private int threshHold;
private BatchCollapser(Handler, Boolean> cleaner, int threshHold, long interval)
{
this.cleaner = cleaner;
this.threshHold = threshHold;
this.interval = interval;
}
@Override
public void afterPropertiesSet() throws Exception {
SCHEDULE_EXECUTOR.scheduleAtFixedRate(() -> {
try {
this.clean();
} catch (Exception e) {
logger.error("clean container exception", e);
}
}, 0, interval, TimeUnit.MILLISECONDS);
}
public void submit(E event) {
batchContainer.add(event);
if (batchContainer.size() >= threshHold) {
clean();
}
}
private void clean() {
ListtransferList = Lists.newArrayListWithExpectedSize(threshHold); 
batchContainer.drainTo(transferList, 100);
if (CollectionUtils.isEmpty(transferList)) {
return;
}
try {
cleaner.handle(transferList);
} catch (Exception e) {
logger.error("batch execute error, transferList:{}", transferList, e);
}
}
public staticBatchCollapser getInstance(Handler , Boolean> cleaner, int threshHold, long interval)
{
Class jobClass = cleaner.getClass();
if (instance.get(jobClass) == null) {
synchronized (BatchCollapser.class) {
if (instance.get(jobClass) == null) {
instance.put(jobClass, new BatchCollapser<>(cleaner, threshHold, interval));
}
}
}
return instance.get(jobClass);
}
}
由于合并器的全局性需求,需要将合并器实现为一个单例,另外为了提升它的通用性,内部使用使用 concurrentHashMap 和 double check 实现了一个简单的单例工厂。 为了区分不同用途的合并器,工厂需要传入一个实现了 Handler 的实例,通过实例的 class 来对请求进行分组存储。 由于 java.util.Timer的阻塞特性,一个 Timer 线程在阻塞时不会启动另一个同样的 Timer 线程,所以使用ScheduledExecutorService定时启动 Timer 线程。
ConcurrentHashMultiset
设计
ConcurrentHashMultiset,它不同于普通的 set 结构存储相同元素时直接覆盖原有元素,而是给每个元素保持一个计数 count, 插入重复时元素的 count 值加1。而且它在添加和删除时并不加锁也能保证线程安全,具体实现是通过一个 while(true) 循环尝试操作,直到操作够所需要的数量。ConcurrentHashMultiset 这种排重计数的特性,非常适合数据统计这种元素在短时间内重复率很高的场景,经过排重后的数量计算,可以大大降低下游服务器的压力,即使重复率不高,能用少量的内存空间换取系统可用性的提高,也是很划算的。实现
ConcurrentHashMultiset 进行请求合并与使用普通容器在整体结构上并无太大差异,具体类似于:if (ConcurrentHashMultiset.isEmpty()) { 
return;
}
ListtransferList = Lists.newArrayList(); 
ConcurrentHashMultiset.elementSet().forEach(request -> {
int count = ConcurrentHashMultiset.count(request);
if (count <= 0) {
return;
}
transferList.add(count == 1 ? request : new Request(request.getIncrement() * count));
ConcurrentHashMultiset.remove(request, count);
});
小结
hystrix collapser: 需要每个请求的结果,并且不在意每个请求的 cost 会增加;BatchCollapser: 不在意请求的结果,需要请求合并能在时间和数量两个维度上触发;ConcurrentHashMultiset:请求重复率很高的统计类场景;
BatchCollapser 和 ConcurrentHashMultiset 结合一下,在BatchCollapser 里使用 ConcurrentHashMultiset 作为容器,这样就可以结合两者的优势了。推荐阅读:
内容包含Java基础、JavaWeb、MySQL性能优化、JVM、锁、百万并发、消息队列、高性能缓存、反射、Spring全家桶原理、微服务、Zookeeper、数据结构、限流熔断降级......等技术栈!
⬇戳阅读原文领取!                                       朕已阅 
评论

