Troubleshooting a Eureka instance going offline because of a database failure
Source: SegmentFault 思否 community
Author: 无名
Based on
spring-cloud-Greenwich.RELEASE
spring-boot-2.1.3.RELEASE
spring-boot-starter-actuator-2.1.3.RELEASE
spring-cloud-netflix-eureka-client-2.1.0.RELEASE
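Background
In production, requests to the project's endpoints were getting 404 responses from spring-cloud-gateway. Investigation showed that the gateway could not obtain valid registration information for the project from eureka-server. At the time, a network problem was also preventing the project from connecting to its database. That network problem affected only the connection between the project and the database; the connection between the project and eureka-server was unaffected.
The logs showed the project repeatedly running health checks against the database and logging an exception each time because the connection failed. They also showed Eureka's status-change log line, Saw local status change event DOWN. Both messages were printed by the same thread, named DiscoveryClient-InstanceInfoReplicator-0, which strongly suggests that the two are related.
So why did eureka-server have no usable registration information for the project? The answer starts with Eureka-client's health check.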
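Health check
As usual, the way to understand the mechanism is to read the source code.
Almost all Eureka-client initialization happens inside the DiscoveryClient class, including starting the scheduled health check task.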
public class DiscoveryClient implements EurekaClient {

    private void initScheduledTasks() {
        ……
        if (clientConfig.shouldRegisterWithEureka()) {
            ……
            // InstanceInfo replicator
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize
            ……
            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }
}
Inside InstanceInfoReplicator, the health check is invoked and the status of the current Eureka-client node is refreshed.
class InstanceInfoReplicator implements Runnable {

    public void run() {
        try {
            discoveryClient.refreshInstanceInfo();
            ……
        } catch (Throwable t) {
            logger.warn("There was a problem with the instance info replicator", t);
        } finally {
            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }
}

public class DiscoveryClient implements EurekaClient {

    void refreshInstanceInfo() {
        ……
        InstanceStatus status;
        try {
            status = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
        } catch (Exception e) {
            logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e);
            status = InstanceStatus.DOWN;
        }
        if (null != status) {
            applicationInfoManager.setInstanceStatus(status);
        }
    }
}
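One detail in run() worth noting is that the next run is scheduled in the finally block, so the replicator keeps going even when a cycle fails. This fits the continuous stream of exception logs on the same thread. A minimal, self-contained sketch of that pattern follows; the class name, the millisecond interval, the run counter, and the simulated failure are assumptions for illustration, not Eureka code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only; not the Eureka InstanceInfoReplicator.
final class SelfReschedulingSketch implements Runnable {

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final CountDownLatch remainingRuns;
    private final long intervalMillis;

    SelfReschedulingSketch(int runs, long intervalMillis) {
        this.remainingRuns = new CountDownLatch(runs);
        this.intervalMillis = intervalMillis;
    }

    @Override
    public void run() {
        try {
            remainingRuns.countDown();
            // Simulated failure, standing in for a health check that cannot reach the database.
            throw new IllegalStateException("simulated health check failure");
        } catch (Throwable t) {
            // The original logs a warning here; the failure does not stop the loop.
        } finally {
            if (remainingRuns.getCount() > 0) {
                scheduler.schedule(this, intervalMillis, TimeUnit.MILLISECONDS);
            } else {
                scheduler.shutdown(); // stop after the requested number of runs
            }
        }
    }

    // Returns true if the task ran `runs` times despite failing on every run.
    static boolean completes(int runs) {
        SelfReschedulingSketch task = new SelfReschedulingSketch(runs, 10);
        task.scheduler.schedule(task, 0, TimeUnit.MILLISECONDS);
        try {
            return task.remainingRuns.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(completes(3));
    }
}
```

Because each run schedules its own successor, a single thread carries every cycle, and an exception in one cycle cannot cancel the following ones.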
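Here the instance status is obtained through HealthCheckHandler, the node status is updated, and an event notification is dispatched. If the status is DOWN, the event listener prints the log line seen at the beginning, and the instance status reported to Eureka-server is also DOWN. That is what ultimately caused the problem: the gateway could not obtain any instance with status UP from Eureka-server.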
public class ApplicationInfoManager {

    public synchronized void setInstanceStatus(InstanceStatus status) {
        InstanceStatus next = instanceStatusMapper.map(status);
        if (next == null) {
            return;
        }
        InstanceStatus prev = instanceInfo.setStatus(next);
        if (prev != null) {
            for (StatusChangeListener listener : listeners.values()) {
                try {
                    listener.notify(new StatusChangeEvent(prev, next));
                } catch (Exception e) {
                    logger.warn("failed to notify listener: {}", listener.getId(), e);
                }
            }
        }
    }
}

public class DiscoveryClient implements EurekaClient {

    private void initScheduledTasks() {
        ……
        if (clientConfig.shouldRegisterWithEureka()) {
            ……
            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }

                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                        // log at warn level if DOWN was involved
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }
                    instanceInfoReplicator.onDemandUpdate();
                }
            };
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }
}
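The `prev != null` guard in setInstanceStatus suggests that listeners are notified only when the status actually changes, not on every health check cycle. The following is a minimal sketch of that notification pattern in plain Java. It assumes that an unchanged status produces no event, and all class and method names in it are hypothetical stand-ins rather than Eureka types.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; class and method names here are hypothetical.
final class StatusChangeSketch {

    enum InstanceStatus { UP, DOWN }

    interface Listener {
        void onChange(InstanceStatus previous, InstanceStatus current);
    }

    private InstanceStatus status = InstanceStatus.UP;
    private final List<Listener> listeners = new ArrayList<>();

    void register(Listener listener) {
        listeners.add(listener);
    }

    // Assumption: listeners fire only when the new status differs from the current one.
    synchronized void setStatus(InstanceStatus next) {
        if (next == null || next == status) {
            return;
        }
        InstanceStatus previous = status;
        status = next;
        for (Listener listener : listeners) {
            listener.onChange(previous, next);
        }
    }

    // Replays a sequence of health results and records each notification.
    static List<String> replay(InstanceStatus... results) {
        List<String> events = new ArrayList<>();
        StatusChangeSketch manager = new StatusChangeSketch();
        manager.register((previous, current) -> events.add(previous + "->" + current));
        for (InstanceStatus result : results) {
            manager.setStatus(result);
        }
        return events;
    }

    public static void main(String[] args) {
        // prints [UP->DOWN, DOWN->UP]
        System.out.println(replay(InstanceStatus.UP, InstanceStatus.DOWN,
                InstanceStatus.DOWN, InstanceStatus.UP));
    }
}
```

In this sketch, two consecutive DOWN results produce a single event, which is the behavior the guard appears to implement.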
The key question here is how DiscoveryClient's getHealthCheckHandler().getStatus(instanceInfo.getStatus()) obtains its value.
getHealthCheckHandler returns an EurekaHealthCheckHandler, so the next step is to follow the source code into the EurekaHealthCheckHandler class.
public class EurekaHealthCheckHandler implements HealthCheckHandler, ApplicationContextAware, InitializingBean {

    private final CompositeHealthIndicator healthIndicator;

    @Override
    public void afterPropertiesSet() throws Exception {
        final Map<String, HealthIndicator> healthIndicators = applicationContext.getBeansOfType(HealthIndicator.class);
        for (Map.Entry<String, HealthIndicator> entry : healthIndicators.entrySet()) {
            // ignore EurekaHealthIndicator and flatten the rest of the composite
            // otherwise there is a never ending cycle of down. See gh-643
            if (entry.getValue() instanceof DiscoveryCompositeHealthIndicator) {
                DiscoveryCompositeHealthIndicator indicator = (DiscoveryCompositeHealthIndicator) entry.getValue();
                for (DiscoveryCompositeHealthIndicator.Holder holder : indicator.getHealthIndicators()) {
                    if (!(holder.getDelegate() instanceof EurekaHealthIndicator)) {
                        healthIndicator.addHealthIndicator(holder.getDelegate().getName(), holder);
                    }
                }
            }
            else {
                healthIndicator.addHealthIndicator(entry.getKey(), entry.getValue());
            }
        }
    }
}
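Inside afterPropertiesSet, applicationContext.getBeansOfType retrieves every HealthIndicator bean, that is, every health check class.
Note: applicationContext.getBeansOfType iterates over the BeanDefinitions to collect all bean names, then walks those names and creates the corresponding bean instance for any name that has not been instantiated yet. As a result, applicationContext.getBeansOfType ensures that every bean of the requested type has been created.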
public class EurekaHealthCheckHandler implements HealthCheckHandler, ApplicationContextAware, InitializingBean {

    public InstanceStatus getStatus(InstanceStatus instanceStatus) {
        return getHealthStatus();
    }

    protected InstanceStatus getHealthStatus() {
        final Status status = getHealthIndicator().health().getStatus();
        return mapToInstanceStatus(status);
    }

    protected CompositeHealthIndicator getHealthIndicator() {
        return healthIndicator;
    }
}
The status is obtained by calling CompositeHealthIndicator's health method. As the afterPropertiesSet method above shows, CompositeHealthIndicator is a collection of HealthIndicators.
public class CompositeHealthIndicator implements HealthIndicator {

    public void addHealthIndicator(String name, HealthIndicator indicator) {
        this.registry.register(name, indicator);
    }

    @Override
    public Health health() {
        Map<String, Health> healths = new LinkedHashMap<>();
        for (Map.Entry<String, HealthIndicator> entry : this.registry.getAll().entrySet()) {
            healths.put(entry.getKey(), entry.getValue().health());
        }
        return this.aggregator.aggregate(healths);
    }
}

public class OrderedHealthAggregator extends AbstractHealthAggregator {

    public OrderedHealthAggregator() {
        setStatusOrder(Status.DOWN, Status.OUT_OF_SERVICE, Status.UP, Status.UNKNOWN);
    }

    public void setStatusOrder(Status... statusOrder) {
        String[] order = new String[statusOrder.length];
        for (int i = 0; i < statusOrder.length; i++) {
            order[i] = statusOrder[i].getCode();
        }
        setStatusOrder(Arrays.asList(order));
    }

    @Override
    public final Health aggregate(Map<String, Health> healths) {
        List<Status> statusCandidates = healths.values().stream()
                .map(Health::getStatus)
                .collect(Collectors.toList());
        Status status = aggregateStatus(statusCandidates);
        Map<String, Object> details = aggregateDetails(healths);
        return new Health.Builder(status, details).build();
    }

    protected Status aggregateStatus(List<Status> candidates) {
        // Only sort those status instances that we know about
        List<Status> filteredCandidates = new ArrayList<>();
        for (Status candidate : candidates) {
            if (this.statusOrder.contains(candidate.getCode())) {
                filteredCandidates.add(candidate);
            }
        }
        // If no status is given return UNKNOWN
        if (filteredCandidates.isEmpty()) {
            return Status.UNKNOWN;
        }
        // Sort given Status instances by configured order
        filteredCandidates.sort(new StatusComparator(this.statusOrder));
        return filteredCandidates.get(0);
    }

    private class StatusComparator implements Comparator<Status> {

        private final List<String> statusOrder;

        StatusComparator(List<String> statusOrder) {
            this.statusOrder = statusOrder;
        }

        @Override
        public int compare(Status s1, Status s2) {
            int i1 = this.statusOrder.indexOf(s1.getCode());
            int i2 = this.statusOrder.indexOf(s2.getCode());
            return (i1 < i2) ? -1 : (i1 != i2) ? 1 : s1.getCode().compareTo(s2.getCode());
        }
    }
}
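CompositeHealthIndicator's health method iterates over all HealthIndicators and calls each one's health method to get its status. The statuses are then sorted in the order DOWN -> OUT_OF_SERVICE -> UP -> UNKNOWN and the first one is returned, so if any indicator reports DOWN, the result is DOWN.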
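The ordering rule is easy to try out in isolation. Below is a minimal sketch that works on plain status code strings instead of Spring Boot's Status objects; the class name and the sample inputs are assumptions for illustration, and only the precedence order is taken from the code above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Illustrative sketch only; not the Spring Boot OrderedHealthAggregator class.
final class StatusOrderSketch {

    // Same precedence as the aggregator above: DOWN first, UNKNOWN last.
    static final List<String> STATUS_ORDER =
            Arrays.asList("DOWN", "OUT_OF_SERVICE", "UP", "UNKNOWN");

    // Returns the highest-priority status code among the candidates.
    static String aggregate(List<String> candidates) {
        // Keep only the status codes that appear in the configured order.
        List<String> known = new ArrayList<>();
        for (String candidate : candidates) {
            if (STATUS_ORDER.contains(candidate)) {
                known.add(candidate);
            }
        }
        // No usable status at all means UNKNOWN.
        if (known.isEmpty()) {
            return "UNKNOWN";
        }
        // Sort by position in the configured order and take the first.
        known.sort(Comparator.comparingInt(STATUS_ORDER::indexOf));
        return known.get(0);
    }

    public static void main(String[] args) {
        // Hypothetical results: two indicators are fine, one (say the database) is not.
        System.out.println(aggregate(Arrays.asList("UP", "DOWN", "UP"))); // prints DOWN
    }
}
```

A single DOWN among any number of UP results is enough to make the aggregate DOWN, which is exactly how one failing dependency can take the whole instance out of rotation.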
public class EurekaHealthCheckHandler implements HealthCheckHandler, ApplicationContextAware, InitializingBean {

    private static final Map<Status, InstanceStatus> STATUS_MAPPING = new HashMap<Status, InstanceStatus>() {{
        put(Status.UNKNOWN, InstanceStatus.UNKNOWN);
        put(Status.OUT_OF_SERVICE, InstanceStatus.OUT_OF_SERVICE);
        put(Status.DOWN, InstanceStatus.DOWN);
        put(Status.UP, InstanceStatus.UP);
    }};

    protected InstanceStatus mapToInstanceStatus(Status status) {
        if (!STATUS_MAPPING.containsKey(status)) {
            return InstanceStatus.UNKNOWN;
        }
        return STATUS_MAPPING.get(status);
    }
}
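Finally, the generic Status is mapped to Eureka's instance status, InstanceStatus, and the instance updates its own status.
Summary
Eureka-client calls the health method of every HealthIndicator to obtain the application's health status. If any HealthIndicator reports DOWN, Eureka-client concludes that the current service is faulty and unavailable, sets its own status to DOWN, and reports that to Eureka-server. Eureka-server then marks the node as DOWN, so other services can no longer obtain this instance from Eureka-server.
The cause of this incident was that DataSourceHealthIndicator reported DOWN, which in turn changed the Eureka-client status to DOWN.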
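The whole chain can be condensed into a few lines of plain Java. This is a sketch under stated assumptions: the enums and the indicator names "diskSpace" and "db" are hypothetical stand-ins, and none of the types are the real Spring Boot or Eureka classes.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

// Illustrative sketch only; plain Java stand-ins, not the Spring Boot or Eureka classes.
final class HealthToEurekaSketch {

    // Declared in precedence order, so a lower ordinal wins during aggregation.
    enum Health { DOWN, OUT_OF_SERVICE, UP, UNKNOWN }

    enum InstanceStatus { UP, DOWN, OUT_OF_SERVICE, UNKNOWN }

    // Runs every indicator and keeps the highest-priority result.
    static Health aggregate(Map<String, Supplier<Health>> indicators) {
        Health worst = Health.UNKNOWN;
        for (Supplier<Health> indicator : indicators.values()) {
            Health result = indicator.get();
            if (result.ordinal() < worst.ordinal()) {
                worst = result;
            }
        }
        return worst;
    }

    // Maps the aggregated health to the status reported to the registry.
    static InstanceStatus toInstanceStatus(Health health) {
        switch (health) {
            case UP: return InstanceStatus.UP;
            case DOWN: return InstanceStatus.DOWN;
            case OUT_OF_SERVICE: return InstanceStatus.OUT_OF_SERVICE;
            default: return InstanceStatus.UNKNOWN;
        }
    }

    // Hypothetical indicator set: "db" mirrors the failing database check from the incident.
    static InstanceStatus statusFor(boolean databaseReachable) {
        Map<String, Supplier<Health>> indicators = new LinkedHashMap<>();
        indicators.put("diskSpace", () -> Health.UP);
        indicators.put("db", () -> databaseReachable ? Health.UP : Health.DOWN);
        return toInstanceStatus(aggregate(indicators));
    }

    public static void main(String[] args) {
        System.out.println(statusFor(true));  // prints UP
        System.out.println(statusFor(false)); // prints DOWN
    }
}
```

With the database unreachable, the instance status becomes DOWN even though every other indicator is healthy and the registry itself is reachable.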
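Extensions
- If the project has a critical feature and you want the current instance taken offline as soon as that feature fails, add a custom HealthIndicator class and check in its health method whether the feature is working.
- An endpoint combined with a HealthIndicator can be used to bring the service online or take it offline manually: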
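import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/healthIndicator")
public class MyHealthIndicator implements HealthIndicator {

    // volatile: written by a request thread, read by the health check thread.
    // Starts as true so the instance is not reported DOWN before the endpoint is first called.
    private volatile boolean up = true;

    // Switches the reported health: true for UP, false for DOWN.
    @GetMapping("setUpVal/{up}")
    public void setUpVal(@PathVariable("up") boolean up) {
        this.up = up;
    }

    @Override
    public Health health() {
        if (up) {
            return Health.up().build();
        }
        return Health.down().build();
    }

    public MyHealthIndicator setUp(boolean up) {
        this.up = up;
        return this;
    }
}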
With this in place, calling the endpoint /healthIndicator/setUpVal/false manually takes the current service instance offline.

