Commit e40a446b authored by 王亮's avatar 王亮

fix some bug of getting ip .

parent c25256f1
...@@ -2,8 +2,6 @@ package cn.quantgroup.boot.micrometer.register.kafka; ...@@ -2,8 +2,6 @@ package cn.quantgroup.boot.micrometer.register.kafka;
import static java.util.stream.Collectors.joining; import static java.util.stream.Collectors.joining;
import com.alibaba.cloud.nacos.NacosDiscoveryProperties;
import com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpoint;
import io.micrometer.core.instrument.Clock; import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.DistributionSummary; import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.FunctionTimer; import io.micrometer.core.instrument.FunctionTimer;
...@@ -38,7 +36,8 @@ public class KafkaMeterRegistry extends StepMeterRegistry { ...@@ -38,7 +36,8 @@ public class KafkaMeterRegistry extends StepMeterRegistry {
private final String key; private final String key;
private ApplicationContext applicationContext; private ApplicationContext applicationContext;
public KafkaMeterRegistry(KafkaConfig config, Clock clock, ApplicationContext applicationContext) { public KafkaMeterRegistry(KafkaConfig config, Clock clock,
ApplicationContext applicationContext) {
super(config, clock); super(config, clock);
this.config = config; this.config = config;
if (ObjectUtils.isEmpty(System.getProperty("NAMESPACE"))) { if (ObjectUtils.isEmpty(System.getProperty("NAMESPACE"))) {
...@@ -68,19 +67,14 @@ public class KafkaMeterRegistry extends StepMeterRegistry { ...@@ -68,19 +67,14 @@ public class KafkaMeterRegistry extends StepMeterRegistry {
protected void publish() { protected void publish() {
for (List<Meter> batch : MeterPartition.partition(this, config.batchSize())) { for (List<Meter> batch : MeterPartition.partition(this, config.batchSize())) {
batch.stream() batch.stream().flatMap(m -> m.match(gauge -> writeGauge(gauge.getId(), gauge.value()),
.flatMap(m -> m.match( counter -> writeCounter(counter.getId(), counter.count()), this::writeTimer,
gauge -> writeGauge(gauge.getId(), gauge.value()), this::writeSummary, this::writeLongTaskTimer,
counter -> writeCounter(counter.getId(), counter.count()), gauge -> writeGauge(gauge.getId(), gauge.value(getBaseTimeUnit())),
this::writeTimer, counter -> writeCounter(counter.getId(), counter.count()), this::writeFunctionTimer,
this::writeSummary, this::writeMeter)).forEach(i -> {
this::writeLongTaskTimer, kafkaProducer.send(new ProducerRecord<>(config.topic(), key, i));
gauge -> writeGauge(gauge.getId(), gauge.value(getBaseTimeUnit())), });
counter -> writeCounter(counter.getId(), counter.count()),
this::writeFunctionTimer,
this::writeMeter)).forEach(i -> {
kafkaProducer.send(new ProducerRecord<>(config.topic(), key,i));
});
} }
} }
...@@ -97,19 +91,14 @@ public class KafkaMeterRegistry extends StepMeterRegistry { ...@@ -97,19 +91,14 @@ public class KafkaMeterRegistry extends StepMeterRegistry {
} }
private String influxLineProtocol(Meter.Id id, String metricType, Stream<Field> fields) { private String influxLineProtocol(Meter.Id id, String metricType, Stream<Field> fields) {
String tags = getConventionTags(id).stream() String tags = getConventionTags(id).stream().filter(t -> StringUtils.isNotBlank(t.getValue()))
.filter(t -> StringUtils.isNotBlank(t.getValue()))
.map(t -> "," + escapeField(t.getKey()) + "=" + escapeField(t.getValue())) .map(t -> "," + escapeField(t.getKey()) + "=" + escapeField(t.getValue()))
.collect(joining("")); .collect(joining(""));
NacosDiscoveryEndpoint nacosDiscoveryEndpoint = (NacosDiscoveryEndpoint) this.applicationContext.getBean("nacosDiscoveryEndpoint"); String ip = applicationContext.getEnvironment().getProperty("spring.cloud.client.ip-address");
String ip = ((NacosDiscoveryProperties) nacosDiscoveryEndpoint.nacosDiscovery()
.get("NacosDiscoveryProperties")).getIp();
tags = tags + (",ip=" + ip); tags = tags + (",ip=" + ip);
return getConventionName(id) return getConventionName(id) + tags + ",metric_type=" + metricType + " " + fields.map(
+ tags + ",metric_type=" + metricType + " " Field::toString).collect(joining(",")) + " " + clock.wallTime();
+ fields.map(Field::toString).collect(joining(","))
+ " " + clock.wallTime();
} }
static class Field { static class Field {
...@@ -125,6 +114,7 @@ public class KafkaMeterRegistry extends StepMeterRegistry { ...@@ -125,6 +114,7 @@ public class KafkaMeterRegistry extends StepMeterRegistry {
this.key = key; this.key = key;
this.value = value; this.value = value;
} }
@Override @Override
public String toString() { public String toString() {
return escapeField(key) + "=" + DoubleFormat.decimalOrNan(value); return escapeField(key) + "=" + DoubleFormat.decimalOrNan(value);
...@@ -139,32 +129,24 @@ public class KafkaMeterRegistry extends StepMeterRegistry { ...@@ -139,32 +129,24 @@ public class KafkaMeterRegistry extends StepMeterRegistry {
} }
private Stream<String> writeTimer(Timer timer) { private Stream<String> writeTimer(Timer timer) {
final Stream<Field> fields = Stream.of( final Stream<Field> fields = Stream.of(new Field("sum", timer.totalTime(getBaseTimeUnit())),
new Field("sum", timer.totalTime(getBaseTimeUnit())), new Field("count", timer.count()), new Field("mean", timer.mean(getBaseTimeUnit())),
new Field("count", timer.count()), new Field("upper", timer.max(getBaseTimeUnit())));
new Field("mean", timer.mean(getBaseTimeUnit())),
new Field("upper", timer.max(getBaseTimeUnit()))
);
return Stream.of(influxLineProtocol(timer.getId(), "histogram", fields)); return Stream.of(influxLineProtocol(timer.getId(), "histogram", fields));
} }
private Stream<String> writeSummary(DistributionSummary summary) { private Stream<String> writeSummary(DistributionSummary summary) {
final Stream<Field> fields = Stream.of( final Stream<Field> fields = Stream.of(new Field("sum", summary.totalAmount()),
new Field("sum", summary.totalAmount()), new Field("count", summary.count()), new Field("mean", summary.mean()),
new Field("count", summary.count()), new Field("upper", summary.max()));
new Field("mean", summary.mean()),
new Field("upper", summary.max())
);
return Stream.of(influxLineProtocol(summary.getId(), "histogram", fields)); return Stream.of(influxLineProtocol(summary.getId(), "histogram", fields));
} }
private Stream<String> writeLongTaskTimer(LongTaskTimer timer) { private Stream<String> writeLongTaskTimer(LongTaskTimer timer) {
Stream<Field> fields = Stream.of( Stream<Field> fields = Stream.of(new Field("active_tasks", timer.activeTasks()),
new Field("active_tasks", timer.activeTasks()), new Field("duration", timer.duration(getBaseTimeUnit())));
new Field("duration", timer.duration(getBaseTimeUnit()))
);
return Stream.of(influxLineProtocol(timer.getId(), "long_task_timer", fields)); return Stream.of(influxLineProtocol(timer.getId(), "long_task_timer", fields));
} }
...@@ -202,8 +184,8 @@ public class KafkaMeterRegistry extends StepMeterRegistry { ...@@ -202,8 +184,8 @@ public class KafkaMeterRegistry extends StepMeterRegistry {
} }
private static String escapeField(String names){ private static String escapeField(String names) {
return names.replace(" ","\\ ").replace(",","\\,"); return names.replace(" ", "\\ ").replace(",", "\\,");
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment