Commit 4274c098 authored by data爬虫-冯 军凯's avatar data爬虫-冯 军凯

项目初始化

parents
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.quantgroup</groupId>
<artifactId>qg-rt-dc</artifactId>
<packaging>pom</packaging>
<version>0.0.1-SNAPSHOT</version>
<name>qg-rt-dc</name>
<parent>
<groupId>cn.quantgroup</groupId>
<artifactId>commons-parent</artifactId>
<version>0.2.4</version>
</parent>
<modules>
<module>qg-data-dc</module>
<module>qg-data-service</module>
</modules>
<properties>
<project.environment>dev</project.environment>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<resource.delimiter>@</resource.delimiter>
<java.version>1.8</java.version>
<spring.boot.version>1.5.9.RELEASE</spring.boot.version>
<drools-version>7.5.0.Final</drools-version>
<spring-context-support.version>4.1.6.RELEASE</spring-context-support.version>
<fastjson.version>1.2.21</fastjson.version>
<okhttp.version>3.4.2</okhttp.version>
<retrofit.version>2.1.0</retrofit.version>
<rxjava.version>1.2.3</rxjava.version>
<jackson.version>2.8.7</jackson.version>
<maven.test.skip>true</maven.test.skip>
<qiniu-java-sdk.version>7.1.1</qiniu-java-sdk.version>
<mysql.version>5.1.44</mysql.version>
</properties>
<build>
<resources>
<resource>
<filtering>true</filtering>
<directory>src/main/resources/</directory>
<includes>
<include>
*.properties
</include>
</includes>
</resource>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.xml</include>
</includes>
</resource>
<resource>
<directory>${project.basedir}/src/main/resources</directory>
</resource>
<resource>
<directory>${project.build.directory}/generated-resources</directory>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.mybatis.generator</groupId>
<artifactId>mybatis-generator-maven-plugin</artifactId>
<version>1.3.5</version>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.16</version>
</dependency>
<!-- 添加权限 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
</dependency>
<!--swagger -->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.0.3</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.0.3</version>
</dependency>
<!--swagger -->
<!--spring-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>com.github.pagehelper</groupId>
<artifactId>pagehelper</artifactId>
<version>5.1.2</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<!--google-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.5</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>3.2.2</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>24.0-jre</version>
</dependency>
<!--db-->
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>${hikaricp.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<!-- mybatis 生成器-->
<dependency>
<groupId>org.mybatis.generator</groupId>
<artifactId>mybatis-generator-core</artifactId>
<version>1.3.5</version>
</dependency>
<!--json -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.46</version>
</dependency>
<dependency>
<groupId>org.dom4j</groupId>
<artifactId>dom4j</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>
<!--ok http -->
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>${okhttp.version}</version>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>logging-interceptor</artifactId>
<version>${okhttp.version}</version>
</dependency>
<!--retrofit -->
<dependency>
<groupId>com.squareup.retrofit2</groupId>
<artifactId>retrofit</artifactId>
<version>${retrofit.version}</version>
</dependency>
<dependency>
<groupId>com.squareup.retrofit2</groupId>
<artifactId>adapter-rxjava</artifactId>
<version>${retrofit.version}</version>
</dependency>
<dependency>
<groupId>com.squareup.retrofit2</groupId>
<artifactId>converter-simplexml</artifactId>
<version>${retrofit.version}</version>
</dependency>
<!-- kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.2.1.RELEASE</version>
</dependency>
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>4.9</version>
</dependency>
<dependency>
<groupId>com.ctrip.framework.apollo</groupId>
<artifactId>apollo-client</artifactId>
<version>0.10.2</version>
</dependency>
<dependency>
<groupId>cn.quantgroup</groupId>
<artifactId>quantgroup-user-sdk</artifactId>
<version>1.0.13-release</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--<dependency>-->
<!--<groupId>com.github.tobato</groupId>-->
<!--<artifactId>fastdfs-client</artifactId>-->
<!--<version>1.26.2</version>-->
<!--</dependency>-->
<dependency>
<groupId>com.belerweb</groupId>
<artifactId>pinyin4j</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>com.ibm.icu</groupId>
<artifactId>icu4j</artifactId>
<version>53.1</version>
</dependency>
<!--SpringBoot, Cloud基础的依赖-->
<dependency>
<groupId>cn.quantgroup</groupId>
<artifactId>commons-spring</artifactId>
</dependency>
<!--暂时是空的.....-->
<dependency>
<groupId>cn.quantgroup</groupId>
<artifactId>commons-core</artifactId>
</dependency>
<!--优雅停机, 不再强制停机-->
<dependency>
<groupId>cn.quantgroup</groupId>
<artifactId>shutdown-spring-boot-starter</artifactId>
</dependency>
<!--调用链追踪. 开启吧, 以后的监控,数据统计. 都从这里出数-->
<dependency>
<groupId>cn.quantgroup</groupId>
<artifactId>brave-spring-boot-starter</artifactId>
</dependency>
<!--全局唯一的IDGenerator, 试运行阶段. 百度开发者+刘志国结合量化派场景, 倾情定制.-->
<dependency>
<groupId>cn.quantgroup</groupId>
<artifactId>idgenerator-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring-shaded</artifactId>
<version>2.1.5</version>
</dependency>
<!--SpringBoot, Cloud基础的依赖-->
<dependency>
<groupId>cn.quantgroup</groupId>
<artifactId>commons-spring</artifactId>
</dependency>
<!--暂时是空的.....-->
<dependency>
<groupId>cn.quantgroup</groupId>
<artifactId>commons-core</artifactId>
</dependency>
<!--优雅停机, 不再强制停机-->
<dependency>
<groupId>cn.quantgroup</groupId>
<artifactId>shutdown-spring-boot-starter</artifactId>
</dependency>
<!--调用链追踪. 开启吧, 以后的监控,数据统计. 都从这里出数-->
<dependency>
<groupId>cn.quantgroup</groupId>
<artifactId>brave-spring-boot-starter</artifactId>
</dependency>
<!--全局唯一的IDGenerator, 试运行阶段. 百度开发者+刘志国结合量化派场景, 倾情定制.-->
<dependency>
<groupId>cn.quantgroup</groupId>
<artifactId>idgenerator-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring-shaded</artifactId>
<version>2.1.5</version>
</dependency>
</dependencies>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.quantgroup</groupId>
<artifactId>qg-data-dc</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>qg-data-dc</name>
<packaging>jar</packaging>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>cn.quantgroup</groupId>
<artifactId>qg-rt-dc</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>cn.quantgroup</groupId>
<artifactId>qg-data-service</artifactId>
<version>V1.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
package cn.quantgroup.qgdatadc;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.ctrip.framework.apollo.spring.annotation.EnableApolloConfig;
import com.ctrip.framework.apollo.spring.config.ApolloPropertySourceInitializer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.domain.EntityScan;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.EnableAspectJAutoProxy;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.transaction.annotation.EnableTransactionManagement;
//@EnableJpaRepositories(basePackages = {"cn.quantgroup.qgrtdc.repository.jpa"})
//@EntityScan(basePackages = {"cn.quantgroup.qgrtdc.repository.jpa.entity"})
@EnableTransactionManagement
@EnableConfigurationProperties
@EnableCaching
@EnableAsync
@EnableAspectJAutoProxy
@Slf4j
@EnableApolloConfig
@SpringBootApplication
public class QgDataDcApplication {
public static void main(String[] args) {
JSON.DEFAULT_GENERATE_FEATURE |= SerializerFeature.WriteEnumUsingToString.getMask();
SpringApplication springApplication = new SpringApplication(QgDataDcApplication.class);
springApplication.addInitializers(new ApolloPropertySourceInitializer());
springApplication.run(args);
log.info("QUANTGROUP-数据实时清洗-系统启动完成 ^_^ ");
}
}
app.id=qg-data-dc
namespace=application,tech.service.urls,tech.common,tech.sleuth,tech.deploy,tech.msg.sdk
\ No newline at end of file
package cn.quantgroup.qgdatadc;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class QgDataDcApplicationTests {
@Test
public void contextLoads() {
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.quantgroup</groupId>
<artifactId>qg-data-service</artifactId>
<version>0.1.9-SNAPSHOT</version>
<packaging>jar</packaging>
<name>qg-data-service</name>
<parent>
<groupId>cn.quantgroup</groupId>
<artifactId>commons-parent</artifactId>
<version>0.2.4</version>
</parent>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>2.5.1</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.46</version>
</dependency>
<dependency>
<groupId>com.lkb.data</groupId>
<artifactId>lkb-data-service</artifactId>
<version>1.7.5.1-3b-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.5</version>
</dependency>
</dependencies>
<!--<build>-->
<!--<plugins>-->
<!--<plugin>-->
<!--<groupId>org.springframework.boot</groupId>-->
<!--<artifactId>spring-boot-maven-plugin</artifactId>-->
<!--</plugin>-->
<!--</plugins>-->
<!--</build>-->
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.5.3</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<!-- this is used for inheritance merges -->
<phase>package</phase>
<!-- bind to the packaging phase -->
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<!--<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>2.4</version>
<executions>
<execution>
<id>attach-sources</id>
<phase>compile</phase>
<goals>
<goal>jar-no-fork</goal>
&lt;!&ndash; 类似执行mvn source:jar &ndash;&gt;
</goals>
</execution>
</executions>
</plugin>-->
</plugins>
</build>
<distributionManagement>
<repository>
<id>Releases</id>
<name>LKB Releases Repository</name>
<url>http://repo.quantgroup.cn/nexus/content/repositories/lkb-releases</url>
</repository>
<snapshotRepository>
<id>Snapshots</id>
<name>LKB Snapshots Repository</name>
<url>http://repo.quantgroup.cn/nexus/content/repositories/lkb-snapshots</url>
</snapshotRepository>
</distributionManagement>
</project>
package cn.quantgroup.qgdataservice;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.EnableAspectJAutoProxy;
import org.springframework.scheduling.annotation.EnableAsync;
@SpringBootApplication
public class QgDataServiceApplication {
public static void main(String[] args) {
SpringApplication.run(QgDataServiceApplication.class, args);
}
}
package cn.quantgroup.qgdataservice.config.datasource;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
/**
* @Author fengjunkai
* @Date 2019-05-14 16:32
*/
public enum TidbDataSource {
HIK_DATA_SOURCE;
public javax.sql.DataSource dataSource;
TidbDataSource() {
if (dataSource == null) {
System.out.println("======创建TIDB数据库连接======");
HikariConfig config = new HikariConfig();
// config.setJdbcUrl("jdbc:mysql://10.17.115.6:4010/rc_comservice_data_pool_v2?useUnicode=true&characterEncoding=UTF8&rewriteBatchedStatements=true");
// config.setUsername("rc_comservice_data_pool_v2_w");
// config.setPassword("w9pr8IPJkLmUSBe4");
config.setJdbcUrl("jdbc:mysql://172.30.220.9:3306/rc_comservice_data_pool_v2?useUnicode=true&characterEncoding=UTF8&rewriteBatchedStatements=true");
// config.setJdbcUrl("jdbc:mysql://172.30.220.9:3306/rc_comservice_data_pool_v2?useUnicode=true&characterEncoding=UTF8");useServerPrepStmts=true
config.setUsername("qa");
config.setPassword("qatest");
config.setDriverClassName("com.mysql.jdbc.Driver");
config.setMaximumPoolSize(12);
config.setMinimumIdle(6);
config.addDataSourceProperty("cachePrepStmts", "true");
config.addDataSourceProperty("prepStmtCacheSize", "250");
config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
dataSource = new HikariDataSource(config);
}
}
}
package cn.quantgroup.qgdataservice.constant;
/**
* @Author fengjunkai
* @Date 2019-05-14 18:19
*/
public class Constant {
public static class SQL{
public static String USER_INFO_ITEM = "INSERT IGNORE INTO comservice_i_spider_user_info (uuid,realName,registerDate,idCard,phoneRemain,phone,addr,merry,cardType,cardNo,sex,loginName,userSource,timestamp) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?)"; //14
public static String MOBILE_DETAIL_INFOS = "INSERT IGNORE INTO comservice_i_mobile_call_info (uuid,cTime,tradeAddr,tradeWay,tradeType,receiverPhone,tradeTime,taocan,onlinePay,phone,iscm,timestamp) values (?,?,?,?,?,?,?,?,?,?,?,?)"; //12
public static String MOBILE_SMS_DETAIL_INFOS = "INSERT IGNORE INTO comservice_i_mobile_sms_info (uuid,allPay,createTs,phone,receiverPhone,sentAddr,sentTime,tradeway,timestamp) values (?,?,?,?,?,?,?,?,?)"; //9
public static String MOBILE_FLOW_DETAIL_INFOS = "INSERT IGNORE INTO comservice_i_mobile_flow_info (uuid,cTime,cheapService,communicationFees,onlineTime,onlineType,phone,totalFlow,tradeAddr,timestamp) values (?,?,?,?,?,?,?,?,?,?)"; //10
public static String TELECOM_DETAIL_INFOS = "INSERT IGNORE INTO comservice_i_telecom_call_info (uuid,tradeType,cTime,tradeTime,callWay,receiverPhone,tradeAddr,basePay,longPay,infoPay,otherPay,allPay,phone,iscm,timestamp) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"; //15
public static String TELECOM_SMS_DETAIL_INFOS = "INSERT IGNORE INTO comservice_i_telecom_sms_info (uuid,allpay,businessType,createTs,phone,receiverPhone,sentTime,timestamp) values (?,?,?,?,?,?,?,?)"; //8
public static String TELECOM_FLOW_DETAIL_INFOS = "INSERT IGNORE INTO comservice_i_telecom_flow_info (uuid,beginTime,business,fee,flow,iscm,location,netType,phone,tradeTime,timestamp) values (?,?,?,?,?,?,?,?,?,?,?)"; //11
public static String UNICOM_DETAIL_INFOS = "INSERT IGNORE INTO comservice_i_unicom_call_info (uuid,businessType,cTime,tradeTime,callType,receiverPhone,tradeAddr,tradeType,basePay,ldPay,otherPay,totalPay,phone,iscm,reductionPay,timestamp) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"; //16
public static String UNICOM_SMS_DETAIL_INFOS = "INSERT IGNORE INTO comservice_i_unicom_sms_info (uuid,allPay,createTs,phone,receiverPhone,sentTime,tradeType,timestamp) values (?,?,?,?,?,?,?,?)"; //8
public static String UNICOM_FLOW_DETAIL_INFOS = "INSERT IGNORE INTO comservice_i_unicom_flow_info (uuid,allFlow,allPay,createTs,phone,startTime,tradeAddr,tradeType,timestamp) values (?,?,?,?,?,?,?,?,?)";//9
public static String PHONE_BILL_INFOS = "INSERT IGNORE INTO comservice_i_phone_bill_info (uuid,phoneNo,billDay,name,amount,dependCycle,billPackage,content,timestamp) values (?,?,?,?,?,?,?,?,?)";//9
public static String PHONE_RECHARGE_INFOS = "INSERT IGNORE INTO comservice_i_phone_payment_info (uuid,phone,payamount,paytime,paymethod,paychannel,payspare,timestamp) values (?,?,?,?,?,?,?,?)";//8
}
public static class OPERATOR{
public static final String MOBILE_USERSOURCE_CALL = "MOBILE_USERSOURCE_CALL";
public static final String TELECOM_USERSOURCE_CALL = "TELECOM_USERSOURCE_CALL";
public static final String UNICOM_USERSOURCE_CALL = "UNICOM_USERSOURCE_CALL";
public static final String MOBILE_USERSOURCE_SMS = "MOBILE_USERSOURCE_SMS";
public static final String TELECOM_USERSOURCE_SMS = "TELECOM_USERSOURCE_SMS";
public static final String UNICOM_USERSOURCE_SMS = "UNICOM_USERSOURCE_SMS";
public static final String MOBILE_USERSOURCE_FLOW = "MOBILE_USERSOURCE_FLOW";
public static final String TELECOM_USERSOURCE_FLOW = "TELECOM_USERSOURCE_FLOW";
public static final String UNICOM_USERSOURCE_FLOW = "UNICOM_USERSOURCE_FLOW";
public static final String PHONE_CALL_BILL = "PHONE_CALL_BILL";
// public static final String PHONE_FLOW_BILL = "PHONE_FLOW_BILL";
public static final String PHONE_RECHARGE_RECORDS = "PHONE_RECHARGE_RECORDS";
}
public static class BATCH{
public static final int BATCH_SIZE = 1000;
}
}
package cn.quantgroup.qgdataservice.service.hbase;
import cn.quantgroup.qgdataservice.service.tidb.OperatorInfoCleaningService;
import com.lkb.data.hbase.dataservice.SpiderUserItemDataService;
import com.lkb.data.hbase.dataservice.operators.PhoneBillDataService;
import com.lkb.data.hbase.dataservice.operators.PhonePaymentDataService;
import com.lkb.data.hbase.dataservice.operators.mobile.MobileCallInfoDataService;
import com.lkb.data.hbase.dataservice.operators.mobile.MobileFlowInfoDataService;
import com.lkb.data.hbase.dataservice.operators.mobile.MobileSmsInfoDataService;
import com.lkb.data.hbase.dataservice.operators.telecom.TelecomCallInfoDataService;
import com.lkb.data.hbase.dataservice.operators.telecom.TelecomFlowInfoDataService;
import com.lkb.data.hbase.dataservice.operators.telecom.TelecomSmsInfoDataService;
import com.lkb.data.hbase.dataservice.operators.unicom.UnicomCallInfoDataService;
import com.lkb.data.hbase.dataservice.operators.unicom.UnicomFlowInfoDataService;
import com.lkb.data.hbase.dataservice.operators.unicom.UnicomSmsInfoDataService;
import com.lkb.data.hbase.row.SpiderUserItemDataRow;
import com.lkb.data.hbase.row.operators.PhoneBillDataRow;
import com.lkb.data.hbase.row.operators.PhonePaymentDataRow;
import com.lkb.data.hbase.row.operators.mobile.MobileCallInfoRow;
import com.lkb.data.hbase.row.operators.mobile.MobileFlowInfoRow;
import com.lkb.data.hbase.row.operators.mobile.MobileSmsInfoRow;
import com.lkb.data.hbase.row.operators.telecom.TelecomCallInfoRow;
import com.lkb.data.hbase.row.operators.telecom.TelecomFlowInfoRow;
import com.lkb.data.hbase.row.operators.telecom.TelecomSmsInfoRow;
import com.lkb.data.hbase.row.operators.unicom.UnicomCallInfoRow;
import com.lkb.data.hbase.row.operators.unicom.UnicomFlowInfoRow;
import com.lkb.data.hbase.row.operators.unicom.UnicomSmsInfoRow;
import java.util.List;
/**
* 三大运营商数据清洗
*
* @Author fengjunkai
* @Date 2019-05-14 11:42
*/
public class OperatorInfoService {
/**
* 保存并清洗用户详情信息
*
* @param spiderUserItemDataRow
* @param uuid
*/
public static void storageUserInfoItem(SpiderUserItemDataRow spiderUserItemDataRow, String uuid, boolean putAndCheck, boolean ka) {
if (putAndCheck)
SpiderUserItemDataService.putAndCheck(spiderUserItemDataRow);
else
SpiderUserItemDataService.put(spiderUserItemDataRow);
OperatorInfoCleaningService.cleaningAndSaveUserInfo(spiderUserItemDataRow, uuid, ka);
}
/**
* 保存并清洗移动通话详单
*
* @param mobileCallInfoRows
* @param uuid
*/
public static void storageMobileCallDetailInfos(List<MobileCallInfoRow> mobileCallInfoRows, String uuid, boolean putAndCheck, boolean ka) {
if (putAndCheck)
MobileCallInfoDataService.putAndCheck(mobileCallInfoRows);
else
MobileCallInfoDataService.put(mobileCallInfoRows);
OperatorInfoCleaningService.cleaningAndSaveMobileCallDetailInfo(mobileCallInfoRows, uuid, ka);
}
/**
* 保存并清洗移动短信详单
*
* @param mobileSmsInfoRows
* @param uuid
*/
public static void storageMobileSmsDetail(List<MobileSmsInfoRow> mobileSmsInfoRows, String uuid, boolean putAndCheck, boolean ka) {
if (putAndCheck)
MobileSmsInfoDataService.putAndCheck(mobileSmsInfoRows);
else
MobileSmsInfoDataService.put(mobileSmsInfoRows);
OperatorInfoCleaningService.cleaningAndSaveMobileSmsDetainInfo(mobileSmsInfoRows, uuid, ka);
}
/**
* 保存并清洗移动流量详单
*
* @param mobileFlowInfoRows
* @param uuid
*/
public static void storageMobileFlowDetailInfos(List<MobileFlowInfoRow> mobileFlowInfoRows, String uuid, boolean putAndCheck, boolean ka) {
if (putAndCheck)
MobileFlowInfoDataService.putAndCheck(mobileFlowInfoRows);
else
MobileFlowInfoDataService.put(mobileFlowInfoRows);
OperatorInfoCleaningService.cleaningAndSaveMobileFlowDetailInfo(mobileFlowInfoRows, uuid, ka);
}
/**
* 保存并清洗电信通话详单
*
* @param telecomCallInfoRows
* @param uuid
*/
public static void storageTelecomCallDetailInfos(List<TelecomCallInfoRow> telecomCallInfoRows, String uuid, boolean putAndCheck, boolean ka) {
if (putAndCheck)
TelecomCallInfoDataService.putAndCheck(telecomCallInfoRows);
else
TelecomCallInfoDataService.put(telecomCallInfoRows);
OperatorInfoCleaningService.cleaningAndSaveTelecomCallDetailInfo(telecomCallInfoRows, uuid, ka);
}
/**
* 保存并清洗电信短信详单
*
* @param telecomSmsInfoRows
* @param uuid
*/
public static void storageTelecomSmsDetailInfos(List<TelecomSmsInfoRow> telecomSmsInfoRows, String uuid, boolean putAndCheck, boolean ka) {
if (putAndCheck)
TelecomSmsInfoDataService.putAndCheck(telecomSmsInfoRows);
else
TelecomSmsInfoDataService.put(telecomSmsInfoRows);
OperatorInfoCleaningService.cleaningAndSaveTelecomSmsDetailInfo(telecomSmsInfoRows, uuid, ka);
}
/**
* 保存并清洗电信流量详单
*
* @param telecomFlowInfoRows
* @param uuid
*/
public static void storageTelecomFlowDetailInfos(List<TelecomFlowInfoRow> telecomFlowInfoRows, String uuid, boolean putAndCheck, boolean ka) {
if (putAndCheck)
TelecomFlowInfoDataService.putAndCheck(telecomFlowInfoRows);
else
TelecomFlowInfoDataService.put(telecomFlowInfoRows);
OperatorInfoCleaningService.cleaningAndSaveTelecomFlowDetailInfo(telecomFlowInfoRows, uuid, ka);
}
/**
* 保存并清洗联通通话详单
*
* @param unicomCallInfoRows
* @param uuid
*/
public static void storageUnicomCallDetailInfos(List<UnicomCallInfoRow> unicomCallInfoRows, String uuid, boolean putAndCheck, boolean ka) {
if (putAndCheck)
UnicomCallInfoDataService.putAndCheck(unicomCallInfoRows);
else
UnicomCallInfoDataService.put(unicomCallInfoRows);
OperatorInfoCleaningService.cleaningAndSaveUnicomCallDetailInfo(unicomCallInfoRows, uuid, ka);
}
/**
* 保存并清洗联通短信详单
*
* @param unicomSmsInfoRows
* @param uuid
*/
public static void storageUnicomSmsDetailInfos(List<UnicomSmsInfoRow> unicomSmsInfoRows, String uuid, boolean putAndCheck, boolean ka) {
if (putAndCheck)
UnicomSmsInfoDataService.putAndCheck(unicomSmsInfoRows);
else
UnicomSmsInfoDataService.put(unicomSmsInfoRows);
OperatorInfoCleaningService.cleaningAndSaveUnicomSmsDetailInfo(unicomSmsInfoRows, uuid, ka);
}
/**
* 保存并清洗联通流量详单
*
* @param unicomFlowInfoRows
* @param uuid
*/
public static void storageUnicomFlowDetailInfos(List<UnicomFlowInfoRow> unicomFlowInfoRows, String uuid, boolean putAndCheck, boolean ka) {
if (putAndCheck)
UnicomFlowInfoDataService.putAndCheck(unicomFlowInfoRows);
else
UnicomFlowInfoDataService.put(unicomFlowInfoRows);
OperatorInfoCleaningService.cleaningAndSaveUnicomFlowDetailInfo(unicomFlowInfoRows, uuid, ka);
}
/**
* 保存并清洗通话账单
*
* @param phoneBillDataRow
* @param uuid
*/
public static void storageBillInfos(PhoneBillDataRow phoneBillDataRow, String uuid, boolean putAndCheck, boolean ka) {
if (putAndCheck)
PhoneBillDataService.putAndCheck(phoneBillDataRow);
else
PhoneBillDataService.put(phoneBillDataRow);
OperatorInfoCleaningService.cleaningAndSavePhoneBillInfo(phoneBillDataRow, uuid, ka);
}
/**
* 保存并清洗手机充值记录
*
* @param phonePaymentDataRow
* @param uuid
*/
public static void storageRechargeInfos(PhonePaymentDataRow phonePaymentDataRow, String uuid, boolean putAndCheck, boolean ka) {
PhonePaymentDataService.put(phonePaymentDataRow);
OperatorInfoCleaningService.cleaningAndSaveRechargeInfo(phonePaymentDataRow, uuid, ka);
}
}
package cn.quantgroup.qgdataservice.service.tidb;
import cn.quantgroup.qgdataservice.constant.Constant;
import cn.quantgroup.qgdataservice.utils.*;
import com.alibaba.fastjson.JSON;
import com.google.common.base.Stopwatch;
import com.lkb.data.hbase.row.SpiderUserItemDataRow;
import com.lkb.data.hbase.row.operators.PhoneBillDataRow;
import com.lkb.data.hbase.row.operators.PhonePaymentDataRow;
import com.lkb.data.hbase.row.operators.mobile.MobileCallInfoRow;
import com.lkb.data.hbase.row.operators.mobile.MobileFlowInfoRow;
import com.lkb.data.hbase.row.operators.mobile.MobileSmsInfoRow;
import com.lkb.data.hbase.row.operators.telecom.TelecomCallInfoRow;
import com.lkb.data.hbase.row.operators.telecom.TelecomFlowInfoRow;
import com.lkb.data.hbase.row.operators.telecom.TelecomSmsInfoRow;
import com.lkb.data.hbase.row.operators.unicom.UnicomCallInfoRow;
import com.lkb.data.hbase.row.operators.unicom.UnicomFlowInfoRow;
import com.lkb.data.hbase.row.operators.unicom.UnicomSmsInfoRow;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* @Author fengjunkai
* @Date 2019-05-28 14:08
*/
public class OperatorInfoCleaningService {
private static final Logger log = LoggerFactory.getLogger(OperatorInfoCleaningService.class);
/**
* 入网时间/手机余额/手机号
*
* @param spiderUserItemDataRow
* @param uuid
* @param ka
*/
public static void cleaningAndSaveUserInfo(SpiderUserItemDataRow spiderUserItemDataRow, String uuid, boolean ka) {
ThreadPoolExecutorUtils.getThreadPoolUtil().execute(new Runnable() {
@Override
public void run() {
Stopwatch stopwatch = Stopwatch.createStarted();
List<List<String>> sqls = new ArrayList<>();
try {
String userSource = spiderUserItemDataRow.getUserSource();
String phone = spiderUserItemDataRow.getPhone();
List<String> list = new ArrayList<>();
list.add(StringUtils.isBlank(uuid) ? null : uuid);
String realName = spiderUserItemDataRow.getRealName();
list.add(StringUtils.isBlank(realName) ? null : realName);
Date registerDate = spiderUserItemDataRow.getRegisterDate();
list.add(TimeUtils.registerDateFormat(registerDate, userSource, phone));
String idCard = spiderUserItemDataRow.getIdCard();
list.add(StringUtils.isBlank(idCard) ? null : idCard);
Float phoneRemain = spiderUserItemDataRow.getPhoneRemain();
list.add(NumberUtils.formatPhoneRemain(phoneRemain, userSource, phone));
list.add(PhoneUtils.getPhoneNoByRegx(phone, userSource, phone));
String addr = spiderUserItemDataRow.getAddr();
list.add(StringUtils.isBlank(addr) ? null : addr);
String merry = spiderUserItemDataRow.getMerry();
list.add(StringUtils.isBlank(merry) ? null : merry);
String cardType = spiderUserItemDataRow.getCardType();
list.add(StringUtils.isBlank(cardType) ? null : cardType);
String cardNo = spiderUserItemDataRow.getCardNo();
list.add(StringUtils.isBlank(cardNo) ? null : cardNo);
String sex = spiderUserItemDataRow.getSex();
list.add(StringUtils.isBlank(sex) ? null : sex);
String loginName = spiderUserItemDataRow.getLoginName();
list.add(StringUtils.isBlank(loginName) ? null : loginName);
list.add(StringUtils.isBlank(userSource) ? null : userSource);
list.add(TimeUtils.timeStamp2Date(String.valueOf(System.currentTimeMillis()), userSource, phone));
sqls.add(list);
} catch (Exception e) {
log.error("用户详情清洗异常, uuid: {} , ka: {} , param: {} ", uuid, ka, JSON.toJSONString(spiderUserItemDataRow), e);
}
int size = JdbcExecuters.batchExecute(sqls, Constant.SQL.USER_INFO_ITEM);
log.info("用户详情清洗数据完成, uuid: {} , 入库大小: {} , ka: {} , 耗时: {} ", uuid, size, ka, stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
}
});
}
public static void cleaningAndSaveMobileCallDetailInfo(List<MobileCallInfoRow> mobileCallInfoRows, String uuid, boolean ka) {
ThreadPoolExecutorUtils.getThreadPoolUtil().execute(new Runnable() {
@Override
public void run() {
Stopwatch stopwatch = Stopwatch.createStarted();
List<List<String>> sqls = new ArrayList<>();
for (int i = 0; i < mobileCallInfoRows.size(); i++) {
MobileCallInfoRow mobileCallInfoRow = mobileCallInfoRows.get(i);
try {
String phone = mobileCallInfoRow.getPhone();
List<String> list = new ArrayList<>();
String cTime = mobileCallInfoRow.getcTime();
list.add(StringUtils.isBlank(uuid) ? null : uuid);
list.add(TimeUtils.getCTimeFormat(cTime, Constant.OPERATOR.MOBILE_USERSOURCE_CALL, phone));
String tradeAddr = mobileCallInfoRow.getTradeAddr();
list.add(StringUtils.isBlank(tradeAddr) ? null : tradeAddr);
String tradeWay = mobileCallInfoRow.getTradeWay();
list.add(StringUtils.isBlank(tradeWay) ? null : tradeWay);
String tradeType = mobileCallInfoRow.getTradeType();
list.add(StringUtils.isBlank(tradeType) ? null : tradeType);
String receiverPhone = mobileCallInfoRow.getReceiverPhone();
list.add(PhoneUtils.getPhoneNoByRegx(receiverPhone, Constant.OPERATOR.MOBILE_USERSOURCE_CALL, phone));
String useTime = mobileCallInfoRow.getTradeTime();
list.add(NumberUtils.getUseTimeFormat(useTime, Constant.OPERATOR.MOBILE_USERSOURCE_CALL, phone));
String taocan = mobileCallInfoRow.getTaocan();
list.add(StringUtils.isBlank(taocan) ? null : taocan);
String onlinePay = mobileCallInfoRow.getOnlinePay();
list.add(StringUtils.isBlank(onlinePay) ? null : onlinePay);
list.add(PhoneUtils.getPhoneNoByRegx(phone, Constant.OPERATOR.MOBILE_USERSOURCE_CALL, phone));
String iscm = mobileCallInfoRow.getIscm();
list.add(StringUtils.isBlank(iscm) ? null : iscm);
list.add(TimeUtils.timeStamp2Date(String.valueOf(System.currentTimeMillis()), Constant.OPERATOR.MOBILE_USERSOURCE_CALL, phone));
sqls.add(list);
} catch (Exception e) {
log.error("移动通话详单清洗异常, uuid: {} , ka: {} , param: {} ", uuid, ka, JSON.toJSONString(mobileCallInfoRow), e);
}
}
int size = JdbcExecuters.batchExecute(sqls, Constant.SQL.MOBILE_DETAIL_INFOS);
log.info("移动通话详单清洗完成, uuid: {} , 原始大小: {} , 清洗后大小: {} , 入库大小: {} , ka: {} , 耗时: {} ", uuid, mobileCallInfoRows.size(), sqls.size(), size, ka, stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
}
});
}
public static void cleaningAndSaveMobileSmsDetainInfo(List<MobileSmsInfoRow> mobileSmsInfoRows, String uuid, boolean ka) {
ThreadPoolExecutorUtils.getThreadPoolUtil().execute(new Runnable() {
@Override
public void run() {
Stopwatch stopwatch = Stopwatch.createStarted();
List<List<String>> sqls = new ArrayList<>();
for (int i = 0; i < mobileSmsInfoRows.size(); i++) {
MobileSmsInfoRow mobileSmsInfoRow = mobileSmsInfoRows.get(i);
try {
List<String> list = new ArrayList<>();
list.add(StringUtils.isBlank(uuid) ? null : uuid);
String phone = mobileSmsInfoRow.getPhone();
String allPay = mobileSmsInfoRow.getAllPay();
list.add(StringUtils.isBlank(allPay) ? null : allPay);
String createTs = mobileSmsInfoRow.getCreateTs();
list.add(StringUtils.isBlank(createTs) ? null : createTs);
list.add(PhoneUtils.getPhoneNoByRegx(phone, Constant.OPERATOR.MOBILE_USERSOURCE_SMS, phone));
String receiverPhone = mobileSmsInfoRow.getReceiverPhone();
list.add(PhoneUtils.getPhoneNoByRegx(receiverPhone, Constant.OPERATOR.MOBILE_USERSOURCE_SMS, phone));
String sendAddr = mobileSmsInfoRow.getSentAddr();
list.add(StringUtils.isBlank(sendAddr) ? null : sendAddr);
String sendTime = mobileSmsInfoRow.getSentTime();
list.add(TimeUtils.getCTimeFormat(sendTime, Constant.OPERATOR.MOBILE_USERSOURCE_SMS, phone));
String tradeway = mobileSmsInfoRow.getTradeWay();
list.add(StringUtils.isBlank(tradeway) ? null : tradeway);
list.add(TimeUtils.timeStamp2Date(String.valueOf(System.currentTimeMillis()), Constant.OPERATOR.MOBILE_USERSOURCE_SMS, phone));
sqls.add(list);
} catch (Exception e) {
log.error("移动短信详单清洗异常, uuid: {} , ka: {} , param: {} ", uuid, ka, JSON.toJSONString(mobileSmsInfoRow), e);
}
}
int size = JdbcExecuters.batchExecute(sqls, Constant.SQL.MOBILE_SMS_DETAIL_INFOS);
log.info("移动短信详单清洗完成, uuid: {} , 原始大小: {} , 清洗后大小: {} , 入库大小: {} , ka: {} , 耗时: {} ", uuid, mobileSmsInfoRows.size(), sqls.size(), size, ka, stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
}
});
}
public static void cleaningAndSaveMobileFlowDetailInfo(List<MobileFlowInfoRow> mobileFlowInfoRows, String uuid, boolean ka) {
ThreadPoolExecutorUtils.getThreadPoolUtil().execute(new Runnable() {
@Override
public void run() {
Stopwatch stopwatch = Stopwatch.createStarted();
List<List<String>> sqls = new ArrayList<>();
for (int i = 0; i < mobileFlowInfoRows.size(); i++) {
MobileFlowInfoRow mobileFlowInfoRow = mobileFlowInfoRows.get(i);
try {
List<String> list = new ArrayList<>();
list.add(StringUtils.isBlank(uuid) ? null : uuid);
String phone = mobileFlowInfoRow.getPhone();
String cTime = mobileFlowInfoRow.getcTime();
list.add(TimeUtils.getCTimeFormat(cTime, Constant.OPERATOR.MOBILE_USERSOURCE_FLOW, phone));
String cheapService = mobileFlowInfoRow.getCheapService();
list.add(StringUtils.isBlank(cheapService) ? null : cheapService);
String communicationFees = mobileFlowInfoRow.getCommunicationFees();
list.add(StringUtils.isBlank(communicationFees) ? null : communicationFees);
String onlineTime = mobileFlowInfoRow.getOnlineTime();
list.add(NumberUtils.getUseTimeFormat(onlineTime, Constant.OPERATOR.MOBILE_USERSOURCE_FLOW, phone));
String onlineType = mobileFlowInfoRow.getOnlineType();
list.add(StringUtils.isBlank(onlineType) ? null : onlineType);
list.add(PhoneUtils.getPhoneNoByRegx(phone, Constant.OPERATOR.MOBILE_USERSOURCE_FLOW, phone));
String totalFlow = mobileFlowInfoRow.getTotalFlow();
list.add(NumberUtils.flowFormat(totalFlow, Constant.OPERATOR.MOBILE_USERSOURCE_FLOW, phone));
String tradeAddr = mobileFlowInfoRow.getTradeAddr();
list.add(StringUtils.isBlank(tradeAddr) ? null : tradeAddr);
list.add(TimeUtils.timeStamp2Date(String.valueOf(System.currentTimeMillis()), Constant.OPERATOR.MOBILE_USERSOURCE_FLOW, phone));
sqls.add(list);
} catch (Exception e) {
log.error("移动流量详单清洗异常, uuid: {} , ka: {} , param: {} ", uuid, ka, JSON.toJSONString(mobileFlowInfoRow), e);
}
}
int size = JdbcExecuters.batchExecute(sqls, Constant.SQL.MOBILE_FLOW_DETAIL_INFOS);
log.info("移动流量详单清洗完成, uuid: {} , 原始大小: {} , 清洗后大小: {} , 入库大小: {} , ka: {} , 耗时: {} ", uuid, mobileFlowInfoRows.size(), sqls.size(), size, ka, stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
}
});
}
public static void cleaningAndSaveTelecomCallDetailInfo(List<TelecomCallInfoRow> telecomCallInfoRows, String uuid, boolean ka) {
ThreadPoolExecutorUtils.getThreadPoolUtil().execute(new Runnable() {
@Override
public void run() {
Stopwatch stopwatch = Stopwatch.createStarted();
List<List<String>> sqls = new ArrayList<>();
for (int i = 0; i < telecomCallInfoRows.size(); i++) {
TelecomCallInfoRow telecomCallInfoRow = telecomCallInfoRows.get(i);
try {
List<String> list = new ArrayList<>();
list.add(StringUtils.isBlank(uuid) ? null : uuid);
String phoneNo = telecomCallInfoRow.getPhone();
String tradeType = telecomCallInfoRow.getTradeType();
list.add(StringUtils.isBlank(tradeType) ? null : tradeType);
String cTime = telecomCallInfoRow.getcTime();
list.add(TimeUtils.getCTimeFormat(cTime, Constant.OPERATOR.TELECOM_USERSOURCE_CALL, phoneNo));
String tradeTime = telecomCallInfoRow.getTradeTime();
list.add(NumberUtils.getUseTimeFormat(tradeTime, Constant.OPERATOR.TELECOM_USERSOURCE_CALL, phoneNo));
String callWay = telecomCallInfoRow.getCallWay();
list.add(StringUtils.isBlank(callWay) ? null : callWay);
String receiverPhone = telecomCallInfoRow.getReceiverPhone();
list.add(PhoneUtils.getPhoneNoByRegx(receiverPhone, Constant.OPERATOR.TELECOM_USERSOURCE_CALL, phoneNo));
String tradeAddr = telecomCallInfoRow.getTradeAddr();
list.add(StringUtils.isBlank(tradeAddr) ? null : tradeAddr);
String basePay = telecomCallInfoRow.getBasePay();
list.add(StringUtils.isBlank(basePay) ? null : basePay);
String longPay = telecomCallInfoRow.getLongPay();
list.add(StringUtils.isBlank(longPay) ? null : longPay);
String infoPay = telecomCallInfoRow.getInfoPay();
list.add(StringUtils.isBlank(infoPay) ? null : infoPay);
String otherPay = telecomCallInfoRow.getOtherPay();
list.add(StringUtils.isBlank(otherPay) ? null : otherPay);
String allPay = telecomCallInfoRow.getAllPay();
list.add(StringUtils.isBlank(allPay) ? null : allPay);
list.add(PhoneUtils.getPhoneNoByRegx(phoneNo, Constant.OPERATOR.TELECOM_USERSOURCE_CALL, phoneNo));
String iscm = telecomCallInfoRow.getIscm();
list.add(StringUtils.isBlank(iscm) ? null : iscm);
list.add(TimeUtils.timeStamp2Date(String.valueOf(System.currentTimeMillis()), Constant.OPERATOR.TELECOM_USERSOURCE_CALL, phoneNo));
sqls.add(list);
} catch (Exception e) {
log.error("电信通话详单清洗异常, uuid: {} , ka: {} , param: {} ", uuid, ka, JSON.toJSONString(telecomCallInfoRow), e);
}
}
int size = JdbcExecuters.batchExecute(sqls, Constant.SQL.TELECOM_DETAIL_INFOS);
log.info("电信通话详单清洗完成, uuid: {} , 原始大小: {} , 清洗后大小: {} , 入库大小: {} , ka: {} , 耗时: {} ", uuid, telecomCallInfoRows.size(), sqls.size(), size, ka, stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
}
});
}
public static void cleaningAndSaveTelecomSmsDetailInfo(List<TelecomSmsInfoRow> telecomSmsInfoRows, String uuid, boolean ka) {
ThreadPoolExecutorUtils.getThreadPoolUtil().execute(new Runnable() {
@Override
public void run() {
Stopwatch stopwatch = Stopwatch.createStarted();
List<List<String>> sqls = new ArrayList<>();
for (int i = 0; i < telecomSmsInfoRows.size(); i++) {
TelecomSmsInfoRow telecomSmsInfoRow = telecomSmsInfoRows.get(i);
try {
List<String> list = new ArrayList<>();
list.add(StringUtils.isBlank(uuid) ? null : uuid);
String phone = telecomSmsInfoRow.getPhone();
String allPay = telecomSmsInfoRow.getAllPay();
list.add(StringUtils.isBlank(allPay) ? null : allPay);
String businessType = telecomSmsInfoRow.getBusinessType();
list.add(StringUtils.isBlank(businessType) ? null : businessType);
String createTs = telecomSmsInfoRow.getCreateTs();
list.add(StringUtils.isBlank(createTs) ? null : createTs);
list.add(PhoneUtils.getPhoneNoByRegx(phone, Constant.OPERATOR.TELECOM_USERSOURCE_SMS, phone));
String receiverPhone = telecomSmsInfoRow.getReceiverPhone();
list.add(PhoneUtils.getPhoneNoByRegx(receiverPhone, Constant.OPERATOR.TELECOM_USERSOURCE_SMS, phone));
String sendTime = telecomSmsInfoRow.getSentTime();
list.add(TimeUtils.getCTimeFormat(sendTime, Constant.OPERATOR.TELECOM_USERSOURCE_SMS, phone));
list.add(TimeUtils.timeStamp2Date(String.valueOf(System.currentTimeMillis()), Constant.OPERATOR.TELECOM_USERSOURCE_SMS, phone));
sqls.add(list);
} catch (Exception e) {
log.error("电信短信详单解析异常, uuid: {} , ka: {} , param: {} ", uuid, ka, JSON.toJSONString(telecomSmsInfoRow), e);
}
}
int size = JdbcExecuters.batchExecute(sqls, Constant.SQL.TELECOM_SMS_DETAIL_INFOS);
log.info("电信短信详单清洗完成, uuid: {} , 原始大小: {} , 清洗后大小: {} , 入库大小: {} , ka: {} , 耗时: {} ", uuid, telecomSmsInfoRows.size(), sqls.size(), size, ka, stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
}
});
}
public static void cleaningAndSaveTelecomFlowDetailInfo(List<TelecomFlowInfoRow> telecomFlowInfoRows, String uuid, boolean ka) {
ThreadPoolExecutorUtils.getThreadPoolUtil().execute(new Runnable() {
@Override
public void run() {
Stopwatch stopwatch = Stopwatch.createStarted();
List<List<String>> sqls = new ArrayList<>();
for (int i = 0; i < telecomFlowInfoRows.size(); i++) {
TelecomFlowInfoRow telecomFlowInfoRow = telecomFlowInfoRows.get(i);
try {
List<String> list = new ArrayList<>();
list.add(StringUtils.isBlank(uuid) ? null : uuid);
String phone = telecomFlowInfoRow.getPhone();
String beginTime = telecomFlowInfoRow.getBeginTime();
list.add(TimeUtils.getCTimeFormat(beginTime, Constant.OPERATOR.TELECOM_USERSOURCE_FLOW, phone));
String busienss = telecomFlowInfoRow.getBusiness();
list.add(StringUtils.isBlank(busienss) ? null : busienss);
String fee = telecomFlowInfoRow.getFee();
list.add(StringUtils.isBlank(fee) ? null : fee);
String flow = telecomFlowInfoRow.getFlow();
list.add(NumberUtils.flowFormat(flow, Constant.OPERATOR.TELECOM_USERSOURCE_FLOW, phone));
String iscm = telecomFlowInfoRow.getIscm();
list.add(StringUtils.isBlank(iscm) ? null : iscm);
String location = telecomFlowInfoRow.getLocation();
list.add(StringUtils.isBlank(location) ? null : location);
String netType = telecomFlowInfoRow.getNetType();
list.add(StringUtils.isBlank(netType) ? null : netType);
list.add(PhoneUtils.getPhoneNoByRegx(phone, Constant.OPERATOR.TELECOM_USERSOURCE_FLOW, phone));
String tradeTime = telecomFlowInfoRow.getTradeTime();
list.add(NumberUtils.getUseTimeFormat(tradeTime, Constant.OPERATOR.TELECOM_USERSOURCE_FLOW, phone));
list.add(TimeUtils.timeStamp2Date(String.valueOf(System.currentTimeMillis()), Constant.OPERATOR.TELECOM_USERSOURCE_FLOW, phone));
sqls.add(list);
} catch (Exception e) {
log.error("电信流量详单清洗异常, uuid: {} , ka: {} , param: {} ", uuid, ka, JSON.toJSONString(telecomFlowInfoRow), e);
}
}
int size = JdbcExecuters.batchExecute(sqls, Constant.SQL.TELECOM_FLOW_DETAIL_INFOS);
log.info("电信流量详单清洗完成, uuid: {} , 原始大小: {} , 清洗后大小: {} , 入库大小: {} , ka: {} , 耗时: {} ", uuid, telecomFlowInfoRows.size(), sqls.size(), size, ka, stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
}
});
}
public static void cleaningAndSaveUnicomCallDetailInfo(List<UnicomCallInfoRow> unicomCallInfoRows, String uuid, boolean ka) {
ThreadPoolExecutorUtils.getThreadPoolUtil().execute(new Runnable() {
@Override
public void run() {
Stopwatch stopwatch = Stopwatch.createStarted();
List<List<String>> sqls = new ArrayList<>();
for (int i = 0; i < unicomCallInfoRows.size(); i++) {
UnicomCallInfoRow unicomCallInfoRow = unicomCallInfoRows.get(i);
try {
String phone = unicomCallInfoRow.getPhone();
List<String> list = new ArrayList<>();
list.add(StringUtils.isBlank(uuid) ? null : uuid);
String businessType = unicomCallInfoRow.getBusinessType();
list.add(StringUtils.isBlank(businessType) ? null : businessType);
String cTime = unicomCallInfoRow.getcTime();
list.add(TimeUtils.getCTimeFormat(cTime, Constant.OPERATOR.UNICOM_USERSOURCE_CALL, phone));
String tradeTime = unicomCallInfoRow.getTradeTime();
list.add(NumberUtils.getUseTimeFormat(tradeTime, Constant.OPERATOR.UNICOM_USERSOURCE_CALL, phone));
String callType = unicomCallInfoRow.getCallType();
list.add(StringUtils.isBlank(callType) ? null : callType);
String receiverPhone = unicomCallInfoRow.getReceiverPhone();
list.add(PhoneUtils.getPhoneNoByRegx(receiverPhone, Constant.OPERATOR.UNICOM_USERSOURCE_CALL, phone));
String tradeAddr = unicomCallInfoRow.getTradeAddr();
list.add(StringUtils.isBlank(tradeAddr) ? null : tradeAddr);
String tradeType = unicomCallInfoRow.getTradeType();
list.add(StringUtils.isBlank(tradeType) ? null : tradeType);
String basePay = unicomCallInfoRow.getBasePay();
list.add(StringUtils.isBlank(basePay) ? null : basePay);
String ldPay = unicomCallInfoRow.getLdPay();
list.add(StringUtils.isBlank(ldPay) ? null : ldPay);
String otherPay = unicomCallInfoRow.getOtherPay();
list.add(StringUtils.isBlank(otherPay) ? null : otherPay);
String totalPay = unicomCallInfoRow.getTotalPay();
list.add(StringUtils.isBlank(totalPay) ? null : totalPay);
list.add(PhoneUtils.getPhoneNoByRegx(phone, Constant.OPERATOR.UNICOM_USERSOURCE_CALL, phone));
String iscm = unicomCallInfoRow.getIscm();
list.add(StringUtils.isBlank(iscm) ? null : iscm);
String reductionPay = unicomCallInfoRow.getReductionPay();
list.add(StringUtils.isBlank(reductionPay) ? null : reductionPay);
list.add(TimeUtils.timeStamp2Date(String.valueOf(System.currentTimeMillis()), Constant.OPERATOR.UNICOM_USERSOURCE_CALL, phone));
sqls.add(list);
} catch (Exception e) {
log.error("联通通话详单清洗异常, uuid: {} , ka: {} , param: {} ", uuid, ka, JSON.toJSONString(unicomCallInfoRow), e);
}
}
int size = JdbcExecuters.batchExecute(sqls, Constant.SQL.UNICOM_DETAIL_INFOS);
log.info("联通通话详单清洗完成, uuid: {} , 原始大小: {} , 清洗后大小: {} , 入库大小: {} , ka: {} , 耗时: {} ", uuid, unicomCallInfoRows.size(), sqls.size(), size, ka, stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
}
});
}
public static void cleaningAndSaveUnicomSmsDetailInfo(List<UnicomSmsInfoRow> unicomSmsInfoRows, String uuid, boolean ka) {
ThreadPoolExecutorUtils.getThreadPoolUtil().execute(new Runnable() {
@Override
public void run() {
Stopwatch stopwatch = Stopwatch.createStarted();
List<List<String>> sqls = new ArrayList<>();
for (int i = 0; i < unicomSmsInfoRows.size(); i++) {
UnicomSmsInfoRow unicomSmsInfoRow = unicomSmsInfoRows.get(i);
try {
List<String> list = new ArrayList<>();
list.add(StringUtils.isBlank(uuid) ? null : uuid);
String phone = unicomSmsInfoRow.getPhone();
String allPay = unicomSmsInfoRow.getAllPay();
list.add(StringUtils.isBlank(allPay) ? null : allPay);
String createTs = unicomSmsInfoRow.getCreateTs();
list.add(StringUtils.isBlank(createTs) ? null : createTs);
list.add(PhoneUtils.getPhoneNoByRegx(phone, Constant.OPERATOR.UNICOM_USERSOURCE_SMS, phone));
String receiverPhone = unicomSmsInfoRow.getReceiverPhone();
list.add(StringUtils.isBlank(receiverPhone) ? null : receiverPhone);
String sendTime = unicomSmsInfoRow.getSentTime();
list.add(TimeUtils.getCTimeFormat(sendTime, Constant.OPERATOR.UNICOM_USERSOURCE_SMS, phone));
String tradeType = unicomSmsInfoRow.getTradeType();
list.add(StringUtils.isBlank(tradeType) ? null : tradeType);
list.add(TimeUtils.timeStamp2Date(String.valueOf(System.currentTimeMillis()), Constant.OPERATOR.UNICOM_USERSOURCE_SMS, phone));
sqls.add(list);
} catch (Exception e) {
log.error("联通短信详单清洗异常, uuid: {} , ka: {} , param: {} ", uuid, ka, JSON.toJSONString(unicomSmsInfoRow), e);
}
}
int size = JdbcExecuters.batchExecute(sqls, Constant.SQL.UNICOM_SMS_DETAIL_INFOS);
log.info("联通短信详单清洗完成, uuid: {} , 原始大小: {} , 清洗后大小: {} , 入库大小: {} , ka: {} , 耗时: {} ", uuid, unicomSmsInfoRows.size(), sqls.size(), size, ka, stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
}
});
}
public static void cleaningAndSaveUnicomFlowDetailInfo(List<UnicomFlowInfoRow> unicomFlowInfoRows, String uuid, boolean ka) {
ThreadPoolExecutorUtils.getThreadPoolUtil().execute(new Runnable() {
@Override
public void run() {
Stopwatch stopwatch = Stopwatch.createStarted();
List<List<String>> sqls = new ArrayList<>();
for (int i = 0; i < unicomFlowInfoRows.size(); i++) {
UnicomFlowInfoRow unicomFlowInfoRow = unicomFlowInfoRows.get(i);
try {
List<String> list = new ArrayList<>();
list.add(StringUtils.isBlank(uuid) ? null : uuid);
String phone = unicomFlowInfoRow.getPhone();
String allFlow = unicomFlowInfoRow.getAllFlow();
list.add(NumberUtils.flowFormat(allFlow, Constant.OPERATOR.UNICOM_USERSOURCE_FLOW, phone));
String allPay = unicomFlowInfoRow.getAllPay();
list.add(StringUtils.isBlank(allPay) ? null : allPay);
String createTs = unicomFlowInfoRow.getCreateTs();
list.add(StringUtils.isBlank(createTs) ? null : createTs);
list.add(PhoneUtils.getPhoneNoByRegx(phone, Constant.OPERATOR.UNICOM_USERSOURCE_FLOW, phone));
String startTime = unicomFlowInfoRow.getStartTime();
list.add(TimeUtils.getCTimeFormat(startTime, Constant.OPERATOR.UNICOM_USERSOURCE_FLOW, phone));
String tradeAddr = unicomFlowInfoRow.getTradeAddr();
list.add(StringUtils.isBlank(tradeAddr) ? null : tradeAddr);
String tradeType = unicomFlowInfoRow.getTradeType();
list.add(StringUtils.isBlank(tradeType) ? null : tradeType);
list.add(TimeUtils.timeStamp2Date(String.valueOf(System.currentTimeMillis()), Constant.OPERATOR.UNICOM_USERSOURCE_FLOW, phone));
sqls.add(list);
} catch (Exception e) {
log.error("联通流量详单清洗异常, uuid: {} , ka: {} , param: {} ", uuid, ka, JSON.toJSONString(unicomFlowInfoRow), e);
}
}
int size = JdbcExecuters.batchExecute(sqls, Constant.SQL.UNICOM_FLOW_DETAIL_INFOS);
log.info("联通流量详单清洗完成, uuid: {} , 原始大小: {} , 清洗后大小: {} , 入库大小: {} , ka: {} , 耗时: {} ", uuid, unicomFlowInfoRows.size(), sqls.size(), size, ka, stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
}
});
}
/**
* 手机号/账单月/账单金额
*
* @param phoneBillDataRow
* @param uuid
* @param ka
*/
public static void cleaningAndSavePhoneBillInfo(PhoneBillDataRow phoneBillDataRow, String uuid, boolean ka) {
ThreadPoolExecutorUtils.getThreadPoolUtil().execute(new Runnable() {
@Override
public void run() {
Stopwatch stopwatch = Stopwatch.createStarted();
List<List<String>> sqls = new ArrayList<>();
try {
String phone = phoneBillDataRow.getPhoneNo();
List<String> list = new ArrayList<>();
list.add(StringUtils.isBlank(uuid) ? null : uuid);
list.add(PhoneUtils.getPhoneNoByRegx(phone, Constant.OPERATOR.PHONE_CALL_BILL, phone));
Date date = phoneBillDataRow.getBillDay();
list.add(TimeUtils.billDayFormat(date, Constant.OPERATOR.PHONE_CALL_BILL, phone));
String name = phoneBillDataRow.getName();
list.add(StringUtils.isBlank(name) ? null : name);
float amount = phoneBillDataRow.getAmount();
list.add(NumberUtils.amountToFormat(String.valueOf(amount), Constant.OPERATOR.PHONE_CALL_BILL, phone));
String dependCycle = phoneBillDataRow.getDependCycle();
list.add(StringUtils.isBlank(dependCycle) ? null : dependCycle);
String billPackage = phoneBillDataRow.getBillPackage();
list.add(StringUtils.isBlank(billPackage) ? null : billPackage);
String content = phoneBillDataRow.getContent();
list.add(StringUtils.isBlank(content) ? null : content);
list.add(TimeUtils.timeStamp2Date(String.valueOf(System.currentTimeMillis()), Constant.OPERATOR.PHONE_CALL_BILL, phone));
sqls.add(list);
} catch (Exception e) {
log.error("通话账单清洗异常, uuid: {} , ka: {} , param: {} ", uuid, ka, JSON.toJSONString(phoneBillDataRow), e);
}
int size = JdbcExecuters.batchExecute(sqls, Constant.SQL.PHONE_BILL_INFOS);
log.info("通话账单清洗完成, uuid: {} , 入库大小: {} , ka: {} , 耗时: {} ", uuid, size, ka, stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
}
});
}
/**
*
* @param phonePaymentDataRow
* @param uuid
* @param ka
*/
public static void cleaningAndSaveRechargeInfo(PhonePaymentDataRow phonePaymentDataRow, String uuid, boolean ka) {
ThreadPoolExecutorUtils.getThreadPoolUtil().execute(new Runnable() {
@Override
public void run() {
Stopwatch stopwatch = Stopwatch.createStarted();
List<List<String>> sqls = new ArrayList<>();
try {
List<String> list = new ArrayList<>();
String phone = phonePaymentDataRow.getPhone();
list.add(StringUtils.isBlank(uuid) ? null : uuid);
list.add(phone);
Double payAmount = phonePaymentDataRow.getPayAmount();
list.add(NumberUtils.amountToFormat(String.valueOf(payAmount), Constant.OPERATOR.PHONE_RECHARGE_RECORDS, phone));
list.add(TimeUtils.payTimeToFormat(phonePaymentDataRow.getPayTime(), Constant.OPERATOR.PHONE_RECHARGE_RECORDS, phone));
String payMethod = phonePaymentDataRow.getPayMethod();
list.add(StringUtils.isBlank(payMethod) ? null : payMethod);
String payChannel = phonePaymentDataRow.getPayChannel();
list.add(StringUtils.isBlank(payChannel) ? null : payChannel);
String paySpare = phonePaymentDataRow.getPaySpare();
list.add(StringUtils.isBlank(paySpare) ? null : paySpare);
list.add(TimeUtils.timeStamp2Date(String.valueOf(System.currentTimeMillis()), Constant.OPERATOR.PHONE_RECHARGE_RECORDS, phone));
sqls.add(list);
} catch (Exception e) {
log.error("充值记录清洗异常, uuid: {} , ka: {} , param: {} ", uuid, ka, JSON.toJSONString(phonePaymentDataRow));
}
int size = JdbcExecuters.batchExecute(sqls, Constant.SQL.PHONE_RECHARGE_INFOS);
log.info("充值记录清洗完成, uuid: {} , 入库大小: {} , ka: {} , 耗时: {} ", uuid, size, ka, stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
}
});
}
}
package cn.quantgroup.qgdataservice.utils;
import cn.quantgroup.qgdataservice.constant.Constant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.*;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import static cn.quantgroup.qgdataservice.config.datasource.TidbDataSource.HIK_DATA_SOURCE;
/**
* Created by renfeng on 2019/4/22.
*/
public class JdbcExecuters {
private static final Logger log = LoggerFactory.getLogger(JdbcExecuters.class);
/**
* 批量插入
*
* @param sql
*/
public static void batchUpdateExecute(String sql, String userSource, String phone) {
Connection conn = null;
Statement st = null;
try {
conn = HIK_DATA_SOURCE.dataSource.getConnection();
st = conn.createStatement();
try {
st.executeUpdate(sql);
} catch (SQLException e) {
log.error("执行sql异常, sql: {} ", sql, e);
} catch (Exception e) {
log.error("未知异常, sql: {} ", sql, e);
}
} catch (Exception e) {
log.error("执行sqlException异常, sqls: {} ", sql, e);
} finally {
close(conn, st, null);
}
}
/**
* 批量插入
*
* @param sqls
*/
public static void batchUpdateExecute(List<String> sqls) {
Connection conn = null;
Statement st = null;
try {
conn = HIK_DATA_SOURCE.dataSource.getConnection();
st = conn.createStatement();
conn.setAutoCommit(false);
for (String sql : sqls) {
try {
st.executeUpdate(sql);
} catch (SQLException e) {
log.error("执行sql异常, sql: {} ", sql, e);
} catch (Exception e) {
log.error("未知异常, sql: {} ", sql, e);
}
}
conn.commit();
} catch (Exception e) {
log.error("执行sqlException异常, sqls: {} ", sqls, e);
} finally {
close(conn, st, null);
}
}
public static int batchExecute(List<List<String>> list, String sql) {
Connection conn = null;
PreparedStatement ps = null;
AtomicInteger atomicInteger = new AtomicInteger();
try {
conn = HIK_DATA_SOURCE.dataSource.getConnection();
ps = conn.prepareStatement(sql);
conn.setAutoCommit(false);
for (int i = 0; i < list.size(); i++) {
List<String> params = list.get(i);
for (int j = 0; j < params.size(); j++) {
ps.setString(j + 1, params.get(j));
}
ps.addBatch();
atomicInteger.getAndIncrement();
if (i > 0 && i % Constant.BATCH.BATCH_SIZE == 0) {
ps.executeBatch();
conn.commit();
ps.clearBatch();
}
}
ps.executeBatch();
conn.commit();
} catch (Exception e) {
log.error("清洗数据批量插入数据异常", e);
} finally {
close(conn, ps, null);
}
return atomicInteger.get();
}
// public static void batchExecute(List<String> list) {
// Connection conn = null;
// PreparedStatement ps = null;
// try {
//
//// String sql = "INSERT IGNORE INTO comservice_i_unicom_call_info (uuid,businessType,cTime,tradeTime,callType,receiverPhone,tradeAddr,tradeType,basePay,ldPay,otherPay,totalPay,phone,iscm,reductionPay,timestamp) " +
//// "values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
//
// conn = HIK_DATA_SOURCE.dataSource.getConnection();
// ps = conn.prepareStatement("");
// conn.setAutoCommit(false);
// for (int i = 0; i < list.size(); i++) {
// ps.addBatch(list.get(i));
// if (i>0 && i % Constant.BATCH.BATCH_SIZE == 0) {
// ps.executeBatch();
// ps.clearBatch();
// }
// }
// ps.executeBatch();
// conn.commit();
// ps.clearBatch();
//
//
// } catch (Exception e) {
// log.error("清洗数据批量插入数据异常", e);
// } finally {
// close(conn, ps, null);
// }
// }
/**
* 关闭资源
*
* @param conn
* @param st
* @param rs
*/
private static void close(Connection conn, Statement st, ResultSet rs) {
try {
if (rs != null)
rs.close();
if (st != null)
st.close();
if (conn != null)
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
package cn.quantgroup.qgdataservice.utils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* @Author fengjunkai
* @Date 2019-05-14 17:51
*/
public class NumberUtils {
private static final Logger log = LoggerFactory.getLogger(NumberUtils.class);
public static String formatPhoneRemain(Float remain, String userSource, String phone) {
if (Objects.isNull(remain)) return null;
try {
BigDecimal bigDecimal = new BigDecimal(remain);
BigDecimal phoneRemain = bigDecimal.setScale(2, BigDecimal.ROUND_HALF_UP);
return String.valueOf(phoneRemain);
} catch (Exception e) {
log.error("运营商 ( {} ) 解析用户余额异常, phone: {} , enterStr: {} ", userSource, phone, remain, e);
return null;
}
}
public static String getUseTimeFormat(String time, String userSource, String phone) {
if (StringUtils.isBlank(time)) {
return null;
}
if (StringUtils.startsWith(time, "-")) {
return null;
}
try {
long allTime = 0, hour2Second = 0, min2Second = 0, second = 0;
if (time.contains(":")) {
if (time.length() == 7 && time.indexOf(":") == 1) {
time = "0" + time;
}
if (time.length() == 8) {
second = new Long(time.substring(6, 8));
min2Second = new Long(time.substring(3, 5));
hour2Second = new Long(time.substring(0, 2));
} else if (time.split(":").length == 3) {
String[] timeStrs = time.split(":");
second = new Long(timeStrs[2]);
min2Second = new Long(timeStrs[1]);
hour2Second = new Long(timeStrs[0]);
}
allTime = hour2Second * 3600 + min2Second * 60 + second;
return String.valueOf(allTime);
} else {
if (time.contains("时")) {
if (time.contains("小时"))
hour2Second = Long.parseLong(time.split("小时")[0].trim()) * 60 * 60;
else
hour2Second = Long.parseLong(time.split("时")[0].trim()) * 60 * 60;
}
if (time.contains("分")) {
if (time.contains("时")) {
min2Second = Long.parseLong(subStr("时", "分", time)) * 60;
} else {
String strs[] = time.split("分");
min2Second = Long.parseLong(strs[0].trim()) * 60;
try {
if (strs.length > 1 && !strs[1].contains("秒") && StringUtils.isNumeric(strs[1])) {
second = Long.parseLong(strs[1].trim());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
if (time.contains("秒")) {
if (time.contains("分")) {
if (time.contains("分钟"))
second = Long.parseLong(subStr("分钟", "秒", time).trim());
else
second = Long.parseLong(subStr("分", "秒", time).trim());
} else if (time.contains("时")) {
if (time.contains("小时"))
second = Long.parseLong(subStr("小时", "秒", time).trim());
else
second = Long.parseLong(subStr("时", "秒", time).trim());
} else {
second = Long.parseLong(time.split("秒")[0].trim());
}
}
allTime = hour2Second + min2Second + second;
if (allTime == 0) {
if (time != null) {
try {
if (time.contains(".")) {
time = time.substring(0, time.indexOf("."));
}
allTime = Long.parseLong(time.trim().replaceAll("\\D+?", ""));
} catch (NumberFormatException e) {
e.printStackTrace();
}
}
}
return String.valueOf(allTime);
}
} catch (Exception e) {
log.error("运营商 ( {} ) 解析用户通话使用时间异常, phone: {} , enterStr: {} ", userSource, phone, time, e);
}
return time;
}
public static String flowFormat(String flow_temp, String userSource, String phone) {
try {
if (StringUtils.isBlank(flow_temp) || StringUtils.startsWith(flow_temp, "-")) {
return null;
}
BigDecimal mb2Kb = new BigDecimal("0");
BigDecimal gb2Kb = new BigDecimal("0");
BigDecimal kb = new BigDecimal("0");
BigDecimal thou = new BigDecimal("1024");
flow_temp = flow_temp.replaceAll(" ", "").replace("&nbsp;", "").replace("(", "").replace(")", "").replace("(", "").replace(")", "");
// 全角空格这么霸道...by Pat。Liu
flow_temp = flow_temp.replaceAll(" ", "");
flow_temp = flow_temp.replaceAll("-", "");
if (flow_temp.contains("GB") || flow_temp.contains("MB") || flow_temp.contains("KB")) {
if (flow_temp.contains("GB")) {
gb2Kb = new BigDecimal(flow_temp.split("GB")[0].trim()).multiply(thou).multiply(thou);
}
if (flow_temp.contains("MB")) {
if (flow_temp.contains("GB"))
mb2Kb = new BigDecimal(flow_temp.split("GB")[1].split("MB")[0].trim()).multiply(thou);
else
mb2Kb = new BigDecimal(flow_temp.split("MB")[0].trim()).multiply(thou);
}
if (flow_temp.contains("KB")) {
if (flow_temp.contains("MB")) {
kb = new BigDecimal(subStr("MB", "KB", flow_temp).trim());
} else if (flow_temp.contains("GB")) {
kb = new BigDecimal(subStr("GB", "KB", flow_temp).trim());
} else {
kb = new BigDecimal(flow_temp.split("KB")[0].trim());
}
}
} else if (isNumeric(flow_temp)) {
BigDecimal bigDecimal = new BigDecimal(flow_temp).setScale(2, BigDecimal.ROUND_HALF_UP);
return String.valueOf(bigDecimal);
} else {
if (flow_temp.contains("G")) {
gb2Kb = new BigDecimal(flow_temp.split("G")[0].trim()).multiply(thou).multiply(thou);
}
if (flow_temp.contains("M")) {
if (flow_temp.contains("G"))
mb2Kb = new BigDecimal(flow_temp.split("G")[1].split("M")[0].trim()).multiply(thou);
else
mb2Kb = new BigDecimal(flow_temp.split("M")[0].trim()).multiply(thou);
}
if (flow_temp.contains("K")) {
if (flow_temp.contains("M")) {
kb = new BigDecimal(subStr("M", "K", flow_temp).trim());
} else if (flow_temp.contains("G")) {
kb = new BigDecimal(subStr("G", "K", flow_temp).trim());
} else {
kb = new BigDecimal(flow_temp.split("K")[0].trim());
}
}
}
BigDecimal d = gb2Kb.add(mb2Kb).add(kb).setScale(2, BigDecimal.ROUND_HALF_UP);
return String.valueOf(d);
} catch (Exception e) {
log.error("运营商 ( {} ) 解析用户使用总流量异常, phone: {} , enterStr: {} ", userSource, phone, flow_temp, e);
return null;
}
}
public static String amountToFormat(String amount, String userSource, String phone) {
try {
String amountTmep = amount;
if (isNumeric(amountTmep)) {
if (amountTmep.startsWith("-") || amount.contains("E") || amount.contains("e")) {
return new BigDecimal(0.00).setScale(2, BigDecimal.ROUND_HALF_UP).toString();
}
return new BigDecimal(amountTmep).setScale(2, BigDecimal.ROUND_HALF_UP).toString();
}
} catch (Exception e) {
log.error("运营商 ( {} ) 解析用户账单当月费用异常, phone: {} , enterStr: {} ", userSource, phone, amount, e);
}
return amount;
}
public static String subStr(String stext, String etext, String text) {
int sindex = text.indexOf(stext);
if (sindex >= 0) {
int eindex = text.indexOf(etext, sindex);
if (eindex >= 0) {
String ctext = text.substring(sindex + stext.length(), eindex);
return ctext;
}
}
return "";
}
public static boolean isNumeric(String str) {
Pattern pattern = Pattern.compile("-?[0-9]+(.[0-9]+)?");
Matcher isNum = pattern.matcher(str);
if (!isNum.matches()) {
return false;
}
return true;
}
}
package cn.quantgroup.qgdataservice.utils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.regex.Pattern;
/**
* @Author fengjunkai
* @Date 2019-05-14 18:08
*/
public class PhoneUtils {
private static final Logger log = LoggerFactory.getLogger(PhoneUtils.class);
public static String getPhoneNoByRegx(String enterStr, String userSource, String phone) {
if (StringUtils.isBlank(enterStr)) {
return null;
}
try {
String phoneNo = Pattern.compile("[^0-9]").matcher(enterStr).replaceAll("");
if (StringUtils.startsWith(phoneNo, "86125832")) {
return phoneNo.replaceFirst("86125832", "");
} else if (StringUtils.startsWith(phoneNo, "0086125832")) {
return StringUtils.replaceFirst(phoneNo, "0086125832", "");
} else if (13 == phoneNo.length() && StringUtils.startsWith(phoneNo, "86")) {
return phoneNo.replaceFirst("86", "");
} else if (13 == phoneNo.length() && StringUtils.startsWith(phoneNo, "001")) {
return phoneNo.replaceFirst("00", "");
} else if (StringUtils.startsWith(phoneNo, "+86")) {
return phoneNo.replaceFirst("\\+86", "");
} else if (StringUtils.startsWith(phoneNo, "0086")) {
return phoneNo.replaceFirst("0086", "");
}
} catch (Exception e) {
log.error("运营商 ( {} ) 解析用户余额异常, phone: {} , enterStr: {} ", userSource, phone, enterStr, e);
}
return enterStr;
}
}
package cn.quantgroup.qgdataservice.utils;
import java.util.List;
/**
* @Author fengjunkai
* @Date 2019-05-14 18:18
*/
public class SqlUtils {
public static String getExecuteSql(List<String> list, String sqlStr) {
String formatStr = "";
for (int i = 0; i < list.size(); i++) {
String string = list.get(i);
if (string == null) {
formatStr += (string + (i == list.size() - 1 ? "" : ","));
} else {
formatStr += ("'" + string + "'" + (i == list.size() - 1 ? "" : ","));
}
}
String executeSql = String.format(sqlStr, formatStr);
return executeSql;
}
}
package cn.quantgroup.qgdataservice.utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolExecutorUtils {
private static final Logger log = LoggerFactory.getLogger(ThreadPoolExecutorUtils.class);
private static ThreadPoolExecutorUtils threadPool = new ThreadPoolExecutorUtils();
private ThreadPoolExecutor executor = null;
private ThreadPoolExecutorUtils() {
executor = new ThreadPoolExecutor(50, 200, 50, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(20));
}
public static ThreadPoolExecutorUtils getThreadPoolUtil() {
return threadPool;
}
public void execute(Runnable r) {
try {
int activeCount = executor.getActiveCount();
int size = executor.getQueue().size();
log.info("数据清洗线程池活跃数: {} , queueSize: {} ", activeCount, size);
} catch (Exception e) {
log.info("TASK_POOL KA Exception : {}", e);
}
executor.execute(r);
}
public void shutDown() {
executor.shutdown();
}
}
package cn.quantgroup.qgdataservice.utils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Objects;
/**
* 数据清洗时间工具类
*
* @Author fengjunkai
* @Date 2019-05-14 16:52
*/
public class TimeUtils {
private static final Logger log = LoggerFactory.getLogger(TimeUtils.class);
private static final String ERROR_TIME = "1960-01-01 00:00:00";
public static String registerDateFormat(Date enterDate, String userSource, String phone) {
if (Objects.isNull(enterDate)) {
return null;
}
try {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
if (timeCompar(enterDate, enterDate)) return null;
String outDate = simpleDateFormat.format(enterDate);
if (!outDate.startsWith("1") && !outDate.startsWith("2")) return null;
return outDate;
} catch (Exception e) {
log.error("运营商 ( {} ) 解析用户注册时间异常, phone: {} , enterStr: {} ", userSource, phone, enterDate, e);
return null;
}
}
public static String registerDateFormat(String enterStr, String userSource, String phone) {
if (StringUtils.isBlank(enterStr)) {
return null;
}
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
try {
Date errorDate = sdf.parse(ERROR_TIME);
if (!enterStr.startsWith("1") && !enterStr.startsWith("2")) { //将YYYY第一个Y不是1或2的错误数据,填为空
return null;
}
if (isLongDate(enterStr, userSource, phone)) { //时间戳(1590909284321)这种,转为YYYY-MM-DD HH:mm:ss
String enterTime = sdf.format(new Date(Long.valueOf(enterStr)));
Date enterDate = sdf.parse(enterTime);
if (timeCompar(enterDate, errorDate)) return null;
return enterTime;
}
if (enterStr.contains("年") && enterStr.contains("月") && enterStr.contains("秒")) { //时间带年/月的这种,比如“2019年1月1日”,转为2019-01-01 00:00:00
String tempTime = enterStr.replaceAll("年", "-").replaceAll("月", "-")
.replaceAll("日", "").replaceAll("时", ":").replaceAll("分", ":")
.replaceAll("秒", "");
String time = sdf.format(tempTime);
if (timeCompar(sdf.parse(time), errorDate)) return null;
return time;
}
Date date = sdf.parse(enterStr);
if (timeCompar(date, errorDate)) return null;
return enterStr;
} catch (Exception e) {
log.error("运营商 ( {} ) 解析用户注册时间异常, phone: {} , enterStr: {} ", userSource, phone, enterStr, e);
}
return enterStr;
}
public static String getCTimeFormat(String cTime, String userSource, String phone) {
if (StringUtils.isBlank(cTime)) {
return null;
}
if (!cTime.startsWith("1") && !cTime.startsWith("2")) {
return null;
}
String errorTime = "1960-01-01 00:00:00";
SimpleDateFormat simpleDateFormat1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
try {
Date errorDate = simpleDateFormat1.parse(errorTime);
int cTimeLength = cTime.length();
String cTimeStr = cTime.trim();
if (19 == cTimeLength) {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date date = simpleDateFormat.parse(cTimeStr);
if (date.compareTo(errorDate) <= 0 || date.compareTo(new Date()) > 0) {
return null;
}
return cTimeStr;
} else if (20 == cTimeLength && cTimeStr.contains("年") && cTimeStr.contains("分")) {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy年MM月dd日HH时mm分ss秒");
Date date = simpleDateFormat.parse(cTimeStr);
if (date.compareTo(errorDate) <= 0 || date.compareTo(new Date()) > 0) {
return null;
}
SimpleDateFormat simpleDateFormat2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return simpleDateFormat2.format(date);
} else if (21 == cTimeLength && cTimeStr.contains("年") && cTimeStr.contains("分")) {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy年MM月dd日 HH时mm分ss秒");
Date date = simpleDateFormat.parse(cTimeStr);
if (date.compareTo(errorDate) <= 0 || date.compareTo(new Date()) > 0) {
return null;
}
SimpleDateFormat simpleDateFormat2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return simpleDateFormat2.format(cTimeStr);
} else if (13 == cTimeLength && isLongDate(cTimeStr, userSource, phone)) { //1590909284321
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date date = new Date(Long.valueOf(cTimeStr));
if (date.compareTo(errorDate) <= 0 || date.compareTo(new Date()) > 0) {
return null;
}
return simpleDateFormat.format(date);
}
} catch (ParseException e) {
log.error("运营商详单 ( {} ) 解析用户通话开始时间异常, phone: {} , enterStr: {} ", userSource, phone, cTime, e);
}
return cTime;
}
public static String billDayFormat(Date enterDate, String userSource, String phone) {
if (Objects.isNull(enterDate)) {
return null;
}
String dateFormatStr = "";
try {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMM");
dateFormatStr = simpleDateFormat.format(enterDate);
if (!dateFormatStr.startsWith("1") && !dateFormatStr.startsWith("2")) {
return null;
}
return dateFormatStr;
} catch (Exception e) {
log.error("运营商详单 ( {} ) 解析用户账单开始时间异常, phone: {} , enterStr: {} ", userSource, phone, enterDate, e);
return null;
}
//
// if (StringUtils.isBlank(dateFormatStr)) {
// return null;
// }
// String dateStr = dateFormatStr.replaceAll("年", "").replaceAll("月", "").replaceAll("日", "");
// if (dateStr.length() == 6) {
// try {
// SimpleDateFormat sdf = new SimpleDateFormat("yyyyMM");
// sdf.parse(dateStr);
// return dateStr;
// } catch (Exception e) {
//// System.out.println("======" + dataType + "异常======" + dateFormatStr);
// }
// return dateFormatStr;
// }
//
// if (isLongDate(dateFormatStr, userSource, phone)) {
// try {
// Date date = new Date(Long.valueOf(dateFormatStr));
// SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMM");
// return simpleDateFormat.format(date);
// } catch (Exception e) {
//// System.out.println("======" + dataType + "时间戳转日期yyyyMM异常======" + dateFormatStr);
// return dateFormatStr;
// }
// }
//
// try {
// SimpleDateFormat sdf = new SimpleDateFormat("MMM dd,yyyy HH:mm:ss a", Locale.ENGLISH);
// SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMM");
// return simpleDateFormat.format(sdf.parse(dateFormatStr));
// } catch (ParseException e) {
//// System.out.printf("======" + dataType + "异常======" + dateFormatStr);
// }
// return dateFormatStr;
}
public static String payTimeToFormat(Date payTime, String userSource, String phone) {
String payDate = "";
try {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
payDate = simpleDateFormat.format(payTime);
if (!payDate.startsWith("1") && !payDate.startsWith("2")) {
return null;
} else {
SimpleDateFormat simpleDateFormat1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date errDate = simpleDateFormat1.parse(ERROR_TIME);
return payTime.compareTo(errDate) > 0 && payTime.compareTo(new Date()) <= 0 ? payDate : null;
}
} catch (Exception e) {
log.error("运营商 ( {} ) 解析用户缴费时间异常, phone: {} , enterStr: {} ", userSource, phone, payTime, e);
}
return null;
}
public static String timeStamp2Date(String enterStr, String userSource, String phone) {
try {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return sdf.format(new Date(Long.valueOf(enterStr)));
} catch (Exception e) {
log.error("运营商 ( {} ) 解析用户注册时间异常, phone: {} , enterStr: {} ", userSource, phone, enterStr, e);
}
return null;
}
private static boolean timeCompar(Date enterDate, Date errorDate) {
if (enterDate.compareTo(errorDate) < 0) { //将小于1960-01-01 00:00:00的错误数据,填为空
return true;
}
if (new Date().compareTo(enterDate) < 0) { //大于授权日的入网时间,填为空 当前时间
return true;
}
return false;
}
private static boolean isLongDate(String enterStr, String userSource, String phone) {
if (StringUtils.length(enterStr) == 13) {
try {
Long.valueOf(enterStr);
return true;
} catch (Exception e) {
}
}
return false;
}
public static void main(String[] args) {
// List<PhoneBillDataRow> phoneBillDataRows = PhoneBillDataService.get("18612632691");
// System.out.println(JSON.toJSONString(phoneBillDataRows));
Date date = new Date();
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMM");
System.out.println(simpleDateFormat.format(date));
}
}
package cn.quantgroup.qgdataservice;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class QgDataServiceApplicationTests {
@Test
public void contextLoads() {
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment