Spark metrics实现KafkaSink
背景
目前支持的Sink包括:
ConsoleSink、CSVSink、JmxSink、MetricsServlet、GraphiteSink、GangliaSink
实践
/**
 * Lifecycle contract for a metrics sink.
 *
 * All three operations take no arguments and return `Unit`; they exist purely
 * for their side effects, hence the explicit parentheses.
 */
private[spark] trait Sink {

  /** Begins delivering metrics. */
  def start(): Unit

  /** Stops delivering metrics. */
  def stop(): Unit

  /** Triggers a single, immediate delivery of the current metrics. */
  def report(): Unit
}
package org.apache.spark.metrics.sink

import java.util.concurrent.TimeUnit
import java.util.{Locale, Properties}

import com.codahale.metrics.MetricRegistry
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.spark.SecurityManager
import org.apache.spark.internal.Logging

/**
 * A metrics [[Sink]] that hands the registry to a [[KafkaReporter]] backed by a
 * String/String Kafka producer.
 *
 * Recognised keys in `property`:
 *  - `kafka-brokers` (required): value for the producer's `bootstrap.servers`.
 *  - `topic` (optional): destination topic, defaults to `kafka-sink-topic`.
 *  - `period` (optional): polling period as an integer, defaults to `10`.
 *  - `unit` (optional): a `java.util.concurrent.TimeUnit` name (case-insensitive),
 *    defaults to `SECONDS`.
 *
 * @param property    sink configuration
 * @param registry    metric registry whose metrics are reported
 * @param securityMgr not used by this sink; presumably required by the caller's
 *                    constructor convention -- TODO confirm
 * @throws IllegalStateException    if `kafka-brokers` is not configured
 * @throws NumberFormatException    if `period` is not a valid integer
 * @throws IllegalArgumentException if `unit` is not a valid `TimeUnit` name
 */
private[spark] class KafkaSink(
    val property: Properties,
    val registry: MetricRegistry,
    securityMgr: SecurityManager) extends Sink with Logging {

  val KAFKA_KEY_PERIOD = "period"
  val KAFKA_DEFAULT_PERIOD = 10

  val KAFKA_KEY_UNIT = "unit"
  val KAFKA_DEFAULT_UNIT = "SECONDS"

  val KAFKA_TOPIC = "topic"
  val KAFKA_DEFAULT_TOPIC = "kafka-sink-topic"

  // NOTE(review): "KAFAK" is a misspelling of "KAFKA". The names are public
  // members, so they are kept unchanged to avoid breaking existing references.
  val KAFAK_BROKERS = "kafka-brokers"
  // Not applied anywhere: a missing broker list is treated as a fatal error below.
  val KAFAK_DEFAULT_BROKERS = "XXX:9092"

  val TOPIC: String =
    Option(property.getProperty(KAFKA_TOPIC)).getOrElse(KAFKA_DEFAULT_TOPIC)

  val BROKERS: String =
    Option(property.getProperty(KAFAK_BROKERS))
      .getOrElse(throw new IllegalStateException("kafka-brokers is null!"))

  private val kafkaProducerConfig = new Properties()
  kafkaProducerConfig.put("bootstrap.servers", BROKERS)
  kafkaProducerConfig.put(
    "key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  kafkaProducerConfig.put(
    "value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  private val producer = new KafkaProducer[String, String](kafkaProducerConfig)

  private val reporter: KafkaReporter =
    KafkaReporter.forRegistry(registry).topic(TOPIC).build(producer)

  val pollPeriod: Int =
    Option(property.getProperty(KAFKA_KEY_PERIOD))
      .map(_.toInt)
      .getOrElse(KAFKA_DEFAULT_PERIOD)

  val pollUnit: TimeUnit =
    Option(property.getProperty(KAFKA_KEY_UNIT))
      .map(s => TimeUnit.valueOf(s.toUpperCase(Locale.ROOT)))
      .getOrElse(TimeUnit.valueOf(KAFKA_DEFAULT_UNIT))

  /** Starts the reporter with the configured polling period and unit. */
  override def start(): Unit = {
    log.info("I4 Metrics System KafkaSink Start ......")
    reporter.start(pollPeriod, pollUnit)
  }

  /** Stops the reporter and closes the Kafka producer. */
  override def stop(): Unit = {
    log.info("I4 Metrics System KafkaSink Stop ......")
    try {
      reporter.stop()
    } finally {
      // Always release the producer, even when stopping the reporter fails.
      producer.close()
    }
  }

  /** Asks the reporter to publish the current metrics once, immediately. */
  override def report(): Unit = {
    log.info("I4 Metrics System KafkaSink Report ......")
    reporter.report()
  }
}
package org.apache.spark.metrics.sink;

import com.alibaba.fastjson.JSONObject;
import com.codahale.metrics.*;
import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Objects;
import java.util.SortedMap;
import java.util.concurrent.TimeUnit;

/**
 * A {@link ScheduledReporter} that publishes gauge values to a Kafka topic.
 *
 * <p>Each gauge becomes one record whose key is the metric name and whose value is a
 * JSON object with the fields {@code name}, {@code timestamp} and {@code value}.
 * Counters, histograms, meters and timers are currently not published.
 */
public class KafkaReporter extends ScheduledReporter {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaReporter.class);

    /**
     * Returns a new {@link Builder} for a reporter over the given registry.
     *
     * @param registry the registry to report
     * @return a {@link Builder} instance
     */
    public static KafkaReporter.Builder forRegistry(MetricRegistry registry) {
        return new KafkaReporter.Builder(registry);
    }

    private final KafkaProducer<String, String> producer;
    private final Clock clock;
    private final String topic;

    private KafkaReporter(MetricRegistry registry,
                          TimeUnit rateUnit,
                          TimeUnit durationUnit,
                          MetricFilter filter,
                          Clock clock,
                          String topic,
                          KafkaProducer<String, String> producer) {
        super(registry, "kafka-reporter", filter, rateUnit, durationUnit);
        this.producer = producer;
        this.topic = topic;
        this.clock = clock;
    }

    /**
     * Publishes every gauge in {@code gauges}; the other metric kinds are ignored.
     *
     * <p>All records of one invocation share a single timestamp: the clock time
     * converted from milliseconds to seconds.
     */
    @Override
    @SuppressWarnings("rawtypes") // raw Gauge is dictated by the overridden signature
    public void report(SortedMap<String, Gauge> gauges,
                       SortedMap<String, Counter> counters,
                       SortedMap<String, Histogram> histograms,
                       SortedMap<String, Meter> meters,
                       SortedMap<String, Timer> timers) {
        final long timestamp = TimeUnit.MILLISECONDS.toSeconds(clock.getTime());

        // Gauge
        for (Map.Entry<String, Gauge> entry : gauges.entrySet()) {
            reportGauge(timestamp, entry.getKey(), entry.getValue());
        }

        // Histogram
        // for (Map.Entry<String, Histogram> entry : histograms.entrySet()) {
        //     reportHistogram(timestamp, entry.getKey(), entry.getValue());
        // }
    }

    /**
     * Reads the gauge and publishes its value. A gauge whose read throws is skipped
     * with a warning so that the remaining gauges are still reported.
     */
    @SuppressWarnings("rawtypes")
    private void reportGauge(long timestamp, String name, Gauge gauge) {
        final Object value;
        try {
            value = gauge.getValue();
        } catch (RuntimeException e) {
            LOGGER.warn("Skipping gauge '{}': reading its value failed", name, e);
            return;
        }
        report(timestamp, name, value);
    }

    /**
     * Publishes the maximum of the histogram's snapshot.
     * Currently unused: the call site in {@code report} is commented out.
     */
    private void reportHistogram(long timestamp, String name, Histogram histogram) {
        final Snapshot snapshot = histogram.getSnapshot();
        report(timestamp, name, snapshot.getMax());
    }

    /**
     * Serialises one measurement as JSON and sends it to the topic, keyed by metric name.
     * The send is asynchronous; a failure reported by the producer is logged.
     */
    private void report(long timestamp, final String name, Object values) {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("name", name);
        jsonObject.put("timestamp", timestamp);
        jsonObject.put("value", values);

        ProducerRecord<String, String> record =
                new ProducerRecord<String, String>(topic, name, jsonObject.toJSONString());
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    LOGGER.warn("Failed to send metric '{}' to Kafka topic '{}'",
                            name, topic, exception);
                }
            }
        });
    }

    /**
     * Builder for {@link KafkaReporter}. Defaults: rates in seconds, durations in
     * milliseconds, {@link MetricFilter#ALL} and {@link Clock#defaultClock()}.
     * The topic has no default and must be set before {@link #build(KafkaProducer)}.
     */
    public static class Builder {
        private final MetricRegistry registry;
        private TimeUnit rateUnit;
        private TimeUnit durationUnit;
        private MetricFilter filter;
        private Clock clock;
        private String topic;

        private Builder(MetricRegistry registry) {
            this.registry = registry;
            this.rateUnit = TimeUnit.SECONDS;
            this.durationUnit = TimeUnit.MILLISECONDS;
            this.filter = MetricFilter.ALL;
            this.clock = Clock.defaultClock();
        }

        /**
         * Convert rates to the given time unit.
         *
         * @param rateUnit a unit of time
         * @return {@code this}
         */
        public KafkaReporter.Builder convertRatesTo(TimeUnit rateUnit) {
            this.rateUnit = rateUnit;
            return this;
        }

        /**
         * Convert durations to the given time unit.
         *
         * @param durationUnit a unit of time
         * @return {@code this}
         */
        public KafkaReporter.Builder convertDurationsTo(TimeUnit durationUnit) {
            this.durationUnit = durationUnit;
            return this;
        }

        /**
         * Use the given {@link Clock} instance for the time.
         *
         * @param clock a {@link Clock} instance
         * @return {@code this}
         */
        public Builder withClock(Clock clock) {
            this.clock = clock;
            return this;
        }

        /**
         * Only report metrics which match the given filter.
         *
         * @param filter a {@link MetricFilter}
         * @return {@code this}
         */
        public KafkaReporter.Builder filter(MetricFilter filter) {
            this.filter = filter;
            return this;
        }

        /**
         * Send the metrics to the given Kafka topic.
         *
         * @param topic the name of the destination topic
         * @return {@code this}
         */
        public KafkaReporter.Builder topic(String topic) {
            this.topic = topic;
            return this;
        }

        /**
         * Builds a {@link KafkaReporter} with the given properties, sending records
         * through the given producer.
         *
         * @param producer the producer used to send the records; not closed by the reporter
         * @return a {@link KafkaReporter}
         * @throws NullPointerException if no topic was set or {@code producer} is null
         */
        public KafkaReporter build(KafkaProducer<String, String> producer) {
            // Fail here rather than inside the scheduled report, where the error is
            // raised far from its cause.
            Objects.requireNonNull(topic, "topic must be set via topic(...) before build()");
            Objects.requireNonNull(producer, "producer must not be null");
            return new KafkaReporter(registry, rateUnit, durationUnit, filter, clock, topic, producer);
        }
    }
}
如何使用
spark.metrics.conf.*.sink.kafka.class=org.apache.spark.metrics.sink.KafkaSink
spark.metrics.conf.*.sink.kafka.kafka-brokers=XXX:9092
评论
