Troubleshooting a Eureka Instance Going Offline Because of a Database Failure

2020-08-28 04:38

Source: SegmentFault 思否 community

Author: 无名




Based on:


spring-cloud-Greenwich.RELEASE
spring-boot-2.1.3.RELEASE
spring-boot-starter-actuator-2.1.3.RELEASE
spring-cloud-netflix-eureka-client-2.1.0.RELEASE





Background


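In production, requests to the project's API came back from spring-cloud-gateway with a 404. Investigation showed that the gateway could not obtain valid registration information for the project from eureka-server. At the same time, a network problem was preventing the project from connecting to its database. That network problem affected the project's connection to the database, but not its connection to eureka-server.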


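The logs showed that the project kept running health checks against the database and kept logging exceptions because it could not connect. They also contained Eureka's offline notification, Saw local status change event DOWN. Both messages were printed by the same thread, named DiscoveryClient-InstanceInfoReplicator-0. Since they come from the same thread, the two must be related.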


So why did eureka-server have no registration information for the project? To answer that, we have to start with the Eureka client's health check.





Health checks


As usual, the way to understand how something works is to read the source code.


Almost all Eureka client initialization happens in the DiscoveryClient class, including starting the scheduled health check task.


public class DiscoveryClient implements EurekaClient {
    private void initScheduledTasks() {
        ……
        if (clientConfig.shouldRegisterWithEureka()) {
            ……
            // InstanceInfo replicator
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize
            ……
            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }
}


InstanceInfoReplicator runs the health check for the current node and refreshes the status of the current Eureka client instance.


class InstanceInfoReplicator implements Runnable {
    public void run() {
        try {
            discoveryClient.refreshInstanceInfo();
            ……
        } catch (Throwable t) {
            logger.warn("There was a problem with the instance info replicator", t);
        } finally {
            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }
}

public class DiscoveryClient implements EurekaClient {
    void refreshInstanceInfo() {
        ……
        InstanceStatus status;
        try {
            status = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
        } catch (Exception e) {
            logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e);
            status = InstanceStatus.DOWN;
        }
        if (null != status) {
            applicationInfoManager.setInstanceStatus(status);
        }
    }
}
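The pattern in the two snippets above (a task that refreshes the status, treats any exception from the health check as DOWN, and re-schedules itself in finally) can be sketched in plain Java. This is a simplified model under stated assumptions, not the Eureka implementation: ReplicatorSketch, refreshStatus, and the string statuses are hypothetical stand-ins.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical, simplified model of a self-rescheduling status refresher.
final class ReplicatorSketch implements Runnable {
    private final ScheduledExecutorService scheduler;
    private final Supplier<String> healthCheck; // stand-in for a health check handler
    private final long intervalMillis;
    private volatile String status = "UP";

    ReplicatorSketch(ScheduledExecutorService scheduler, Supplier<String> healthCheck, long intervalMillis) {
        this.scheduler = scheduler;
        this.healthCheck = healthCheck;
        this.intervalMillis = intervalMillis;
    }

    // Any exception thrown by the health check is treated as DOWN.
    static String refreshStatus(Supplier<String> healthCheck) {
        try {
            return healthCheck.get();
        } catch (RuntimeException e) {
            return "DOWN";
        }
    }

    String status() {
        return status;
    }

    @Override
    public void run() {
        try {
            String next = refreshStatus(healthCheck);
            if (next != null) {
                status = next;
            }
        } finally {
            // Re-schedule regardless of the outcome, so one failed run never stops the loop.
            if (!scheduler.isShutdown()) {
                scheduler.schedule(this, intervalMillis, TimeUnit.MILLISECONDS);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        ReplicatorSketch replicator = new ReplicatorSketch(
                scheduler,
                () -> { throw new IllegalStateException("database unreachable"); },
                50);
        scheduler.schedule(replicator, 0, TimeUnit.MILLISECONDS);
        Thread.sleep(200);
        scheduler.shutdownNow();
        System.out.println(replicator.status());
    }
}
```

Because the status refresh and the re-scheduling run on the same scheduler thread, a health check exception and the resulting status change naturally show up under one thread name in the logs, which matches what was observed in the incident.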


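In refreshInstanceInfo, the status is obtained through HealthCheckHandler, the node status is updated, and a status change event is published. If the status is DOWN, the event listener prints the log message we saw at the beginning, and the instance status reported to Eureka server is also DOWN. That is what ultimately caused the problem: the gateway could not find any instance with status UP in Eureka server.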


public class ApplicationInfoManager {
    public synchronized void setInstanceStatus(InstanceStatus status) {
        InstanceStatus next = instanceStatusMapper.map(status);
        if (next == null) {
            return;
        }
        InstanceStatus prev = instanceInfo.setStatus(next);
        if (prev != null) {
            for (StatusChangeListener listener : listeners.values()) {
                try {
                    listener.notify(new StatusChangeEvent(prev, next));
                } catch (Exception e) {
                    logger.warn("failed to notify listener: {}", listener.getId(), e);
                }
            }
        }
    }
}

public class DiscoveryClient implements EurekaClient {
    private void initScheduledTasks() {
        ……
        if (clientConfig.shouldRegisterWithEureka()) {
            ……
            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }

                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus()
                            || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                        // log at warn level if DOWN was involved
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }
                    instanceInfoReplicator.onDemandUpdate();
                }
            };
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }
}
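The prev != null check in setInstanceStatus suggests that listeners are notified only when the status actually changes. A minimal sketch of that behavior, assuming a setter that reports whether a change happened (StatusManagerSketch, the string statuses, and the initial "STARTING" value are hypothetical and not part of Eureka's API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical model: listeners are notified only when the status changes.
final class StatusManagerSketch {
    private String status = "STARTING";
    private final List<BiConsumer<String, String>> listeners = new ArrayList<>();

    void addListener(BiConsumer<String, String> listener) {
        listeners.add(listener);
    }

    // Returns true if the status changed and the listeners were notified.
    synchronized boolean setStatus(String next) {
        if (next == null || next.equals(status)) {
            return false; // nothing changed, so no event is published
        }
        String prev = status;
        status = next;
        for (BiConsumer<String, String> listener : listeners) {
            try {
                listener.accept(prev, next);
            } catch (RuntimeException e) {
                // A failing listener must not prevent the others from being notified.
                System.err.println("failed to notify listener: " + e);
            }
        }
        return true;
    }

    public static void main(String[] args) {
        StatusManagerSketch manager = new StatusManagerSketch();
        manager.addListener((prev, next) ->
                System.out.println("Saw local status change event " + prev + " -> " + next));
        manager.setStatus("UP");
        manager.setStatus("DOWN");
        manager.setStatus("DOWN"); // repeated DOWN: no second event
    }
}
```

In this model a database that stays unreachable produces one DOWN event rather than one per health check run, while the health check exceptions themselves keep being logged on every run.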


The key question is how getHealthCheckHandler().getStatus(instanceInfo.getStatus()) in DiscoveryClient obtains its value.


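In this project getHealthCheckHandler returns an EurekaHealthCheckHandler (the handler Spring Cloud registers when eureka.client.healthcheck.enabled=true), so we follow the source into the EurekaHealthCheckHandler class.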


public class EurekaHealthCheckHandler implements HealthCheckHandler, ApplicationContextAware, InitializingBean {
    private final CompositeHealthIndicator healthIndicator;

    @Override
    public void afterPropertiesSet() throws Exception {
        final Map<String, HealthIndicator> healthIndicators = applicationContext.getBeansOfType(HealthIndicator.class);
        for (Map.Entry<String, HealthIndicator> entry : healthIndicators.entrySet()) {
            //ignore EurekaHealthIndicator and flatten the rest of the composite
            //otherwise there is a never ending cycle of down. See gh-643
            if (entry.getValue() instanceof DiscoveryCompositeHealthIndicator) {
                DiscoveryCompositeHealthIndicator indicator = (DiscoveryCompositeHealthIndicator) entry.getValue();
                for (DiscoveryCompositeHealthIndicator.Holder holder : indicator.getHealthIndicators()) {
                    if (!(holder.getDelegate() instanceof EurekaHealthIndicator)) {
                        healthIndicator.addHealthIndicator(holder.getDelegate().getName(), holder);
                    }
                }
            } else {
                healthIndicator.addHealthIndicator(entry.getKey(), entry.getValue());
            }
        }
    }
}


Inside afterPropertiesSet, applicationContext.getBeansOfType collects every HealthIndicator bean.


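Note: applicationContext.getBeansOfType walks the BeanDefinitions to collect all bean names and then iterates over those names. If a matching bean has not been instantiated yet, it creates the instance. Calling applicationContext.getBeansOfType therefore ensures that every bean of the requested type has been created.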


public class EurekaHealthCheckHandler implements HealthCheckHandler, ApplicationContextAware, InitializingBean {
    public InstanceStatus getStatus(InstanceStatus instanceStatus) {
        return getHealthStatus();
    }

    protected InstanceStatus getHealthStatus() {
        final Status status = getHealthIndicator().health().getStatus();
        return mapToInstanceStatus(status);
    }

    protected CompositeHealthIndicator getHealthIndicator() {
        return healthIndicator;
    }
}


getStatus calls the health method of CompositeHealthIndicator to obtain the status. As the afterPropertiesSet method above shows, CompositeHealthIndicator is a collection of HealthIndicators.


public class CompositeHealthIndicator implements HealthIndicator {
    public void addHealthIndicator(String name, HealthIndicator indicator) {
        this.registry.register(name, indicator);
    }

    @Override
    public Health health() {
        Map<String, Health> healths = new LinkedHashMap<>();
        for (Map.Entry<String, HealthIndicator> entry : this.registry.getAll().entrySet()) {
            healths.put(entry.getKey(), entry.getValue().health());
        }
        return this.aggregator.aggregate(healths);
    }
}

public class OrderedHealthAggregator extends AbstractHealthAggregator {
    public OrderedHealthAggregator() {
        setStatusOrder(Status.DOWN, Status.OUT_OF_SERVICE, Status.UP, Status.UNKNOWN);
    }

    public void setStatusOrder(Status... statusOrder) {
        String[] order = new String[statusOrder.length];
        for (int i = 0; i < statusOrder.length; i++) {
            order[i] = statusOrder[i].getCode();
        }
        setStatusOrder(Arrays.asList(order));
    }

    @Override
    public final Health aggregate(Map<String, Health> healths) {
        List<Status> statusCandidates = healths.values().stream()
                .map(Health::getStatus)
                .collect(Collectors.toList());
        Status status = aggregateStatus(statusCandidates);
        Map<String, Object> details = aggregateDetails(healths);
        return new Health.Builder(status, details).build();
    }

    protected Status aggregateStatus(List<Status> candidates) {
        // Only sort those status instances that we know about
        List<Status> filteredCandidates = new ArrayList<>();
        for (Status candidate : candidates) {
            if (this.statusOrder.contains(candidate.getCode())) {
                filteredCandidates.add(candidate);
            }
        }
        // If no status is given return UNKNOWN
        if (filteredCandidates.isEmpty()) {
            return Status.UNKNOWN;
        }
        // Sort given Status instances by configured order
        filteredCandidates.sort(new StatusComparator(this.statusOrder));
        return filteredCandidates.get(0);
    }

    private class StatusComparator implements Comparator<Status> {
        private final List<String> statusOrder;

        StatusComparator(List<String> statusOrder) {
            this.statusOrder = statusOrder;
        }

        @Override
        public int compare(Status s1, Status s2) {
            int i1 = this.statusOrder.indexOf(s1.getCode());
            int i2 = this.statusOrder.indexOf(s2.getCode());
            return (i1 < i2) ? -1 : (i1 != i2) ? 1 : s1.getCode().compareTo(s2.getCode());
        }
    }
}


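CompositeHealthIndicator.health iterates over all HealthIndicators and calls each one's health method to get its status. The statuses are then sorted in the order DOWN -> OUT_OF_SERVICE -> UP -> UNKNOWN and the first one is taken, so if any indicator reports DOWN, the result is DOWN.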
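The ordering rule can be illustrated with a small standalone sketch. It mirrors the aggregateStatus logic shown above but uses plain strings instead of Spring Boot's Status type. StatusAggregationSketch and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical standalone model of "most severe status wins" aggregation.
final class StatusAggregationSketch {
    // Lower index means higher priority, as in the order described above.
    static final List<String> ORDER = Arrays.asList("DOWN", "OUT_OF_SERVICE", "UP", "UNKNOWN");

    static String aggregate(List<String> candidates) {
        // Keep only the statuses that appear in the configured order.
        List<String> known = new ArrayList<>();
        for (String candidate : candidates) {
            if (ORDER.contains(candidate)) {
                known.add(candidate);
            }
        }
        // No known status at all: fall back to UNKNOWN.
        if (known.isEmpty()) {
            return "UNKNOWN";
        }
        // Sort by priority and take the first element.
        known.sort((a, b) -> Integer.compare(ORDER.indexOf(a), ORDER.indexOf(b)));
        return known.get(0);
    }

    public static void main(String[] args) {
        // One DOWN among several UP results is enough to make the aggregate DOWN.
        System.out.println(aggregate(Arrays.asList("UP", "DOWN", "UP")));
    }
}
```

Note that a status code missing from the order list is filtered out before sorting, so in this model a custom status that has not been added to the order has no influence on the aggregate.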


public class EurekaHealthCheckHandler implements HealthCheckHandler, ApplicationContextAware, InitializingBean {
    private static final Map<Status, InstanceStatus> STATUS_MAPPING =
            new HashMap<Status, InstanceStatus>() {{
                put(Status.UNKNOWN, InstanceStatus.UNKNOWN);
                put(Status.OUT_OF_SERVICE, InstanceStatus.OUT_OF_SERVICE);
                put(Status.DOWN, InstanceStatus.DOWN);
                put(Status.UP, InstanceStatus.UP);
            }};

    protected InstanceStatus mapToInstanceStatus(Status status) {
        if (!STATUS_MAPPING.containsKey(status)) {
            return InstanceStatus.UNKNOWN;
        }
        return STATUS_MAPPING.get(status);
    }
}


Finally, the generic Status is mapped to Eureka's InstanceStatus, and the instance updates its own status.





Summary


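Eureka client calls the health method of every HealthIndicator on the current node to obtain the health status of each component. If any HealthIndicator reports DOWN, Eureka client concludes that the service has a problem and is unavailable, sets its own status to DOWN, and reports that to Eureka server. Eureka server then marks the instance as DOWN, so other services can no longer obtain that instance from Eureka server.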


The cause of this incident was that DataSourceHealthIndicator reported DOWN, which changed the Eureka client's status to DOWN as well.
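The incident can be replayed in a small model: several named checks, one of which fails the way the database check did. This is a sketch under simplifying assumptions (only UP and DOWN are considered, and a check that throws is counted as DOWN). IncidentSketch, the indicator names, and the helper methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical model of the incident: one failing check takes the instance DOWN.
final class IncidentSketch {
    // Assumption: a check that throws is reported as DOWN.
    static String check(Supplier<String> indicator) {
        try {
            return indicator.get();
        } catch (RuntimeException e) {
            return "DOWN";
        }
    }

    // Names of all indicators that currently report DOWN.
    static List<String> downIndicators(Map<String, Supplier<String>> indicators) {
        List<String> down = new ArrayList<>();
        for (Map.Entry<String, Supplier<String>> entry : indicators.entrySet()) {
            if ("DOWN".equals(check(entry.getValue()))) {
                down.add(entry.getKey());
            }
        }
        return down;
    }

    // The instance is UP only if no indicator reports DOWN.
    static String instanceStatus(Map<String, Supplier<String>> indicators) {
        return downIndicators(indicators).isEmpty() ? "UP" : "DOWN";
    }

    public static void main(String[] args) {
        Map<String, Supplier<String>> indicators = new LinkedHashMap<>();
        indicators.put("db", () -> { throw new IllegalStateException("cannot connect to database"); });
        indicators.put("diskSpace", () -> "UP");
        System.out.println("instance status: " + instanceStatus(indicators));
        System.out.println("failing indicators: " + downIndicators(indicators));
    }
}
```

Listing the failing indicator names alongside the aggregate makes it easier to see which dependency pulled the instance out of the registry, which in this incident was the database check.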





Extensions


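  1. If the project has a critical feature and you want the instance taken offline whenever that feature fails, add a custom HealthIndicator class and check in its health method whether the feature is working.
  2. An HTTP endpoint combined with a HealthIndicator can be used to bring the service online or offline manually: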


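import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/healthIndicator")
public class MyHealthIndicator implements HealthIndicator {

    // Start as UP: with the default value (false) this indicator would report
    // DOWN until the endpoint is called. volatile because the flag is written
    // by request threads and read by the health check thread.
    private volatile boolean up = true;

    @GetMapping("setUpVal/{up}")
    public void setUpVal(@PathVariable("up") boolean up) {
        this.up = up;
    }

    @Override
    public Health health() {
        if (up) {
            return Health.up().build();
        }
        return Health.down().build();
    }

    public MyHealthIndicator setUp(boolean up) {
        this.up = up;
        return this;
    }
}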


With the code above, calling the endpoint /healthIndicator/setUpVal/false manually takes the current service instance offline.




