Implementing a KafkaSink for Spark Metrics

2021-03-31 19:59

Background

Monitoring is an essential part of Spark. Information about a running application is surfaced through the ListenerBus and the MetricsSystem. Through the metrics system, the metrics Spark collects can be sent to a variety of sinks, for example over HTTP, to JMX, or to CSV files.
The sinks currently supported include (a configuration example follows the list):
  • ConsoleSink
  • CSVSink
  • JmxSink
  • MetricsServlet
  • GraphiteSink
  • GangliaSink
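For example, a built-in sink such as ConsoleSink is enabled through the metrics configuration file. The snippet below follows Spark's stock conf/metrics.properties.template ("*" applies the sink to every instance: master, worker, driver, and executor); the period values are illustrative:

# Enable ConsoleSink for all instances, reporting every 10 seconds
*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
*.sink.console.period=10
*.sink.console.unit=seconds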
Sometimes we want to consume metrics in real time, for example to analyze and display them with Spark itself. A KafkaSink that streams the metric data to Kafka as it is collected makes that straightforward, hence this post.

Implementation

Every sink must extend the Sink trait:
private[spark] trait Sink {
  def start(): Unit
  def stop(): Unit
  def report(): Unit
}

When a sink is registered with the metrics system, its start method is called to perform initialization, report performs the actual output, and stop is the place for cleanup such as closing connections. Straight to the code:
package org.apache.spark.metrics.sink

import java.util.concurrent.TimeUnit
import java.util.{Locale, Properties}

import com.codahale.metrics.MetricRegistry
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.spark.SecurityManager
import org.apache.spark.internal.Logging

private[spark] class KafkaSink(
    val property: Properties,
    val registry: MetricRegistry,
    securityMgr: SecurityManager) extends Sink with Logging {

  // Property keys and defaults; "period"/"unit" control the reporting interval.
  val KAFKA_KEY_PERIOD = "period"
  val KAFKA_DEFAULT_PERIOD = 10

  val KAFKA_KEY_UNIT = "unit"
  val KAFKA_DEFAULT_UNIT = "SECONDS"

  val KAFKA_TOPIC = "topic"
  val KAFKA_DEFAULT_TOPIC = "kafka-sink-topic"

  val KAFKA_BROKERS = "kafka-brokers"

  val TOPIC = Option(property.getProperty(KAFKA_TOPIC)).getOrElse(KAFKA_DEFAULT_TOPIC)
  // The broker list has no usable default, so fail fast if it is missing.
  val BROKERS = Option(property.getProperty(KAFKA_BROKERS))
    .getOrElse(throw new IllegalStateException("kafka-brokers is null!"))

  private val kafkaProducerConfig = new Properties()
  kafkaProducerConfig.put("bootstrap.servers", BROKERS)
  kafkaProducerConfig.put("key.serializer",
    "org.apache.kafka.common.serialization.StringSerializer")
  kafkaProducerConfig.put("value.serializer",
    "org.apache.kafka.common.serialization.StringSerializer")

  private val producer = new KafkaProducer[String, String](kafkaProducerConfig)

  private val reporter: KafkaReporter = KafkaReporter.forRegistry(registry)
    .topic(TOPIC)
    .build(producer)

  val pollPeriod = Option(property.getProperty(KAFKA_KEY_PERIOD)) match {
    case Some(s) => s.toInt
    case None => KAFKA_DEFAULT_PERIOD
  }

  val pollUnit: TimeUnit = Option(property.getProperty(KAFKA_KEY_UNIT)) match {
    case Some(s) => TimeUnit.valueOf(s.toUpperCase(Locale.ROOT))
    case None => TimeUnit.valueOf(KAFKA_DEFAULT_UNIT)
  }

  override def start(): Unit = {
    log.info("I4 Metrics System KafkaSink Start ......")
    reporter.start(pollPeriod, pollUnit)
  }

  override def stop(): Unit = {
    log.info("I4 Metrics System KafkaSink Stop ......")
    reporter.stop()
    producer.close()
  }

  override def report(): Unit = {
    log.info("I4 Metrics System KafkaSink Report ......")
    reporter.report()
  }
}
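Since KafkaSink is private[spark], any ad-hoc test has to live under the org.apache.spark namespace. A minimal smoke test might look like the following sketch, assuming a Kafka broker reachable at localhost:9092; the topic name and gauge are made up for illustration:

package org.apache.spark.metrics.sink

import java.util.Properties

import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.spark.{SecurityManager, SparkConf}

object KafkaSinkSmokeTest {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("kafka-brokers", "localhost:9092") // assumption: local broker
    props.setProperty("topic", "spark-metrics")          // hypothetical topic
    props.setProperty("period", "5")

    // Register a trivial gauge so there is something to report.
    val registry = new MetricRegistry()
    registry.register("test.gauge", new Gauge[Int] {
      override def getValue: Int = 42
    })

    val sink = new KafkaSink(props, registry, new SecurityManager(new SparkConf()))
    sink.start()        // schedules reporter.report() every 5 seconds
    Thread.sleep(12000) // let a couple of reports go out
    sink.stop()         // stops the reporter and closes the producer
  }
}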
The KafkaReporter class:
package org.apache.spark.metrics.sink;

import com.alibaba.fastjson.JSONObject;
import com.codahale.metrics.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.SortedMap;
import java.util.concurrent.TimeUnit;

public class KafkaReporter extends ScheduledReporter {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaReporter.class);

    public static KafkaReporter.Builder forRegistry(MetricRegistry registry) {
        return new KafkaReporter.Builder(registry);
    }

    private final KafkaProducer producer;
    private final Clock clock;
    private final String topic;

    private KafkaReporter(MetricRegistry registry,
                          TimeUnit rateUnit,
                          TimeUnit durationUnit,
                          MetricFilter filter,
                          Clock clock,
                          String topic,
                          KafkaProducer producer) {
        super(registry, "kafka-reporter", filter, rateUnit, durationUnit);
        this.producer = producer;
        this.topic = topic;
        this.clock = clock;
    }

    @Override
    public void report(SortedMap<String, Gauge> gauges,
                       SortedMap<String, Counter> counters,
                       SortedMap<String, Histogram> histograms,
                       SortedMap<String, Meter> meters,
                       SortedMap<String, Timer> timers) {
        final long timestamp = TimeUnit.MILLISECONDS.toSeconds(clock.getTime());

        // Gauges
        for (Map.Entry<String, Gauge> entry : gauges.entrySet()) {
            reportGauge(timestamp, entry.getKey(), entry.getValue());
        }
        // Histograms (disabled; enable if histogram snapshots are needed)
        // for (Map.Entry<String, Histogram> entry : histograms.entrySet()) {
        //     reportHistogram(timestamp, entry.getKey(), entry.getValue());
        // }
    }

    private void reportGauge(long timestamp, String name, Gauge gauge) {
        report(timestamp, name, gauge.getValue());
    }

    private void reportHistogram(long timestamp, String name, Histogram histogram) {
        final Snapshot snapshot = histogram.getSnapshot();
        report(timestamp, name, snapshot.getMax());
    }

    // Serialize one metric as JSON and send it, keyed by metric name.
    private void report(long timestamp, String name, Object value) {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("name", name);
        jsonObject.put("timestamp", timestamp);
        jsonObject.put("value", value);
        producer.send(new ProducerRecord(topic, name, jsonObject.toJSONString()));
    }

    public static class Builder {

        private final MetricRegistry registry;
        private TimeUnit rateUnit;
        private TimeUnit durationUnit;
        private MetricFilter filter;
        private Clock clock;
        private String topic;

        private Builder(MetricRegistry registry) {
            this.registry = registry;
            this.rateUnit = TimeUnit.SECONDS;
            this.durationUnit = TimeUnit.MILLISECONDS;
            this.filter = MetricFilter.ALL;
            this.clock = Clock.defaultClock();
        }

        /**
         * Convert rates to the given time unit.
         *
         * @param rateUnit a unit of time
         * @return {@code this}
         */
        public KafkaReporter.Builder convertRatesTo(TimeUnit rateUnit) {
            this.rateUnit = rateUnit;
            return this;
        }

        /**
         * Convert durations to the given time unit.
         *
         * @param durationUnit a unit of time
         * @return {@code this}
         */
        public KafkaReporter.Builder convertDurationsTo(TimeUnit durationUnit) {
            this.durationUnit = durationUnit;
            return this;
        }

        /**
         * Use the given {@link Clock} instance for the time.
         *
         * @param clock a {@link Clock} instance
         * @return {@code this}
         */
        public Builder withClock(Clock clock) {
            this.clock = clock;
            return this;
        }

        /**
         * Only report metrics which match the given filter.
         *
         * @param filter a {@link MetricFilter}
         * @return {@code this}
         */
        public KafkaReporter.Builder filter(MetricFilter filter) {
            this.filter = filter;
            return this;
        }

        /**
         * Write to the given Kafka topic.
         *
         * @param topic the target topic
         * @return {@code this}
         */
        public KafkaReporter.Builder topic(String topic) {
            this.topic = topic;
            return this;
        }

        /**
         * Builds a {@link KafkaReporter} with the given properties, sending
         * metrics through the given {@link KafkaProducer}.
         *
         * @return a {@link KafkaReporter}
         */
        public KafkaReporter build(KafkaProducer producer) {
            return new KafkaReporter(registry, rateUnit, durationUnit, filter, clock, topic, producer);
        }
    }
}
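The builder mirrors the style of Dropwizard's bundled reporters. KafkaSink only sets the topic, but a fuller invocation is possible; the sketch below is illustrative (the topic name is made up, and the unit conversions shown are already the builder's defaults):

val reporter: KafkaReporter = KafkaReporter.forRegistry(registry)
  .topic("spark-metrics")                    // hypothetical topic
  .convertRatesTo(TimeUnit.SECONDS)
  .convertDurationsTo(TimeUnit.MILLISECONDS)
  .filter(MetricFilter.ALL)
  .build(producer)
reporter.start(10, TimeUnit.SECONDS) // ScheduledReporter calls report() every 10s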

The report method is the point where the reporter receives every category of metric and performs the corresponding output; in this implementation only gauges are sent, with the histogram branch left commented out.
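Each Kafka record is keyed by the metric name and carries the JSON built in report as its value. With Spark's default metrics namespace (the application ID), a driver-side gauge would produce something roughly like the following; the name, timestamp, and value here are purely illustrative:

key:   app-20210331195959-0000.driver.BlockManager.memory.memUsed_MB
value: {"name":"app-20210331195959-0000.driver.BlockManager.memory.memUsed_MB","timestamp":1617191999,"value":0}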

Usage

Register the sink, together with its parameters, either in the metrics configuration file or programmatically (here via spark.metrics.conf.* properties):
spark.metrics.conf.*.sink.kafka.class=org.apache.spark.metrics.sink.KafkaSink
spark.metrics.conf.*.sink.kafka.kafka-brokers=XXX:9092
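The sink also reads the optional period, unit, and topic keys, whose defaults in the code are 10, seconds, and kafka-sink-topic, so a fuller configuration looks like:

spark.metrics.conf.*.sink.kafka.class=org.apache.spark.metrics.sink.KafkaSink
spark.metrics.conf.*.sink.kafka.kafka-brokers=XXX:9092
spark.metrics.conf.*.sink.kafka.topic=kafka-sink-topic
spark.metrics.conf.*.sink.kafka.period=10
spark.metrics.conf.*.sink.kafka.unit=seconds

Because the metrics system loads the sink class reflectively, the jar containing KafkaSink and KafkaReporter must be on the classpath of every process that reports metrics (driver and executors). To verify that data is flowing, Kafka's console consumer works well:

kafka-console-consumer.sh --bootstrap-server XXX:9092 --topic kafka-sink-topic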

Original post: https://www.jianshu.com/p/cee005368b61