Commit 0843bf24 authored by 王亮's avatar 王亮

unit test env,for adding NAMESPACE scope logic.

parent 6380b45e
...@@ -23,6 +23,7 @@ import java.util.stream.Stream; ...@@ -23,6 +23,7 @@ import java.util.stream.Stream;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.util.ObjectUtils;
public class KafkaMeterRegistry extends StepMeterRegistry { public class KafkaMeterRegistry extends StepMeterRegistry {
...@@ -31,10 +32,18 @@ public class KafkaMeterRegistry extends StepMeterRegistry { ...@@ -31,10 +32,18 @@ public class KafkaMeterRegistry extends StepMeterRegistry {
private final KafkaConfig config; private final KafkaConfig config;
private KafkaProducer<String, String> kafkaProducer; private KafkaProducer<String, String> kafkaProducer;
private final String key;
public KafkaMeterRegistry(KafkaConfig config, Clock clock) { public KafkaMeterRegistry(KafkaConfig config, Clock clock) {
super(config, clock); super(config, clock);
this.config = config; this.config = config;
if (ObjectUtils.isEmpty(System.getProperty("NAMESPACE"))) {
key = "default";
} else {
key = System.getProperty("NAMESPACE");
}
Properties properties = new Properties(); Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.services()); properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.services());
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
...@@ -66,7 +75,7 @@ public class KafkaMeterRegistry extends StepMeterRegistry { ...@@ -66,7 +75,7 @@ public class KafkaMeterRegistry extends StepMeterRegistry {
counter -> writeCounter(counter.getId(), counter.count()), counter -> writeCounter(counter.getId(), counter.count()),
this::writeFunctionTimer, this::writeFunctionTimer,
this::writeMeter)).forEach(i -> { this::writeMeter)).forEach(i -> {
kafkaProducer.send(new ProducerRecord<>(config.topic(), i)); kafkaProducer.send(new ProducerRecord<>(config.topic(), key,i));
}); });
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment