Commit 6380b45e authored by 王亮's avatar 王亮

Add a Kafka registry for Micrometer metrics.

parent f92bfcf7
...@@ -12,5 +12,4 @@ import org.springframework.boot.context.properties.ConfigurationProperties; ...@@ -12,5 +12,4 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
public class EnochAgentProperties { public class EnochAgentProperties {
private String kafkaHost; private String kafkaHost;
private String kafkaTopic; private String kafkaTopic;
} }
<?xml version="1.0" encoding="UTF-8"?>
<!-- Spring Boot starter module: a Micrometer StepMeterRegistry that publishes
     metric snapshots to a Kafka topic (see KafkaMeterRegistry). -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- Version is inherited from the parent via the CI-friendly ${revision} property. -->
    <parent>
        <artifactId>qg-boot-parent</artifactId>
        <groupId>cn.quantgroup.boot</groupId>
        <version>${revision}</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.quantgroup.ext</groupId>
    <artifactId>qg-micrometer-register-kafka-starter</artifactId>
    <packaging>jar</packaging>
    <properties>
        <!-- Module targets Java 8 bytecode. -->
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <!-- Dependency versions are managed by the parent POM; none are pinned here. -->
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot</artifactId>
        </dependency>
        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-core</artifactId>
        </dependency>
        <!-- Compile-time only: generates accessors for EnochAgentProperties. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>
    </dependencies>
</project>
\ No newline at end of file
package cn.quantgroup.ext.micrometer.register.kafka;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
 * Externalized settings for the Kafka metrics publisher, bound from the
 * {@code tech.enoch.*} property namespace (e.g. {@code tech.enoch.kafka-host}).
 * Accessors are generated by Lombok's {@code @Data}.
 */
@Data
@ConfigurationProperties(value = "tech.enoch")
public class EnochAgentProperties {
    // Kafka bootstrap servers, passed to the producer's bootstrap.servers config.
    private String kafkaHost;
    // Topic that serialized metric lines are published to.
    private String kafkaTopic;
}
package cn.quantgroup.ext.micrometer.register.kafka;
import io.micrometer.core.instrument.step.StepRegistryConfig;
/**
 * Configuration contract for {@link KafkaMeterRegistry}. Inherits the
 * step-registry settings (step interval, batch size, enabled flag) from
 * {@link StepRegistryConfig} and adds the Kafka connection details.
 */
public interface KafkaConfig extends StepRegistryConfig {
    /** Property prefix used by {@code get(String)} lookups for inherited settings. */
    @Override
    default String prefix() {
        return "kafka";
    }

    /** Kafka bootstrap servers ({@code host:port[,host:port...]}); defaults to a local broker. */
    default String services() {
        return "localhost:9092";
    }

    /** Topic the serialized metric lines are published to. */
    default String topic() {
        return "metrics";
    }
}
package cn.quantgroup.ext.micrometer.register.kafka;
import static java.util.stream.Collectors.joining;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.FunctionTimer;
import io.micrometer.core.instrument.LongTaskTimer;
import io.micrometer.core.instrument.Measurement;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.step.StepMeterRegistry;
import io.micrometer.core.instrument.util.DoubleFormat;
import io.micrometer.core.instrument.util.MeterPartition;
import io.micrometer.core.instrument.util.NamedThreadFactory;
import io.micrometer.core.instrument.util.StringUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * A {@link StepMeterRegistry} that periodically serializes all registered meters
 * into InfluxDB line-protocol strings and publishes one Kafka record per line to
 * the topic named by {@link KafkaConfig#topic()}.
 *
 * <p>The publishing thread is started from the constructor, so constructing an
 * instance immediately begins polling; call {@link #close()} to stop publishing
 * and release the underlying Kafka producer.
 */
public class KafkaMeterRegistry extends StepMeterRegistry {

    private static final ThreadFactory DEFAULT_THREAD_FACTORY =
        new NamedThreadFactory("kafka-metrics-publisher");

    private final KafkaConfig config;
    // Assigned once in the constructor; closed in close(). (Fix: was non-final.)
    private final KafkaProducer<String, String> kafkaProducer;

    /**
     * Creates the registry, builds a String/String Kafka producer from
     * {@code config.services()}, and starts the step publishing thread.
     *
     * @param config connection and step settings
     * @param clock  clock used for step boundaries and line-protocol timestamps
     */
    public KafkaMeterRegistry(KafkaConfig config, Clock clock) {
        super(config, clock);
        this.config = config;
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.services());
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        this.kafkaProducer = new KafkaProducer<>(properties);
        // NOTE(review): calling an overridable method from the constructor leaks a
        // partially constructed instance to the publisher thread; preserved because
        // existing callers rely on the registry auto-starting on construction.
        start(DEFAULT_THREAD_FACTORY);
    }

    /**
     * Stops the step publisher (flushing a final batch) and then releases the
     * Kafka producer. Fix: the producer was previously never closed, leaking its
     * network threads and buffers.
     */
    @Override
    public void close() {
        super.close();
        kafkaProducer.close();
    }

    @Override
    protected void publish() {
        // Partition meters so no single send burst exceeds the configured batch size.
        for (List<Meter> batch : MeterPartition.partition(this, config.batchSize())) {
            batch.stream()
                .flatMap(m -> m.match(
                    gauge -> writeGauge(gauge.getId(), gauge.value()),
                    counter -> writeCounter(counter.getId(), counter.count()),
                    this::writeTimer,
                    this::writeSummary,
                    this::writeLongTaskTimer,
                    // TimeGauge: report in this registry's base time unit.
                    gauge -> writeGauge(gauge.getId(), gauge.value(getBaseTimeUnit())),
                    counter -> writeCounter(counter.getId(), counter.count()),
                    this::writeFunctionTimer,
                    this::writeMeter))
                // Sends are asynchronous; delivery failures are left to producer retries.
                .forEach(line -> kafkaProducer.send(new ProducerRecord<>(config.topic(), line)));
        }
    }

    /** Timers are reported in milliseconds. */
    @Override
    protected TimeUnit getBaseTimeUnit() {
        return TimeUnit.MILLISECONDS;
    }

    /** Emits a gauge line, or nothing when the value is NaN/infinite. */
    private Stream<String> writeGauge(Meter.Id id, double value) {
        if (Double.isFinite(value)) {
            return Stream.of(influxLineProtocol(id, "gauge", Stream.of(new Field("value", value))));
        }
        return Stream.empty();
    }

    /**
     * Renders one InfluxDB line-protocol record:
     * {@code name,tag=v,...,metric_type=TYPE field=v,... wallTimeMillis}.
     * Blank tag values are dropped because line protocol forbids empty tag values.
     */
    private String influxLineProtocol(Meter.Id id, String metricType, Stream<Field> fields) {
        String tags = getConventionTags(id).stream()
            .filter(t -> StringUtils.isNotBlank(t.getValue()))
            .map(t -> "," + t.getKey() + "=" + t.getValue())
            .collect(joining(""));
        return getConventionName(id)
            + tags + ",metric_type=" + metricType + " "
            + fields.map(Field::toString).collect(joining(","))
            + " " + clock.wallTime();
    }

    /** A single {@code key=value} field of a line-protocol record. */
    static class Field {
        final String key;
        final double value;

        Field(String key, double value) {
            // `time` cannot be a field key or tag key
            if (key.equals("time")) {
                throw new IllegalArgumentException("'time' is an invalid field key in InfluxDB");
            }
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return key + "=" + DoubleFormat.decimalOrNan(value);
        }
    }

    /** Emits a counter line, or nothing when the count is NaN/infinite. */
    private Stream<String> writeCounter(Meter.Id id, double count) {
        if (Double.isFinite(count)) {
            return Stream.of(influxLineProtocol(id, "counter", Stream.of(new Field("value", count))));
        }
        return Stream.empty();
    }

    /** Emits a timer as a histogram line with sum/count/mean/upper fields. */
    private Stream<String> writeTimer(Timer timer) {
        final Stream<Field> fields = Stream.of(
            new Field("sum", timer.totalTime(getBaseTimeUnit())),
            new Field("count", timer.count()),
            new Field("mean", timer.mean(getBaseTimeUnit())),
            new Field("upper", timer.max(getBaseTimeUnit()))
        );
        return Stream.of(influxLineProtocol(timer.getId(), "histogram", fields));
    }

    /** Emits a distribution summary as a histogram line with sum/count/mean/upper fields. */
    private Stream<String> writeSummary(DistributionSummary summary) {
        final Stream<Field> fields = Stream.of(
            new Field("sum", summary.totalAmount()),
            new Field("count", summary.count()),
            new Field("mean", summary.mean()),
            new Field("upper", summary.max())
        );
        return Stream.of(influxLineProtocol(summary.getId(), "histogram", fields));
    }

    /** Emits a long task timer line with active task count and cumulative duration. */
    private Stream<String> writeLongTaskTimer(LongTaskTimer timer) {
        Stream<Field> fields = Stream.of(
            new Field("active_tasks", timer.activeTasks()),
            new Field("duration", timer.duration(getBaseTimeUnit()))
        );
        return Stream.of(influxLineProtocol(timer.getId(), "long_task_timer", fields));
    }

    /**
     * Emits a function timer as a histogram line. The whole line is skipped when
     * the total time is not finite; the mean field alone is skipped when only the
     * mean is not finite.
     */
    private Stream<String> writeFunctionTimer(FunctionTimer timer) {
        double sum = timer.totalTime(getBaseTimeUnit());
        if (Double.isFinite(sum)) {
            Stream.Builder<Field> builder = Stream.builder();
            builder.add(new Field("sum", sum));
            builder.add(new Field("count", timer.count()));
            double mean = timer.mean(getBaseTimeUnit());
            if (Double.isFinite(mean)) {
                builder.add(new Field("mean", mean));
            }
            return Stream.of(influxLineProtocol(timer.getId(), "histogram", builder.build()));
        }
        return Stream.empty();
    }

    /**
     * Fallback for custom meters: one field per finite measurement, keyed by the
     * statistic's tag representation converted from camelCase to snake_case.
     * Emits nothing when no measurement is finite.
     */
    private Stream<String> writeMeter(Meter m) {
        List<Field> fields = new ArrayList<>();
        for (Measurement measurement : m.measure()) {
            double value = measurement.getValue();
            if (!Double.isFinite(value)) {
                continue;
            }
            String fieldKey = measurement.getStatistic().getTagValueRepresentation()
                .replaceAll("(.)(\\p{Upper})", "$1_$2").toLowerCase();
            fields.add(new Field(fieldKey, value));
        }
        if (fields.isEmpty()) {
            return Stream.empty();
        }
        Meter.Id id = m.getId();
        return Stream.of(influxLineProtocol(id, id.getType().name().toLowerCase(), fields.stream()));
    }
}
package cn.quantgroup.ext.micrometer.register.kafka;
import io.micrometer.core.instrument.Clock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

/**
 * Auto-configuration that wires a {@link KafkaMeterRegistry} from the bound
 * {@link EnochAgentProperties}. Only active when {@code tech.enoch.enabled=true}.
 */
@Configuration
@ComponentScan(basePackageClasses = KafkaMeterRegistry.class)
@EnableConfigurationProperties(EnochAgentProperties.class)
@ConditionalOnProperty(prefix = "tech.enoch", name = "enabled", havingValue = "true")
public class MicroMeterKafkaAutoConfiguration {

    private final EnochAgentProperties enochAgentProperties;

    public MicroMeterKafkaAutoConfiguration(EnochAgentProperties enochAgentProperties) {
        this.enochAgentProperties = enochAgentProperties;
    }

    /** Adapts the externally bound properties to the registry's {@link KafkaConfig} contract. */
    @Bean
    public KafkaConfig kafkaConfig() {
        final EnochAgentProperties props = this.enochAgentProperties;
        return new KafkaConfig() {
            @Override
            public String get(String key) {
                // No per-key lookup source: every inherited setting uses its default.
                return null;
            }

            @Override
            public String services() {
                return props.getKafkaHost();
            }

            @Override
            public String topic() {
                return props.getKafkaTopic();
            }
        };
    }

    /** The Kafka-backed meter registry itself, driven by the system clock. */
    @Bean
    public KafkaMeterRegistry kafkaMeterRegistry(KafkaConfig kafkaConfig) {
        return new KafkaMeterRegistry(kafkaConfig, Clock.SYSTEM);
    }
}
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
cn.quantgroup.ext.micrometer.register.kafka.MicroMeterKafkaAutoConfiguration
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment