Commit 5ff433f6 authored by 王亮's avatar 王亮

add kakfa partion.

parent 2d3ba4e9
......@@ -16,7 +16,7 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<!-- <maven.test.skip>true</maven.test.skip>-->
<maven.javadoc.skip>true</maven.javadoc.skip>
<revision>2.6.3.5</revision>
<revision>2.6.3.6</revision>
<maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version>
<maven-source-plugin.version>3.2.1</maven-source-plugin.version>
<maven-javadoc-plugin.version>3.3.1</maven-javadoc-plugin.version>
......
......@@ -11,7 +11,7 @@
<packaging>pom</packaging>
<properties>
<revision>2.6.3.5</revision>
<revision>2.6.3.6</revision>
<spring-boot.version>2.6.3</spring-boot.version>
<spring-cloud.version>2021.0.1</spring-cloud.version>
<spring-cloud-alibaba.version>2021.0.1.0</spring-cloud-alibaba.version>
......
......@@ -14,10 +14,12 @@ import io.micrometer.core.instrument.util.DoubleFormat;
import io.micrometer.core.instrument.util.MeterPartition;
import io.micrometer.core.instrument.util.NamedThreadFactory;
import io.micrometer.core.instrument.util.StringUtils;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
......@@ -37,6 +39,7 @@ public class KafkaMeterRegistry extends StepMeterRegistry {
private final String key;
private final ApplicationContext applicationContext;
private final int partition;
public KafkaMeterRegistry(KafkaConfig config, Clock clock,
ApplicationContext applicationContext) {
......@@ -56,7 +59,7 @@ public class KafkaMeterRegistry extends StepMeterRegistry {
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
kafkaProducer = new KafkaProducer<>(properties);
partition = kafkaProducer.partitionsFor(config.topic()).size();
start(DEFAULT_THREAD_FACTORY);
}
......@@ -75,7 +78,7 @@ public class KafkaMeterRegistry extends StepMeterRegistry {
gauge -> writeGauge(gauge.getId(), gauge.value(getBaseTimeUnit())),
counter -> writeCounter(counter.getId(), counter.count()), this::writeFunctionTimer,
this::writeMeter)).forEach(i -> {
kafkaProducer.send(new ProducerRecord<>(config.topic(), key, i));
kafkaProducer.send(new ProducerRecord<>(config.topic(), (int)System.currentTimeMillis()%partition,key, i));
});
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment