Commit 5660fd0b authored by 王亮's avatar 王亮

fix influxdb line protocol escape bugs.

parent 2733ebb7
......@@ -16,7 +16,7 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<!-- <maven.test.skip>true</maven.test.skip>-->
<maven.javadoc.skip>true</maven.javadoc.skip>
<revision>2.6.3.4</revision>
<revision>2.6.3.5</revision>
<maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version>
<maven-source-plugin.version>3.2.1</maven-source-plugin.version>
<maven-javadoc-plugin.version>3.3.1</maven-javadoc-plugin.version>
......
......@@ -11,7 +11,7 @@
<packaging>pom</packaging>
<properties>
<revision>2.6.3.4</revision>
<revision>2.6.3.5</revision>
<spring-boot.version>2.6.3</spring-boot.version>
<spring-cloud.version>2021.0.1</spring-cloud.version>
<spring-cloud-alibaba.version>2021.0.1.0</spring-cloud-alibaba.version>
......
package cn.quantgroup.boot.micrometer.register.kafka;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
/**
* Influxdb line protocol special characters.
*/
public enum CharacterEnum {
Comma(",", Arrays.asList(ElementEnum.Measurement, ElementEnum.FieldKey, ElementEnum.TagKey,
ElementEnum.TagValue), "\\,"),
Space(" ", Arrays.asList(ElementEnum.Measurement, ElementEnum.FieldKey, ElementEnum.TagKey,
ElementEnum.TagValue), "\\ "),
EqualsSign("=", Arrays.asList(ElementEnum.FieldKey, ElementEnum.TagKey, ElementEnum.TagValue),
"\\="),
DoubleQuote("\"", Collections.singletonList(ElementEnum.FieldValue), "\\\""),
BackSlash("\\", Collections.singletonList(ElementEnum.FieldValue), "\\\\");
CharacterEnum(String value, List<ElementEnum> element, String elapsedValue) {
this.value = value;
this.element = element;
this.elapsedValue = elapsedValue;
}
//Escape characters
private final String value;
//Element
private final List<ElementEnum> element;
//Escaped characters
private final String elapsedValue;
public String getValue() {
return value;
}
public List<ElementEnum> getElement() {
return element;
}
public String getElapsedValue() {
return elapsedValue;
}
}
package cn.quantgroup.boot.micrometer.register.kafka;
/**
* influx db v2 element enum.
*/
public enum ElementEnum {
Measurement,TagKey,TagValue,FieldKey,FieldValue
}
......@@ -15,10 +15,12 @@ import io.micrometer.core.instrument.util.MeterPartition;
import io.micrometer.core.instrument.util.NamedThreadFactory;
import io.micrometer.core.instrument.util.StringUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
......@@ -31,10 +33,10 @@ public class KafkaMeterRegistry extends StepMeterRegistry {
private static final ThreadFactory DEFAULT_THREAD_FACTORY = new NamedThreadFactory(
"kafka-metrics-publisher");
private final KafkaConfig config;
private KafkaProducer<String, String> kafkaProducer;
private final KafkaProducer<String, String> kafkaProducer;
private final String key;
private ApplicationContext applicationContext;
private final ApplicationContext applicationContext;
public KafkaMeterRegistry(KafkaConfig config, Clock clock,
ApplicationContext applicationContext) {
......@@ -92,12 +94,14 @@ public class KafkaMeterRegistry extends StepMeterRegistry {
private String influxLineProtocol(Meter.Id id, String metricType, Stream<Field> fields) {
String tags = getConventionTags(id).stream().filter(t -> StringUtils.isNotBlank(t.getValue()))
.map(t -> "," + escapeField(t.getKey()) + "=" + escapeField(t.getValue()))
.map(t -> "," + escapeLine(ElementEnum.TagKey, t.getKey()) + "=" + escapeLine(
ElementEnum.TagValue, t.getValue()))
.collect(joining(""));
String ip = applicationContext.getEnvironment().getProperty("spring.cloud.client.ip-address");
tags = tags + (",ip=" + ip);
return getConventionName(id) + tags + ",metric_type=" + metricType + " " + fields.map(
return getConventionName(id) + tags + ",metric_type=" + escapeLine(ElementEnum.Measurement,
metricType) + " " + fields.map(
Field::toString).collect(joining(",")) + " " + clock.wallTime();
}
......@@ -117,7 +121,7 @@ public class KafkaMeterRegistry extends StepMeterRegistry {
@Override
public String toString() {
return escapeField(key) + "=" + DoubleFormat.decimalOrNan(value);
return escapeLine(ElementEnum.FieldKey, key) + "=" + DoubleFormat.decimalOrNan(value);
}
}
......@@ -184,8 +188,14 @@ public class KafkaMeterRegistry extends StepMeterRegistry {
}
private static String escapeField(String names) {
return names.replace(" ", "\\ ").replace(",", "\\,");
private static String escapeLine(ElementEnum elementEnum, String line) {
AtomicReference<String> result = new AtomicReference<>("");
Arrays.stream(CharacterEnum.values()).filter(i -> i.getElement().contains(elementEnum))
.forEach(i -> {
result.set(line.replace(i.getValue(), i.getElapsedValue()));
});
return result.get();
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment