数据库异常导致eureka预设问题排查
共 10214字,需浏览 21分钟
·
2020-08-28 04:38
来源:SegmentFault 思否社区
作者:无名
基于
spring-cloud-Greenwich.RELEASE
spring-boot-2.1.3.RELEASE
spring-boot-starter-actuator-2.1.3.RELEASE
Spring-cloud-netflix-eureka-client-2.1.0.RELEASE
背景
线上请求项目接口,spring-cloud-gateway返回404,排查发现是gateway无法从eureka-server获取到项目有效的注册信息。同时当时由于网络问题,项目无法连上数据库。但是这次出现的网络问题,可能影响到项目与数据库的连接,并不影响项目与eureka-server的连接。
通过日志,看到项目一直在对数据库做健康检测,并且因为无法连上而一直有异常日志,同时看到了Eureka下线通知的日志Saw local status change event DOWN,而这两个日志都是在同一个线程里打印的,线程串联DiscoveryClient-InstanceInfoReplicator-0,既然是同一个线程,那说明是两者之间必然有关联。
那是什么原因导致eureka-server没有项目的注册信息?这个要从Eureka-Client的健康检测说起。
健康监测
按照常规,要了解原理,就从阅读二进制入手。
Eureke-client的初始化基本上都是在DiscoveryClient类内完成的,包括启动健康监测定时任务。
public class DiscoveryClient implements EurekaClient {
private void initScheduledTasks() {
……
if (clientConfig.shouldRegisterWithEureka()) {
……
// InstanceInfo replicator
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
……
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
}
在InstanceInfoReplicator内接通本节能检查系统健康并刷新当前Eureka-client节点状态。
class InstanceInfoReplicator implements Runnable {
public void run() {
try {
discoveryClient.refreshInstanceInfo();
……
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
}
public class DiscoveryClient implements EurekaClient {
void refreshInstanceInfo() {
……
InstanceStatus status;
try {
status = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
} catch (Exception e) {
logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e);
status = InstanceStatus.DOWN;
}
if (null != status) {
applicationInfoManager.setInstanceStatus(status);
}
}
}
这里通过HealthCheckHandler获取instanceInfo的status并修改节点状态和下发事件通知,如果获取到的status是DOWN,那这时候事件监听器就打印了我们在开头看到的日志,并且上报给Eureka-server的中断状态也是DOWN,最终导致此问题的出现:网关无法从Eureka-server获取到状态为UP的路由器。
public class ApplicationInfoManager {
public synchronized void setInstanceStatus(InstanceStatus status) {
InstanceStatus next = instanceStatusMapper.map(status);
if (next == null) {
return;
}
InstanceStatus prev = instanceInfo.setStatus(next);
if (prev != null) {
for (StatusChangeListener listener : listeners.values()) {
try {
listener.notify(new StatusChangeEvent(prev, next));
} catch (Exception e) {
logger.warn("failed to notify listener: {}", listener.getId(), e);
}
}
}
}
}
public class DiscoveryClient implements EurekaClient {
private void initScheduledTasks() {
……
if (clientConfig.shouldRegisterWithEureka()) {
……
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
}
这里的重点就是DiscoveryClient的
getHealthCheckHandler().getStatus(instanceInfo.getStatus())是怎么获取到值的?
getHealthCheckHandler返回的是EurekaHealthCheckHandler,继续跟进原始码进入到EurekaHealthCheckHandler类。
public class EurekaHealthCheckHandler implements HealthCheckHandler, ApplicationContextAware, InitializingBean {
private final CompositeHealthIndicator healthIndicator;
@Override
public void afterPropertiesSet() throws Exception {
final Map
healthIndicators = applicationContext.getBeansOfType(HealthIndicator.class); for (Map.Entry
entry : healthIndicators.entrySet()) { //ignore EurekaHealthIndicator and flatten the rest of the composite
//otherwise there is a never ending cycle of down. See gh-643
if (entry.getValue() instanceof DiscoveryCompositeHealthIndicator) {
DiscoveryCompositeHealthIndicator indicator = (DiscoveryCompositeHealthIndicator) entry.getValue();
for (DiscoveryCompositeHealthIndicator.Holder holder : indicator.getHealthIndicators()) {
if (!(holder.getDelegate() instanceof EurekaHealthIndicator)) {
healthIndicator.addHealthIndicator(holder.getDelegate().getName(), holder);
}
}
}
else {
healthIndicator.addHealthIndicator(entry.getKey(), entry.getValue());
}
}
}
}
在afterPropertiesSet方法内部通过applicationContext.getBeansOfType获取到所有的健康检测类HealthIndicator。
注:applicationContext.getBeansOfType方法是通过遍历BeanDefinition获取所有beanName,然后遍历beanName,确定如果当前beanName未创建实例替换创建了对应的Bean对象实例。则会applicationContext.getBeansOfType确保将指定类型的所有的Bean对象都创造好。
public class EurekaHealthCheckHandler implements HealthCheckHandler, ApplicationContextAware, InitializingBean {
public InstanceStatus getStatus(InstanceStatus instanceStatus) {
return getHealthStatus();
}
protected InstanceStatus getHealthStatus() {
final Status status = getHealthIndicator().health().getStatus();
return mapToInstanceStatus(status);
}
protected CompositeHealthIndicator getHealthIndicator() {
return healthIndicator;
}
}
调用CompositeHealthIndicator的health方法获取状态,从前面的afterPropertiesSet方法可以看到,CompositeHealthIndicator是一个HealthIndicator合集。
public class CompositeHealthIndicator implements HealthIndicator {
public void addHealthIndicator(String name, HealthIndicator indicator) {
this.registry.register(name, indicator);
}
@Override
public Health health() {
Map
healths = new LinkedHashMap<>(); for (Map.Entry
entry : this.registry.getAll() .entrySet()) {
healths.put(entry.getKey(), entry.getValue().health());
}
return this.aggregator.aggregate(healths);
}
}
public class OrderedHealthAggregator extends AbstractHealthAggregator {
public OrderedHealthAggregator() {
setStatusOrder(Status.DOWN, Status.OUT_OF_SERVICE, Status.UP, Status.UNKNOWN);
}
public void setStatusOrder(Status... statusOrder) {
String[] order = new String[statusOrder.length];
for (int i = 0; i < statusOrder.length; i++) {
order[i] = statusOrder[i].getCode();
}
setStatusOrder(Arrays.asList(order));
}
@Override
public final Health aggregate(Map
healths) { List
statusCandidates = healths.values().stream().map(Health::getStatus) .collect(Collectors.toList());
Status status = aggregateStatus(statusCandidates);
Map
details = aggregateDetails(healths); return new Health.Builder(status, details).build();
}
protected Status aggregateStatus(List
candidates) { // Only sort those status instances that we know about
List
filteredCandidates = new ArrayList<>(); for (Status candidate : candidates) {
if (this.statusOrder.contains(candidate.getCode())) {
filteredCandidates.add(candidate);
}
}
// If no status is given return UNKNOWN
if (filteredCandidates.isEmpty()) {
return Status.UNKNOWN;
}
// Sort given Status instances by configured order
filteredCandidates.sort(new StatusComparator(this.statusOrder));
return filteredCandidates.get(0);
}
private class StatusComparator implements Comparator
{ private final List
statusOrder; StatusComparator(List
statusOrder) { this.statusOrder = statusOrder;
}
@Override
public int compare(Status s1, Status s2) {
int i1 = this.statusOrder.indexOf(s1.getCode());
int i2 = this.statusOrder.indexOf(s2.getCode());
return (i1 < i2) ? -1 : (i1 != i2) ? 1 : s1.getCode().compareTo(s2.getCode());
}
}
}
CompositeHealthIndicator的health是遍历所有HealthIndicator,调用HealthIndicator的健康监测health方法获取status。再将status根据DOWN->OUT_OF_SERVICE->UP->UNKNOWN的顺序排序并获取第一个状态(如果有例程状态为DOWN,那获取的结果就是DOWN)。
public class EurekaHealthCheckHandler implements HealthCheckHandler, ApplicationContextAware, InitializingBean {
private static final Map
STATUS_MAPPING = new HashMap
() {{ put(Status.UNKNOWN, InstanceStatus.UNKNOWN);
put(Status.OUT_OF_SERVICE, InstanceStatus.OUT_OF_SERVICE);
put(Status.DOWN, InstanceStatus.DOWN);
put(Status.UP, InstanceStatus.UP);
}};
protected InstanceStatus mapToInstanceStatus(Status status) {
if (!STATUS_MAPPING.containsKey(status)) {
return InstanceStatus.UNKNOWN;
}
return STATUS_MAPPING.get(status);
}
}
最后将通用状态STATUS映射成Eureka的例程实例状态InstanceStatus,并修改自身的状态。
总结
Eureka-client通过接通本节能所有的HealthIndicator的health方法对应电子杂志的健康检查状态,有如果HealthIndicator检测查询查询结果为DOWN,那Eureka-client就会判定当前服务有问题,是不可用的,就会将自身状态设置为DOWN,并上报给Eureka-server。Eureka-server收到信息之后将该节点状态标识为DOWN,这样其他服务就无法从Eureka-server获取到该计数器。
本次事故的原因就是因为DataSourceHealthIndicator检查的结果是DOWN,导致Eureka-client的状态也有所改变DOWN。
扩展
如果项目有某个重要的功能,一旦这个功能出问题就希望能将当前例程下线,那就可以添加自定义HealthIndicator类,并在health方法检查改功能是否正常。 可以通过接口+HealthIndicator实现控制服务上下线:
@RestController
@RequestMapping("/healthIndicator")
public class MyHealthIndicator implements HealthIndicator {
private boolean up;
@GetMapping("setUpVal/{up}")
public void setUpVal(@PathVariable("up") boolean up) {
this.up = up;
}
@Override
public Health health() {
if (up) {
return Health.up().build();
}
return Health.down().build();
}
public MyHealthIndicator setUp(boolean up) {
this.up = up;
return this;
}
}
以上可以通过调用接口/healthIndicator/setUpVal/false来手动下线当前服务中断。
点击左下角阅读原文,到 SegmentFault 思否社区 和文章作者展开更多互动和交流。
- END -