Commit f1be7bac authored by 郑航's avatar 郑航

elastic-job 动态注入cron表达式

新增tracelistener用于给线程加tracingid
parent af14582f
package cn.quantgroup.tech.elastic.job.lite.annotation;
import org.springframework.core.annotation.AliasFor;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.core.annotation.AliasFor;
@Target({ ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
public @interface ElasticSimpleJob {
@Deprecated
@AliasFor("cron")
public abstract String value() default "";
@Deprecated
@AliasFor("value")
public abstract String cron() default "";
public abstract String jobName() default "";
public String jobName() default "";
public abstract int shardingTotalCount() default 1;
......
package cn.quantgroup.tech.elastic.job.lite.autoconfigure;
import java.util.Map;
import javax.annotation.PostConstruct;
import javax.sql.DataSource;
import cn.quantgroup.tech.elastic.job.lite.annotation.ElasticSimpleJob;
import cn.quantgroup.tech.elastic.job.lite.listener.AddTraceListener;
import com.ctrip.framework.apollo.Config;
import com.ctrip.framework.apollo.ConfigService;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.event.rdb.JobEventRdbConfiguration;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.apache.commons.lang3.StringUtils;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -15,14 +21,9 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Configuration;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.event.rdb.JobEventRdbConfiguration;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import javax.annotation.PostConstruct;
import javax.sql.DataSource;
import java.util.Map;
@Configuration
@ConditionalOnClass(ElasticSimpleJob.class)
......@@ -44,12 +45,12 @@ public class ElasticJobAutoConfiguration {
regCenter.init();
Map<String, SimpleJob> map = applicationContext.getBeansOfType(SimpleJob.class);
AddTraceListener addTraceListener = new AddTraceListener();;
for (Map.Entry<String, SimpleJob> entry : map.entrySet()) {
SimpleJob simpleJob = entry.getValue();
ElasticSimpleJob elasticSimpleJobAnnotation = AopUtils.getTargetClass(simpleJob).getAnnotation(ElasticSimpleJob.class);
if(elasticSimpleJobAnnotation != null){
String cron = StringUtils.defaultIfBlank(elasticSimpleJobAnnotation.cron(), elasticSimpleJobAnnotation.value());
SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(JobCoreConfiguration.newBuilder(elasticSimpleJobAnnotation.jobName(), cron, elasticSimpleJobAnnotation.shardingTotalCount()).shardingItemParameters(elasticSimpleJobAnnotation.shardingItemParameters()).build(), elasticSimpleJobAnnotation.jobName());
SimpleJobConfiguration simpleJobConfiguration = getJobConfig(elasticSimpleJobAnnotation);
LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true).build();
String dataSourceRef = elasticSimpleJobAnnotation.dataSource();
......@@ -61,13 +62,28 @@ public class ElasticJobAutoConfiguration {
DataSource dataSource = (DataSource) applicationContext.getBean(dataSourceRef);
JobEventRdbConfiguration jobEventRdbConfiguration = new JobEventRdbConfiguration(dataSource);
SpringJobScheduler jobScheduler = new SpringJobScheduler(simpleJob, regCenter, liteJobConfiguration, jobEventRdbConfiguration);
SpringJobScheduler jobScheduler = new SpringJobScheduler(simpleJob, regCenter, liteJobConfiguration, jobEventRdbConfiguration,addTraceListener);
jobScheduler.init();
} else {
SpringJobScheduler jobScheduler = new SpringJobScheduler(simpleJob, regCenter, liteJobConfiguration);
SpringJobScheduler jobScheduler = new SpringJobScheduler(simpleJob, regCenter, liteJobConfiguration,addTraceListener);
jobScheduler.init();
}
}
}
}
private SimpleJobConfiguration getJobConfig(ElasticSimpleJob elasticSimpleJobAnnotation) {
String jobName = elasticSimpleJobAnnotation.jobName();
// only "application"
Config config = ConfigService.getAppConfig(); //config instance is singleton for each namespace and is never null
String cronKey = String.format("%s.cron",jobName);
String countKey = String.format("%s.shardingTotalCount",jobName);
String paramKey = String.format("%s.shardingItemParameters",jobName);
String cronValue = config.getProperty(cronKey,StringUtils.EMPTY);
Integer countValue = config.getIntProperty(countKey,0);
String paramValue = config.getProperty(paramKey,StringUtils.EMPTY);
return new SimpleJobConfiguration(JobCoreConfiguration.newBuilder(elasticSimpleJobAnnotation.jobName(), cronValue, countValue).shardingItemParameters(paramValue).build(), elasticSimpleJobAnnotation.jobName());
}
}
\ No newline at end of file
package cn.quantgroup.tech.elastic.job.lite.listener;
import com.dangdang.ddframe.job.executor.ShardingContexts;
import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener;
import org.apache.commons.lang3.RandomStringUtils;
import org.slf4j.MDC;
/**
* Created by Baiye on 2020-06-09.
*
* @author Baiye
*/
public class AddTraceListener implements ElasticJobListener {
private final static String PLACEHOLDER = "X-B3-TraceId";
@Override
public void beforeJobExecuted(ShardingContexts shardingContexts) {
MDC.put(PLACEHOLDER, RandomStringUtils.randomAlphabetic(10));
}
@Override
public void afterJobExecuted(ShardingContexts shardingContexts) {
MDC.remove(PLACEHOLDER);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment