Unverified commit c4bf7775 authored by wanda1416, committed by GitHub

Merge branch 'master' into master

parents de093d73 a301cf5c
@@ -51,12 +51,13 @@ DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、N
| | Phoenix5.x | √ | √ |[](https://github.com/alibaba/DataX/blob/master/hbase20xsqlreader/doc/hbase20xsqlreader.md)[](https://github.com/alibaba/DataX/blob/master/hbase20xsqlwriter/doc/hbase20xsqlwriter.md)|
| | MongoDB | √ | √ |[](https://github.com/alibaba/DataX/blob/master/mongoreader/doc/mongoreader.md)[](https://github.com/alibaba/DataX/blob/master/mongowriter/doc/mongowriter.md)|
| | Hive | √ | √ |[](https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md)[](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md)|
| | Cassandra | √ | √ |[](https://github.com/alibaba/DataX/blob/master/cassandrareader/doc/cassandrareader.md)[](https://github.com/alibaba/DataX/blob/master/cassandrawriter/doc/cassandrawriter.md)|
| Unstructured data storage | TxtFile | √ | √ |[](https://github.com/alibaba/DataX/blob/master/txtfilereader/doc/txtfilereader.md)[](https://github.com/alibaba/DataX/blob/master/txtfilewriter/doc/txtfilewriter.md)|
| | FTP | √ | √ |[](https://github.com/alibaba/DataX/blob/master/ftpreader/doc/ftpreader.md)[](https://github.com/alibaba/DataX/blob/master/ftpwriter/doc/ftpwriter.md)|
| | HDFS | √ | √ |[](https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md)[](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md)|
| | Elasticsearch | | √ |[](https://github.com/alibaba/DataX/blob/master/elasticsearchwriter/doc/elasticsearchwriter.md)|
| Time-series database | OpenTSDB | √ | |[](https://github.com/alibaba/DataX/blob/master/opentsdbreader/doc/opentsdbreader.md)|
| | TSDB | √ | √ |[](https://github.com/alibaba/DataX/blob/master/tsdbreader/doc/tsdbreader.md)[](https://github.com/alibaba/DataX/blob/master/tsdbwriter/doc/tsdbhttpwriter.md)|
# I Want to Develop a New Plugin
Please see: [DataX插件开发宝典](https://github.com/alibaba/DataX/blob/master/dataxPluginDev.md)
......
@@ -84,8 +84,8 @@
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${jdk-version}</source>
<target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
......
@@ -94,8 +94,8 @@
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${jdk-version}</source>
<target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
......
# CassandraReader 插件文档
___
## 1 Quick Introduction
The CassandraReader plugin reads data from Cassandra. Under the hood, CassandraReader connects to a Cassandra instance through DataStax's Java driver and executes CQL SELECT statements to pull data out of Cassandra.
## 2 Implementation
In short, CassandraReader connects to the Cassandra instance through the Java driver, builds a SELECT CQL statement from the user's configuration, sends it to Cassandra, converts the rows of the result set into DataX's internal data types, and passes the resulting record set to the downstream Writer.
The Table and Column settings supplied by the user are assembled into the CQL statement sent to Cassandra.
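To make the assembly concrete, here is a minimal sketch of how a SELECT statement could be built from the `table`, `column`, `where`, and `allowFiltering` settings. This is an illustration only; the `buildQuery` helper is hypothetical, not the plugin's actual `CassandraReaderHelper.getQueryString` code.

```java
import java.util.Arrays;
import java.util.List;

public class CqlSelectSketch {
    // Build a SELECT CQL statement from reader settings.
    // A simplified illustration of the principle described above,
    // not the plugin's actual implementation.
    public static String buildQuery(String table, List<String> columns,
                                    String where, boolean allowFiltering) {
        StringBuilder cql = new StringBuilder("SELECT ");
        cql.append(String.join(",", columns)).append(" FROM ").append(table);
        if (where != null && !where.isEmpty()) {
            cql.append(" WHERE ").append(where);
        }
        if (allowFiltering) {
            cql.append(" ALLOW FILTERING");
        }
        return cql.toString();
    }

    public static void main(String[] args) {
        System.out.println(buildQuery("datax_src",
                Arrays.asList("textCol", "writetime(blobCol)"),
                "textcol='a'", false));
    }
}
```

Note that a `writetime(column)` entry is passed through as-is: CQL itself evaluates it and returns the write timestamp instead of the column value.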
## 3 Features
### 3.1 Sample Configuration
* A job that extracts data from Cassandra and writes it to local output:
```json
{
"job": {
"setting": {
"speed": {
"channel": 3
}
},
"content": [
{
"reader": {
"name": "cassandrareader",
"parameter": {
"host": "localhost",
"port": 9042,
"useSSL": false,
"keyspace": "test",
"table": "datax_src",
"column": [
"textCol",
"blobCol",
"writetime(blobCol)",
"boolCol",
"smallintCol",
"tinyintCol",
"intCol",
"bigintCol",
"varintCol",
"floatCol",
"doubleCol",
"decimalCol",
"dateCol",
"timeCol",
"timeStampCol",
"uuidCol",
"inetCol",
"durationCol",
"listCol",
"mapCol",
"setCol",
"tupleCol",
"udtCol"
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print":true
}
}
}
]
}
}
```
### 3.2 Parameters
* **host**
	* Description: domain name or IP of the Cassandra contact points; separate multiple nodes with commas. <br />
	* Required: yes <br />
	* Default: none <br />
* **port**
	* Description: Cassandra port. <br />
	* Required: yes <br />
	* Default: 9042 <br />
* **username**
	* Description: username for the data source <br />
	* Required: no <br />
	* Default: none <br />
* **password**
	* Description: password for the given username <br />
	* Required: no <br />
	* Default: none <br />
* **useSSL**
	* Description: whether to connect over SSL.<br />
	* Required: no <br />
	* Default: false <br />
* **keyspace**
	* Description: keyspace containing the table to synchronize.<br />
	* Required: yes <br />
	* Default: none <br />
* **table**
	* Description: the table to synchronize.<br />
	* Required: yes <br />
	* Default: none <br />
* **column**
	* Description: the set of columns to synchronize from the configured table.<br />
	An element can be either a column name or writetime(column_name); the latter reads the write timestamp of column_name instead of its data.
	* Required: yes <br />
	* Default: none <br />
* **where**
	* Description: a CQL expression used to filter rows, for example:<br />
	```
	"where":"textcol='a'"
	```
	* Required: no <br />
	* Default: none <br />
* **allowFiltering**
	* Description: whether to filter data on the server side. See the description of the ALLOW FILTERING keyword in the Cassandra documentation.<br />
	* Required: no <br />
	* Default: none <br />
* **consistancyLevel**
	* Description: consistency level for reads. One of ONE|QUORUM|LOCAL_QUORUM|EACH_QUORUM|ALL|ANY|TWO|THREE|LOCAL_ONE<br />
	* Required: no <br />
	* Default: LOCAL_QUORUM <br />
### 3.3 Type Conversion
CassandraReader currently supports all Cassandra types except counter and custom types.
The mapping from Cassandra types to DataX internal types is listed below:

| DataX internal type | Cassandra data type |
| -------- | ----- |
| Long | int, tinyint, smallint, varint, bigint, time |
| Double | float, double, decimal |
| String | ascii, varchar, text, uuid, timeuuid, duration, list, map, set, tuple, udt, inet |
| Date | date, timestamp |
| Boolean | bool |
| Bytes | blob |

Note:
* counter and custom types are currently not supported.
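Read as a lookup, the conversion table can be sketched as follows. This is a hypothetical helper for illustration, not the plugin's actual conversion code:

```java
public class CassandraTypeMapping {
    // Map a Cassandra column type name to the DataX internal type
    // per the conversion table above. Hypothetical helper, for illustration.
    public static String toDataxType(String cassandraType) {
        switch (cassandraType) {
            case "int": case "tinyint": case "smallint":
            case "varint": case "bigint": case "time":
                return "Long";
            case "float": case "double": case "decimal":
                return "Double";
            case "date": case "timestamp":
                return "Date";
            case "bool":
                return "Boolean";
            case "blob":
                return "Bytes";
            case "counter":
                // counter (like custom types) is not supported
                throw new IllegalArgumentException("counter is not supported");
            default:
                // ascii, varchar, text, uuid, timeuuid, duration,
                // list, map, set, tuple, udt, inet
                return "String";
        }
    }
}
```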
## 4 Performance Report
## 5 Constraints
### 5.1 Primary/Standby Synchronization and Data Recovery
## 6 FAQ
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-all</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<artifactId>cassandrareader</artifactId>
<name>cassandrareader</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>${datax-project-version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>3.7.2</version>
<classifier>shaded</classifier>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>16.0.1</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.9</version>
</dependency>
<!-- for test -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-core</artifactId>
<version>${datax-project-version}</version>
<exclusions>
<exclusion>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-service-face</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hive</groupId>
<artifactId>hive-serde</artifactId>
</exclusion>
<exclusion>
<groupId>javolution</groupId>
<artifactId>javolution</artifactId>
</exclusion>
</exclusions>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.9.5</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${jdk-version}</source>
<target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
<!-- assembly plugin -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<finalName>datax</finalName>
</configuration>
<executions>
<execution>
<id>dwzip</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id></id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>plugin.json</include>
<include>plugin_job_template.json</include>
</includes>
<outputDirectory>plugin/reader/cassandrareader</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>cassandrareader-0.0.1-SNAPSHOT.jar</include>
</includes>
<outputDirectory>plugin/reader/cassandrareader</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/reader/cassandrareader/libs</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>
package com.alibaba.datax.plugin.reader.cassandrareader;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
public class CassandraReader extends Reader {
private static final Logger LOG = LoggerFactory
.getLogger(CassandraReader.class);
public static class Job extends Reader.Job {
private Configuration jobConfig = null;
private Cluster cluster = null;
@Override public void init() {
this.jobConfig = super.getPluginJobConf();
String username = jobConfig.getString(Key.USERNAME);
String password = jobConfig.getString(Key.PASSWORD);
String hosts = jobConfig.getString(Key.HOST);
Integer port = jobConfig.getInt(Key.PORT,9042);
boolean useSSL = jobConfig.getBool(Key.USESSL);
if ((username != null) && !username.isEmpty()) {
Cluster.Builder clusterBuilder = Cluster.builder().withCredentials(username, password)
.withPort(Integer.valueOf(port)).addContactPoints(hosts.split(","));
if (useSSL) {
clusterBuilder = clusterBuilder.withSSL();
}
cluster = clusterBuilder.build();
} else {
cluster = Cluster.builder().withPort(Integer.valueOf(port))
.addContactPoints(hosts.split(",")).build();
}
CassandraReaderHelper.checkConfig(jobConfig,cluster);
}
@Override public void destroy() {
}
@Override public List<Configuration> split(int adviceNumber) {
List<Configuration> splittedConfigs = CassandraReaderHelper.splitJob(adviceNumber,jobConfig,cluster);
return splittedConfigs;
}
}
public static class Task extends Reader.Task {
private Configuration taskConfig;
private Cluster cluster = null;
private Session session = null;
private String queryString = null;
private ConsistencyLevel consistencyLevel;
private int columnNumber = 0;
private List<String> columnMeta = null;
@Override public void init() {
this.taskConfig = super.getPluginJobConf();
String username = taskConfig.getString(Key.USERNAME);
String password = taskConfig.getString(Key.PASSWORD);
String hosts = taskConfig.getString(Key.HOST);
Integer port = taskConfig.getInt(Key.PORT);
boolean useSSL = taskConfig.getBool(Key.USESSL);
String keyspace = taskConfig.getString(Key.KEYSPACE);
this.columnMeta = taskConfig.getList(Key.COLUMN,String.class);
columnNumber = columnMeta.size();
if ((username != null) && !username.isEmpty()) {
Cluster.Builder clusterBuilder = Cluster.builder().withCredentials(username, password)
.withPort(Integer.valueOf(port)).addContactPoints(hosts.split(","));
if (useSSL) {
clusterBuilder = clusterBuilder.withSSL();
}
cluster = clusterBuilder.build();
} else {
cluster = Cluster.builder().withPort(Integer.valueOf(port))
.addContactPoints(hosts.split(",")).build();
}
session = cluster.connect(keyspace);
String cl = taskConfig.getString(Key.CONSITANCY_LEVEL);
if( cl != null && !cl.isEmpty() ) {
consistencyLevel = ConsistencyLevel.valueOf(cl);
} else {
consistencyLevel = ConsistencyLevel.LOCAL_QUORUM;
}
queryString = CassandraReaderHelper.getQueryString(taskConfig,cluster);
LOG.info("query = " + queryString);
}
@Override public void startRead(RecordSender recordSender) {
ResultSet r = session.execute(new SimpleStatement(queryString).setConsistencyLevel(consistencyLevel));
for (Row row : r ) {
Record record = recordSender.createRecord();
record = CassandraReaderHelper.buildRecord(record,row,r.getColumnDefinitions(),columnNumber,
super.getTaskPluginCollector());
if( record != null )
recordSender.sendToWriter(record);
}
}
@Override public void destroy() {
}
}
}
package com.alibaba.datax.plugin.reader.cassandrareader;
import com.alibaba.datax.common.spi.ErrorCode;
public enum CassandraReaderErrorCode implements ErrorCode {
CONF_ERROR("CassandraReader-00", "配置错误."),
;
private final String code;
private final String description;
private CassandraReaderErrorCode(String code, String description) {
this.code = code;
this.description = description;
}
@Override
public String getCode() {
return this.code;
}
@Override
public String getDescription() {
return this.description;
}
@Override
public String toString() {
return String.format("Code:[%s], Description:[%s]. ", this.code,
this.description);
}
}
package com.alibaba.datax.plugin.reader.cassandrareader;
/**
* Created by mazhenlin on 2019/8/19.
*/
public class Key {
public final static String USERNAME = "username";
public final static String PASSWORD = "password";
public final static String HOST = "host";
public final static String PORT = "port";
public final static String USESSL = "useSSL";
public final static String KEYSPACE = "keyspace";
public final static String TABLE = "table";
public final static String COLUMN = "column";
public final static String WHERE = "where";
public final static String ALLOW_FILTERING = "allowFiltering";
public final static String CONSITANCY_LEVEL = "consistancyLevel";
public final static String MIN_TOKEN = "minToken";
public final static String MAX_TOKEN = "maxToken";
/**
* Name of each column
*/
public static final String COLUMN_NAME = "name";
/**
* Column delimiter
*/
public static final String COLUMN_SPLITTER = "format";
public static final String WRITE_TIME = "writetime(";
public static final String ELEMENT_SPLITTER = "splitter";
public static final String ENTRY_SPLITTER = "entrySplitter";
public static final String KV_SPLITTER = "kvSplitter";
public static final String ELEMENT_CONFIG = "element";
public static final String TUPLE_CONNECTOR = "_";
public static final String KEY_CONFIG = "key";
public static final String VALUE_CONFIG = "value";
}
errorcode.config_invalid_exception=\u914D\u7F6E\u9519\u8BEF
{
"name": "cassandrareader",
"class": "com.alibaba.datax.plugin.reader.cassandrareader.CassandraReader",
"description": "useScene: prod. mechanism: execute select cql, retrieve data from the ResultSet. warn: The more you know about the database, the less problems you encounter.",
"developer": "alibaba"
}
{
"name": "cassandrareader",
"parameter": {
"username": "",
"password": "",
"host": "",
"port": "",
"useSSL": false,
"keyspace": "",
"table": "",
"column": [
"c1","c2","c3"
]
}
}
# CassandraWriter 插件文档
___
## 1 Quick Introduction
The CassandraWriter plugin writes data into Cassandra. Under the hood, CassandraWriter connects to a Cassandra instance through DataStax's Java driver and executes CQL statements to write the data into Cassandra.
## 2 Implementation
In short, CassandraWriter connects to the Cassandra instance through the Java driver, builds an INSERT CQL statement from the user's configuration, and sends it to Cassandra.
The Table and Column settings supplied by the user are assembled by CassandraWriter into the CQL statement sent to Cassandra.
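For illustration only, the shape of the prepared INSERT can be sketched as below. The real writer builds this through the driver's `QueryBuilder` with bind markers (see the source further down); `buildInsert` is a hypothetical helper:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class CqlInsertSketch {
    // Build an INSERT statement with one bind marker per column.
    // A simplified sketch of the statement the writer prepares;
    // the real code uses QueryBuilder.insertInto(...) and bindMarker().
    public static String buildInsert(String table, List<String> columns) {
        String markers = String.join(",", Collections.nCopies(columns.size(), "?"));
        return "INSERT INTO " + table
                + " (" + String.join(",", columns) + ") VALUES (" + markers + ")";
    }

    public static void main(String[] args) {
        System.out.println(buildInsert("dst", Arrays.asList("name", "choice")));
    }
}
```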
## 3 Features
### 3.1 Sample Configuration
* A job that generates data in memory and imports it into Cassandra:
```json
{
"job": {
"setting": {
"speed": {
"channel": 5
}
},
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column": [
{"value":"name","type": "string"},
{"value":"false","type":"bool"},
{"value":"1988-08-08 08:08:08","type":"date"},
{"value":"addr","type":"bytes"},
{"value":1.234,"type":"double"},
{"value":12345678,"type":"long"},
{"value":2.345,"type":"double"},
{"value":3456789,"type":"long"},
{"value":"4a0ef8c0-4d97-11d0-db82-ebecdb03ffa5","type":"string"},
{"value":"value","type":"bytes"},
{"value":"-838383838,37377373,-383883838,27272772,393993939,-38383883,83883838,-1350403181,817650816,1630642337,251398784,-622020148","type":"string"}
],
"sliceRecordCount": 10000000
}
},
"writer": {
"name": "cassandrawriter",
"parameter": {
"host": "localhost",
"port": 9042,
"useSSL": false,
"keyspace": "stresscql",
"table": "dst",
"batchSize":10,
"column": [
"name",
"choice",
"date",
"address",
"dbl",
"lval",
"fval",
"ival",
"uid",
"value",
"listval"
]
}
}
}
]
}
}
```
### 3.2 Parameters
* **host**
	* Description: domain name or IP of the Cassandra contact points; separate multiple nodes with commas. <br />
	* Required: yes <br />
	* Default: none <br />
* **port**
	* Description: Cassandra port. <br />
	* Required: yes <br />
	* Default: 9042 <br />
* **username**
	* Description: username for the data source <br />
	* Required: no <br />
	* Default: none <br />
* **password**
	* Description: password for the given username <br />
	* Required: no <br />
	* Default: none <br />
* **useSSL**
	* Description: whether to connect over SSL.<br />
	* Required: no <br />
	* Default: false <br />
* **connectionsPerHost**
	* Description: client connection pool setting: how many connections to open to each server node.<br />
	* Required: no <br />
	* Default: 8 <br />
* **maxPendingPerConnection**
	* Description: client connection pool setting: maximum number of in-flight requests per connection.<br />
	* Required: no <br />
	* Default: 128 <br />
* **keyspace**
	* Description: keyspace containing the table to synchronize.<br />
	* Required: yes <br />
	* Default: none <br />
* **table**
	* Description: the table to synchronize.<br />
	* Required: yes <br />
	* Default: none <br />
* **column**
	* Description: the set of columns to write in the configured table.<br />
	An element can be a column name or "writetime()". If a column is configured as writetime(), the values of that column are used as the write timestamp.
	* Required: yes <br />
	* Default: none <br />
* **consistancyLevel**
	* Description: consistency level for writes. One of ONE|QUORUM|LOCAL_QUORUM|EACH_QUORUM|ALL|ANY|TWO|THREE|LOCAL_ONE<br />
	* Required: no <br />
	* Default: LOCAL_QUORUM <br />
* **batchSize**
	* Description: number of records submitted in one batch (UNLOGGED BATCH). Note the following limits on batch size:<br />
	(1) It cannot exceed 65535.<br />
	(2) The payload of a batch is limited by the server-side batch_size_fail_threshold_in_kb setting.<br />
	(3) If the payload exceeds batch_size_warn_threshold_in_kb, the server logs a warning; this does not affect the write and can be ignored.<br />
	If a batch submission fails, every record in that batch is rewritten one by one.
	* Required: no <br />
	* Default: 1 <br />
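The batch-and-fallback behaviour described above can be sketched as follows. This is a simulation with a hypothetical `Executor` interface standing in for the Cassandra session, not the plugin's actual code:

```java
import java.util.ArrayList;
import java.util.List;

public class BatchBuffer {
    // Simulates the writer's batching: rows are buffered until batchSize
    // is reached, then flushed as one UNLOGGED batch; if the batch fails,
    // each row is retried individually, mirroring the fallback above.
    public interface Executor {
        void executeBatch(List<String> rows) throws Exception;
        void executeOne(String row);
    }

    private final int batchSize;
    private final Executor executor;
    private final List<String> buffer = new ArrayList<String>();

    public BatchBuffer(int batchSize, Executor executor) {
        this.batchSize = batchSize;
        this.executor = executor;
    }

    public void add(String row) {
        if (batchSize <= 1) {
            // batchSize of 1: write each row directly, no batching
            executor.executeOne(row);
            return;
        }
        buffer.add(row);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    public void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        try {
            executor.executeBatch(new ArrayList<String>(buffer));
        } catch (Exception e) {
            // batch failed: fall back to writing each row one by one
            for (String row : buffer) {
                executor.executeOne(row);
            }
        }
        buffer.clear();
    }
}
```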
### 3.3 Type Conversion
CassandraWriter currently supports all Cassandra types except counter and custom types.
The mapping between DataX internal types and Cassandra types is listed below:

| DataX internal type | Cassandra data type |
| -------- | ----- |
| Long | int, tinyint, smallint, varint, bigint, time |
| Double | float, double, decimal |
| String | ascii, varchar, text, uuid, timeuuid, duration, list, map, set, tuple, udt, inet |
| Date | date, timestamp |
| Boolean | bool |
| Bytes | blob |

Note:
* counter and custom types are currently not supported.
## 4 Performance Report
## 5 Constraints
### 5.1 Primary/Standby Synchronization and Data Recovery
## 6 FAQ
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>datax-all</artifactId>
<groupId>com.alibaba.datax</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cassandrawriter</artifactId>
<name>cassandrawriter</name>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>${datax-project-version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>3.7.2</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.9</version>
</dependency>
<!-- for test -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-core</artifactId>
<version>${datax-project-version}</version>
<exclusions>
<exclusion>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-service-face</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hive</groupId>
<artifactId>hive-serde</artifactId>
</exclusion>
<exclusion>
<groupId>javolution</groupId>
<artifactId>javolution</artifactId>
</exclusion>
</exclusions>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.9.5</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.properties</include>
</includes>
</resource>
</resources>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${jdk-version}</source>
<target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<finalName>datax</finalName>
</configuration>
<executions>
<execution>
<id>dwzip</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id></id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>plugin.json</include>
<include>plugin_job_template.json</include>
</includes>
<outputDirectory>plugin/writer/cassandrawriter</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>cassandrawriter-0.0.1-SNAPSHOT.jar</include>
</includes>
<outputDirectory>plugin/writer/cassandrawriter</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/writer/cassandrawriter/libs</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>
package com.alibaba.datax.plugin.writer.cassandrawriter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.BatchStatement.Type;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ColumnMetadata;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.TableMetadata;
import com.datastax.driver.core.querybuilder.Insert;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.datastax.driver.core.querybuilder.QueryBuilder.timestamp;
/**
* Created by mazhenlin on 2019/8/19.
*/
public class CassandraWriter extends Writer {
private static final Logger LOG = LoggerFactory
.getLogger(CassandraWriter.class);
public static class Job extends Writer.Job {
private Configuration originalConfig = null;
@Override public List<Configuration> split(int mandatoryNumber) {
List<Configuration> splitResultConfigs = new ArrayList<Configuration>();
for (int j = 0; j < mandatoryNumber; j++) {
splitResultConfigs.add(originalConfig.clone());
}
return splitResultConfigs;
}
@Override public void init() {
originalConfig = getPluginJobConf();
}
@Override public void destroy() {
}
}
public static class Task extends Writer.Task {
private Configuration taskConfig;
private Cluster cluster = null;
private Session session = null;
private PreparedStatement statement = null;
private int columnNumber = 0;
private List<DataType> columnTypes;
private List<String> columnMeta = null;
private int writeTimeCol = -1;
private boolean asyncWrite = false;
private long batchSize = 1;
private List<ResultSetFuture> unConfirmedWrite;
private List<BoundStatement> bufferedWrite;
@Override public void startWrite(RecordReceiver lineReceiver) {
try {
Record record;
while ((record = lineReceiver.getFromReader()) != null) {
if (record.getColumnNumber() != columnNumber) {
// the number of columns read from the source does not match the number of columns to write; fail fast
throw DataXException
.asDataXException(
CassandraWriterErrorCode.CONF_ERROR,
String.format(
"列配置信息有错误. 因为您配置的任务中,源头读取字段数:%s 与 目的表要写入的字段数:%s 不相等. 请检查您的配置并作出修改.",
record.getColumnNumber(),
this.columnNumber));
}
BoundStatement boundStmt = statement.bind();
for (int i = 0; i < columnNumber; i++) {
if( writeTimeCol != -1 && i == writeTimeCol ) {
continue;
}
Column col = record.getColumn(i);
int pos = i;
if( writeTimeCol != -1 && pos > writeTimeCol ) {
pos = i - 1;
}
CassandraWriterHelper.setupColumn(boundStmt,pos,columnTypes.get(pos),col);
}
if(writeTimeCol != -1) {
Column col = record.getColumn(writeTimeCol );
boundStmt.setLong(columnNumber - 1,col.asLong());
}
if( batchSize <= 1 ) {
session.execute(boundStmt);
} else {
if( asyncWrite ) {
unConfirmedWrite.add(session.executeAsync(boundStmt));
if (unConfirmedWrite.size() >= batchSize) {
for (ResultSetFuture write : unConfirmedWrite) {
write.getUninterruptibly(10000, TimeUnit.MILLISECONDS);
}
unConfirmedWrite.clear();
}
} else {
bufferedWrite.add(boundStmt);
if( bufferedWrite.size() >= batchSize ) {
BatchStatement batchStatement = new BatchStatement(Type.UNLOGGED);
batchStatement.addAll(bufferedWrite);
try {
session.execute(batchStatement);
} catch (Exception e ) {
LOG.error("batch写入失败,尝试逐条写入.",e);
for( BoundStatement stmt: bufferedWrite ) {
session.execute(stmt);
}
}
///LOG.info("batch finished. size = " + bufferedWrite.size());
bufferedWrite.clear();
}
}
}
}
if( unConfirmedWrite != null && unConfirmedWrite.size() > 0 ) {
for( ResultSetFuture write : unConfirmedWrite ) {
write.getUninterruptibly(10000, TimeUnit.MILLISECONDS);
}
unConfirmedWrite.clear();
}
if( bufferedWrite !=null && bufferedWrite.size() > 0 ) {
BatchStatement batchStatement = new BatchStatement(Type.UNLOGGED);
batchStatement.addAll(bufferedWrite);
session.execute(batchStatement);
bufferedWrite.clear();
}
} catch (Exception e) {
throw DataXException.asDataXException(
CassandraWriterErrorCode.WRITE_DATA_ERROR, e);
}
}
@Override public void init() {
this.taskConfig = super.getPluginJobConf();
String username = taskConfig.getString(Key.USERNAME);
String password = taskConfig.getString(Key.PASSWORD);
String hosts = taskConfig.getString(Key.HOST);
Integer port = taskConfig.getInt(Key.PORT,9042);
boolean useSSL = taskConfig.getBool(Key.USESSL);
String keyspace = taskConfig.getString(Key.KEYSPACE);
String table = taskConfig.getString(Key.TABLE);
batchSize = taskConfig.getLong(Key.BATCH_SIZE,1);
this.columnMeta = taskConfig.getList(Key.COLUMN,String.class);
columnTypes = new ArrayList<DataType>(columnMeta.size());
columnNumber = columnMeta.size();
asyncWrite = taskConfig.getBool(Key.ASYNC_WRITE,false);
int connectionsPerHost = taskConfig.getInt(Key.CONNECTIONS_PER_HOST,8);
int maxPendingPerConnection = taskConfig.getInt(Key.MAX_PENDING_CONNECTION,128);
PoolingOptions poolingOpts = new PoolingOptions()
.setConnectionsPerHost(HostDistance.LOCAL, connectionsPerHost, connectionsPerHost)
.setMaxRequestsPerConnection(HostDistance.LOCAL, maxPendingPerConnection)
.setNewConnectionThreshold(HostDistance.LOCAL, 100);
Cluster.Builder clusterBuilder = Cluster.builder().withPoolingOptions(poolingOpts);
if ((username != null) && !username.isEmpty()) {
clusterBuilder = clusterBuilder.withCredentials(username, password)
.withPort(Integer.valueOf(port)).addContactPoints(hosts.split(","));
if (useSSL) {
clusterBuilder = clusterBuilder.withSSL();
}
} else {
clusterBuilder = clusterBuilder.withPort(Integer.valueOf(port))
.addContactPoints(hosts.split(","));
}
cluster = clusterBuilder.build();
session = cluster.connect(keyspace);
TableMetadata meta = cluster.getMetadata().getKeyspace(keyspace).getTable(table);
Insert insertStmt = QueryBuilder.insertInto(table);
for( String columnName : columnMeta ) {
if( columnName.toLowerCase().equals(Key.WRITE_TIME) ) {
if( writeTimeCol != -1 ) {
throw DataXException
.asDataXException(
CassandraWriterErrorCode.CONF_ERROR,
"列配置信息有错误. 只能有一个时间戳列(writetime())");
}
writeTimeCol = columnTypes.size();
continue;
}
insertStmt.value(columnName,QueryBuilder.bindMarker());
ColumnMetadata col = meta.getColumn(columnName);
if( col == null ) {
throw DataXException
.asDataXException(
CassandraWriterErrorCode.CONF_ERROR,
String.format(
"列配置信息有错误. 表中未找到列名 '%s' .",
columnName));
}
columnTypes.add(col.getType());
}
if(writeTimeCol != -1) {
insertStmt.using(timestamp(QueryBuilder.bindMarker()));
}
String cl = taskConfig.getString(Key.CONSITANCY_LEVEL);
if( cl != null && !cl.isEmpty() ) {
insertStmt.setConsistencyLevel(ConsistencyLevel.valueOf(cl));
} else {
insertStmt.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
}
statement = session.prepare(insertStmt);
if( batchSize > 1 ) {
if( asyncWrite ) {
unConfirmedWrite = new ArrayList<ResultSetFuture>();
} else {
bufferedWrite = new ArrayList<BoundStatement>();
}
}
}
@Override
public void destroy() {
}
}
}
package com.alibaba.datax.plugin.writer.cassandrawriter;
import com.alibaba.datax.common.spi.ErrorCode;
/**
* Created by mazhenlin on 2019/8/19.
*/
public enum CassandraWriterErrorCode implements ErrorCode {
CONF_ERROR("CassandraWriter-00", "Configuration error."),
WRITE_DATA_ERROR("CassandraWriter-01", "Failed to write data."),
;
private final String code;
private final String description;
private CassandraWriterErrorCode(String code, String description) {
this.code = code;
this.description = description;
}
@Override
public String getCode() {
return this.code;
}
@Override
public String getDescription() {
return this.description;
}
@Override
public String toString() {
return String.format("Code:[%s], Description:[%s].", this.code, this.description);
}
}
package com.alibaba.datax.plugin.writer.cassandrawriter;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.CodecRegistry;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.DataType.Name;
import com.datastax.driver.core.Duration;
import com.datastax.driver.core.LocalDate;
import com.datastax.driver.core.TupleType;
import com.datastax.driver.core.TupleValue;
import com.datastax.driver.core.UDTValue;
import com.datastax.driver.core.UserType;
import com.datastax.driver.core.UserType.Field;
import com.google.common.base.Splitter;
import org.apache.commons.codec.binary.Base64;
/**
* Created by mazhenlin on 2019/8/21.
*/
public class CassandraWriterHelper {
static CodecRegistry registry = new CodecRegistry();
public static Object parseFromString(String s, DataType sqlType ) throws Exception {
if (s == null || s.isEmpty()) {
if (sqlType.getName() == Name.ASCII || sqlType.getName() == Name.TEXT ||
sqlType.getName() == Name.VARCHAR) {
return s;
} else {
return null;
}
}
switch (sqlType.getName()) {
case ASCII:
case TEXT:
case VARCHAR:
return s;
case BLOB:
if (s.length() == 0) {
return ByteBuffer.allocate(0);
}
byte[] byteArray = new byte[s.length() / 2];
for (int i = 0; i < byteArray.length; i++) {
String subStr = s.substring(2 * i, 2 * i + 2);
byteArray[i] = ((byte) Integer.parseInt(subStr, 16));
}
return ByteBuffer.wrap(byteArray);
case BOOLEAN:
return Boolean.valueOf(s);
case TINYINT:
return Byte.valueOf(s);
case SMALLINT:
return Short.valueOf(s);
case INT:
return Integer.valueOf(s);
case BIGINT:
return Long.valueOf(s);
case VARINT:
return new BigInteger(s, 10);
case FLOAT:
return Float.valueOf(s);
case DOUBLE:
return Double.valueOf(s);
case DECIMAL:
return new BigDecimal(s);
case DATE: {
String[] a = s.split("-");
if (a.length != 3) {
throw new Exception(String.format("Invalid DATE value '%s': must be in yyyy-mm-dd format", s));
}
return LocalDate.fromYearMonthDay(Integer.valueOf(a[0]), Integer.valueOf(a[1]),
Integer.valueOf(a[2]));
}
case TIME:
return Long.valueOf(s);
case TIMESTAMP:
return new Date(Long.valueOf(s));
case UUID:
case TIMEUUID:
return UUID.fromString(s);
case INET:
String[] b = s.split("/");
if (b.length < 2) {
return InetAddress.getByName(s);
}
byte[] addr = InetAddress.getByName(b[1]).getAddress();
return InetAddress.getByAddress(b[0], addr);
case DURATION:
return Duration.from(s);
case LIST:
case MAP:
case SET:
case TUPLE:
case UDT:
Object jsonObject = JSON.parse(s);
return parseFromJson(jsonObject,sqlType);
default:
throw DataXException.asDataXException(CassandraWriterErrorCode.CONF_ERROR,
"Unsupported column type: " + sqlType + ". Please check your configuration or contact an administrator.");
} // end switch
}
public static Object parseFromJson(Object jsonObject,DataType type) throws Exception {
if( jsonObject == null ) return null;
switch (type.getName()) {
case ASCII:
case TEXT:
case VARCHAR:
case BOOLEAN:
case TIME:
return jsonObject;
case TINYINT:
return ((Number)jsonObject).byteValue();
case SMALLINT:
return ((Number)jsonObject).shortValue();
case INT:
return ((Number)jsonObject).intValue();
case BIGINT:
return ((Number)jsonObject).longValue();
case VARINT:
return new BigInteger(jsonObject.toString());
case FLOAT:
return ((Number)jsonObject).floatValue();
case DOUBLE:
return ((Number)jsonObject).doubleValue();
case DECIMAL:
return new BigDecimal(jsonObject.toString());
case BLOB:
return ByteBuffer.wrap(Base64.decodeBase64((String)jsonObject));
case DATE:
return LocalDate.fromMillisSinceEpoch(((Number)jsonObject).longValue());
case TIMESTAMP:
return new Date(((Number)jsonObject).longValue());
case DURATION:
return Duration.from(jsonObject.toString());
case UUID:
case TIMEUUID:
return UUID.fromString(jsonObject.toString());
case INET:
return InetAddress.getByName((String)jsonObject);
case LIST:
List l = new ArrayList();
for( Object o : (JSONArray)jsonObject ) {
l.add(parseFromJson(o,type.getTypeArguments().get(0)));
}
return l;
case MAP: {
Map m = new HashMap();
for (JSONObject.Entry e : ((JSONObject)jsonObject).entrySet()) {
Object k = parseFromString((String) e.getKey(), type.getTypeArguments().get(0));
Object v = parseFromJson(e.getValue(), type.getTypeArguments().get(1));
m.put(k,v);
}
return m;
}
case SET:
Set s = new HashSet();
for( Object o : (JSONArray)jsonObject ) {
s.add(parseFromJson(o,type.getTypeArguments().get(0)));
}
return s;
case TUPLE: {
TupleValue t = ((TupleType) type).newValue();
int j = 0;
for (Object e : (JSONArray)jsonObject) {
DataType eleType = ((TupleType) type).getComponentTypes().get(j);
t.set(j, parseFromJson(e, eleType), registry.codecFor(eleType).getJavaType());
j++;
}
return t;
}
case UDT: {
UDTValue t = ((UserType) type).newValue();
UserType userType = t.getType();
for (JSONObject.Entry e : ((JSONObject)jsonObject).entrySet()) {
DataType eleType = userType.getFieldType((String)e.getKey());
t.set((String)e.getKey(), parseFromJson(e.getValue(), eleType), registry.codecFor(eleType).getJavaType());
}
return t;
}
}
return null;
}
public static void setupColumn(BoundStatement ps, int pos, DataType sqlType, Column col) throws Exception {
if (col.getRawData() != null) {
switch (sqlType.getName()) {
case ASCII:
case TEXT:
case VARCHAR:
ps.setString(pos, col.asString());
break;
case BLOB:
ps.setBytes(pos, ByteBuffer.wrap(col.asBytes()));
break;
case BOOLEAN:
ps.setBool(pos, col.asBoolean());
break;
case TINYINT:
ps.setByte(pos, col.asLong().byteValue());
break;
case SMALLINT:
ps.setShort(pos, col.asLong().shortValue());
break;
case INT:
ps.setInt(pos, col.asLong().intValue());
break;
case BIGINT:
ps.setLong(pos, col.asLong());
break;
case VARINT:
ps.setVarint(pos, col.asBigInteger());
break;
case FLOAT:
ps.setFloat(pos, col.asDouble().floatValue());
break;
case DOUBLE:
ps.setDouble(pos, col.asDouble());
break;
case DECIMAL:
ps.setDecimal(pos, col.asBigDecimal());
break;
case DATE:
ps.setDate(pos, LocalDate.fromMillisSinceEpoch(col.asDate().getTime()));
break;
case TIME:
ps.setTime(pos, col.asLong());
break;
case TIMESTAMP:
ps.setTimestamp(pos, col.asDate());
break;
case UUID:
case TIMEUUID:
ps.setUUID(pos, UUID.fromString(col.asString()));
break;
case INET:
ps.setInet(pos, InetAddress.getByName(col.asString()));
break;
case DURATION:
ps.set(pos, Duration.from(col.asString()), Duration.class);
break;
case LIST:
ps.setList(pos, (List<?>) parseFromString(col.asString(), sqlType));
break;
case MAP:
ps.setMap(pos, (Map) parseFromString(col.asString(), sqlType));
break;
case SET:
ps.setSet(pos, (Set) parseFromString(col.asString(), sqlType));
break;
case TUPLE:
ps.setTupleValue(pos, (TupleValue) parseFromString(col.asString(), sqlType));
break;
case UDT:
ps.setUDTValue(pos, (UDTValue) parseFromString(col.asString(), sqlType));
break;
default:
throw DataXException.asDataXException(CassandraWriterErrorCode.CONF_ERROR,
"Unsupported column type: " + sqlType + ". Please check your configuration or contact an administrator.");
} // end switch
} else {
ps.setToNull(pos);
}
}
}
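The BLOB branch of `parseFromString` above decodes a hex string two characters per byte. A standalone sketch of that conversion (a hypothetical helper for illustration, not part of the plugin):

```java
// Minimal standalone sketch of the hex decoding used by the BLOB branch:
// every two hex characters become one byte.
public class HexDecodeDemo {
    static byte[] hexToBytes(String s) {
        byte[] out = new byte[s.length() / 2];
        for (int i = 0; i < out.length; i++) {
            // parse each two-character substring as a base-16 integer
            out[i] = (byte) Integer.parseInt(s.substring(2 * i, 2 * i + 2), 16);
        }
        return out;
    }

    public static void main(String[] args) {
        // "48656c6c6f" is the hex encoding of the ASCII bytes of "Hello"
        System.out.println(new String(hexToBytes("48656c6c6f")));
    }
}
```

The plugin wraps the resulting array in a `ByteBuffer` before binding it, since that is the Java type the driver expects for BLOB columns.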
package com.alibaba.datax.plugin.writer.cassandrawriter;
/**
* Created by mazhenlin on 2019/8/19.
*/
public class Key {
public final static String USERNAME = "username";
public final static String PASSWORD = "password";
public final static String HOST = "host";
public final static String PORT = "port";
public final static String USESSL = "useSSL";
public final static String KEYSPACE = "keyspace";
public final static String TABLE = "table";
public final static String COLUMN = "column";
public final static String WRITE_TIME = "writetime()";
public final static String ASYNC_WRITE = "asyncWrite";
public final static String CONSITANCY_LEVEL = "consistancyLevel";
public final static String CONNECTIONS_PER_HOST = "connectionsPerHost";
public final static String MAX_PENDING_CONNECTION = "maxPendingPerConnection";
/**
* Batch size for asynchronous writes; default 1 (no batching)
*/
public final static String BATCH_SIZE = "batchSize";
/**
* Name of each column
*/
public static final String COLUMN_NAME = "name";
/**
* Column separator
*/
public static final String COLUMN_SPLITTER = "format";
public static final String ELEMENT_SPLITTER = "splitter";
public static final String ENTRY_SPLITTER = "entrySplitter";
public static final String KV_SPLITTER = "kvSplitter";
public static final String ELEMENT_CONFIG = "element";
public static final String TUPLE_CONNECTOR = "_";
public static final String KEY_CONFIG = "key";
public static final String VALUE_CONFIG = "value";
}
errorcode.config_invalid_exception=\u914D\u7F6E\u9519\u8BEF.
errorcode.write_failed_exception=\u5199\u5165\u6570\u636E\u65F6\u5931\u8D25
\ No newline at end of file
errorcode.config_invalid_exception=Error in parameter configuration.
errorcode.write_failed_exception=\u5199\u5165\u6570\u636E\u65F6\u5931\u8D25
\ No newline at end of file
errorcode.config_invalid_exception=\u914D\u7F6E\u9519\u8BEF.
errorcode.write_failed_exception=\u5199\u5165\u6570\u636E\u65F6\u5931\u8D25
\ No newline at end of file
errorcode.config_invalid_exception=\u914D\u7F6E\u9519\u8BEF.
errorcode.write_failed_exception=\u5199\u5165\u6570\u636E\u65F6\u5931\u8D25
\ No newline at end of file
errorcode.config_invalid_exception=\u914D\u7F6E\u9519\u8BEF.
errorcode.write_failed_exception=\u5199\u5165\u6570\u636E\u65F6\u5931\u8D25
\ No newline at end of file
errorcode.config_invalid_exception=\u914D\u7F6E\u9519\u8BEF.
errorcode.write_failed_exception=\u5199\u5165\u6570\u636E\u65F6\u5931\u8D25
\ No newline at end of file
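The `.properties` files above store the Chinese messages as `\uXXXX` escapes; `java.util.Properties` decodes them when loading. A minimal sketch (the class name is illustrative):

```java
import java.io.StringReader;
import java.util.Properties;

// Demonstrates that Properties.load() decodes \uXXXX escapes,
// turning the escaped entry above back into readable Chinese text.
public class PropsEscapeDemo {
    static String load(String line) throws Exception {
        Properties p = new Properties();
        p.load(new StringReader(line));
        return p.getProperty("errorcode.config_invalid_exception");
    }

    public static void main(String[] args) throws Exception {
        // the escaped form of the config_invalid_exception message above
        System.out.println(load("errorcode.config_invalid_exception=\\u914D\\u7F6E\\u9519\\u8BEF."));
    }
}
```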
{
"name": "cassandrawriter",
"class": "com.alibaba.datax.plugin.writer.cassandrawriter.CassandraWriter",
"description": "useScene: prod. mechanism: use DataStax java driver, execute cql insert statements.",
"developer": "alibaba"
}
{
"name": "cassandrawriter",
"parameter": {
"username": "",
"password": "",
"host": "",
"port": "",
"useSSL": false,
"keyspace": "",
"table": "",
"column": [
"c1","c2","c3"
]
}
}
\ No newline at end of file
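For reference, a hypothetical filled-in job fragment based on the template above (all values illustrative; `host` accepts a comma-separated list of contact points, and 9042 is Cassandra's default CQL port):

```json
{
  "name": "cassandrawriter",
  "parameter": {
    "username": "cassandra",
    "password": "cassandra",
    "host": "127.0.0.1",
    "port": "9042",
    "useSSL": false,
    "keyspace": "demo_ks",
    "table": "demo_table",
    "column": ["c1", "c2", "c3"]
  }
}
```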
...@@ -65,8 +65,8 @@
 <plugin>
 <artifactId>maven-compiler-plugin</artifactId>
 <configuration>
-<source>1.6</source>
-<target>1.6</target>
+<source>${jdk-version}</source>
+<target>${jdk-version}</target>
 <encoding>${project-sourceEncoding}</encoding>
 </configuration>
 </plugin>
......
...@@ -140,8 +140,8 @@
 <plugin>
 <artifactId>maven-compiler-plugin</artifactId>
 <configuration>
-<source>1.6</source>
-<target>1.6</target>
+<source>${jdk-version}</source>
+<target>${jdk-version}</target>
 <encoding>${project-sourceEncoding}</encoding>
 </configuration>
 </plugin>
......
...@@ -54,8 +54,8 @@
 <plugin>
 <artifactId>maven-compiler-plugin</artifactId>
 <configuration>
-<source>1.6</source>
-<target>1.6</target>
+<source>${jdk-version}</source>
+<target>${jdk-version}</target>
 <encoding>${project-sourceEncoding}</encoding>
 </configuration>
 </plugin>
......
...@@ -54,8 +54,8 @@
 <plugin>
 <artifactId>maven-compiler-plugin</artifactId>
 <configuration>
-<source>1.6</source>
-<target>1.6</target>
+<source>${jdk-version}</source>
+<target>${jdk-version}</target>
 <encoding>${project-sourceEncoding}</encoding>
 </configuration>
 </plugin>
......
...@@ -60,8 +60,8 @@
 <plugin>
 <artifactId>maven-compiler-plugin</artifactId>
 <configuration>
-<source>1.6</source>
-<target>1.6</target>
+<source>${jdk-version}</source>
+<target>${jdk-version}</target>
 <encoding>${project-sourceEncoding}</encoding>
 </configuration>
 </plugin>
......
...@@ -63,8 +63,8 @@
 <plugin>
 <artifactId>maven-compiler-plugin</artifactId>
 <configuration>
-<source>1.6</source>
-<target>1.6</target>
+<source>${jdk-version}</source>
+<target>${jdk-version}</target>
 <encoding>${project-sourceEncoding}</encoding>
 </configuration>
 </plugin>
......
...@@ -65,8 +65,8 @@
 <plugin>
 <artifactId>maven-compiler-plugin</artifactId>
 <configuration>
-<source>1.6</source>
-<target>1.6</target>
+<source>${jdk-version}</source>
+<target>${jdk-version}</target>
 <encoding>${project-sourceEncoding}</encoding>
 </configuration>
 </plugin>
......
...@@ -74,8 +74,8 @@
 <plugin>
 <artifactId>maven-compiler-plugin</artifactId>
 <configuration>
-<source>1.8</source>
-<target>1.8</target>
+<source>${jdk-version}</source>
+<target>${jdk-version}</target>
 <encoding>${project-sourceEncoding}</encoding>
 </configuration>
 </plugin>
......
...@@ -39,6 +39,12 @@
 <groupId>org.apache.hbase</groupId>
 <artifactId>hbase</artifactId>
 <version>0.94.27</version>
+<exclusions>
+<exclusion>
+<artifactId>jdk.tools</artifactId>
+<groupId>jdk.tools</groupId>
+</exclusion>
+</exclusions>
 </dependency>
 <dependency>
 <groupId>org.apache.hadoop</groupId>
...@@ -66,8 +72,8 @@
 <plugin>
 <artifactId>maven-compiler-plugin</artifactId>
 <configuration>
-<source>1.6</source>
-<target>1.6</target>
+<source>${jdk-version}</source>
+<target>${jdk-version}</target>
 <encoding>${project-sourceEncoding}</encoding>
 </configuration>
 </plugin>
......
...@@ -42,6 +42,12 @@
 <groupId>org.apache.hbase</groupId>
 <artifactId>hbase</artifactId>
 <version>0.94.27</version>
+<exclusions>
+<exclusion>
+<artifactId>jdk.tools</artifactId>
+<groupId>jdk.tools</groupId>
+</exclusion>
+</exclusions>
 </dependency>
 <dependency>
 <groupId>org.apache.hadoop</groupId>
...@@ -79,8 +85,8 @@
 <plugin>
 <artifactId>maven-compiler-plugin</artifactId>
 <configuration>
-<source>1.6</source>
-<target>1.6</target>
+<source>${jdk-version}</source>
+<target>${jdk-version}</target>
 <encoding>${project-sourceEncoding}</encoding>
 </configuration>
 </plugin>
......
...@@ -37,6 +37,12 @@
 <groupId>org.apache.hbase</groupId>
 <artifactId>hbase-client</artifactId>
 <version>${hbase.version}</version>
+<exclusions>
+<exclusion>
+<artifactId>jdk.tools</artifactId>
+<groupId>jdk.tools</groupId>
+</exclusion>
+</exclusions>
 </dependency>
 <dependency>
 <groupId>org.apache.hbase</groupId>
...@@ -86,8 +92,8 @@
 <plugin>
 <artifactId>maven-compiler-plugin</artifactId>
 <configuration>
-<source>1.6</source>
-<target>1.6</target>
+<source>${jdk-version}</source>
+<target>${jdk-version}</target>
 <encoding>${project-sourceEncoding}</encoding>
 </configuration>
 </plugin>
......
...@@ -37,6 +37,10 @@
 <artifactId>servlet-api</artifactId>
 <groupId>javax.servlet</groupId>
 </exclusion>
+<exclusion>
+<artifactId>jdk.tools</artifactId>
+<groupId>jdk.tools</groupId>
+</exclusion>
 </exclusions>
 </dependency>
...@@ -80,8 +84,8 @@
 <plugin>
 <artifactId>maven-compiler-plugin</artifactId>
 <configuration>
-<source>1.6</source>
-<target>1.6</target>
+<source>${jdk-version}</source>
+<target>${jdk-version}</target>
 <encoding>${project-sourceEncoding}</encoding>
 </configuration>
 </plugin>
......
...@@ -48,6 +48,12 @@
 <groupId>org.apache.phoenix</groupId>
 <artifactId>phoenix-core</artifactId>
 <version>${phoenix.version}</version>
+<exclusions>
+<exclusion>
+<artifactId>jdk.tools</artifactId>
+<groupId>jdk.tools</groupId>
+</exclusion>
+</exclusions>
 </dependency>
 <dependency>
 <groupId>org.apache.phoenix</groupId>
...@@ -120,8 +126,8 @@
 <plugin>
 <artifactId>maven-compiler-plugin</artifactId>
 <configuration>
-<source>1.6</source>
-<target>1.6</target>
+<source>${jdk-version}</source>
+<target>${jdk-version}</target>
 <encoding>${project-sourceEncoding}</encoding>
 </configuration>
 </plugin>
......
...@@ -41,6 +41,12 @@
 <groupId>org.apache.hbase</groupId>
 <artifactId>hbase-client</artifactId>
 <version>${hbase.version}</version>
+<exclusions>
+<exclusion>
+<artifactId>jdk.tools</artifactId>
+<groupId>jdk.tools</groupId>
+</exclusion>
+</exclusions>
 </dependency>
 <dependency>
 <groupId>org.apache.hbase</groupId>
...@@ -95,8 +101,8 @@
 <plugin>
 <artifactId>maven-compiler-plugin</artifactId>
 <configuration>
-<source>1.6</source>
-<target>1.6</target>
+<source>${jdk-version}</source>
+<target>${jdk-version}</target>
 <encoding>${project-sourceEncoding}</encoding>
 </configuration>
 </plugin>
......
...@@ -82,8 +82,8 @@
 <plugin>
 <artifactId>maven-compiler-plugin</artifactId>
 <configuration>
-<source>1.6</source>
-<target>1.6</target>
+<source>${jdk-version}</source>
+<target>${jdk-version}</target>
 <encoding>${project-sourceEncoding}</encoding>
 </configuration>
 </plugin>
......
...@@ -76,8 +76,8 @@
 <plugin>
 <artifactId>maven-compiler-plugin</artifactId>
 <configuration>
-<source>1.6</source>
-<target>1.6</target>
+<source>${jdk-version}</source>
+<target>${jdk-version}</target>
 <encoding>${project-sourceEncoding}</encoding>
 </configuration>
 </plugin>
......
...@@ -71,6 +71,12 @@
 <groupId>org.apache.hive</groupId>
 <artifactId>hive-service</artifactId>
 <version>${hive.version}</version>
+<exclusions>
+<exclusion>
+<artifactId>jdk.tools</artifactId>
+<groupId>jdk.tools</groupId>
+</exclusion>
+</exclusions>
 </dependency>
 <dependency>
 <groupId>org.apache.hive</groupId>
...@@ -97,8 +103,8 @@
 <plugin>
 <artifactId>maven-compiler-plugin</artifactId>
 <configuration>
-<source>1.6</source>
-<target>1.6</target>
+<source>${jdk-version}</source>
+<target>${jdk-version}</target>
 <encoding>${project-sourceEncoding}</encoding>
 </configuration>
 </plugin>
......
...@@ -106,8 +106,8 @@
 <plugin>
 <artifactId>maven-compiler-plugin</artifactId>
 <configuration>
-<source>1.6</source>
-<target>1.6</target>
+<source>${jdk-version}</source>
+<target>${jdk-version}</target>
 <encoding>${project-sourceEncoding}</encoding>
 </configuration>
 </plugin>
......
...@@ -55,8 +55,8 @@
 <plugin>
 <artifactId>maven-compiler-plugin</artifactId>
 <configuration>
-<source>1.6</source>
-<target>1.6</target>
+<source>${jdk-version}</source>
+<target>${jdk-version}</target>
 <encoding>${project-sourceEncoding}</encoding>
 </configuration>
 </plugin>
......
...@@ -59,8 +59,8 @@
 <plugin>
 <artifactId>maven-compiler-plugin</artifactId>
 <configuration>
-<source>1.6</source>
-<target>1.6</target>
+<source>${jdk-version}</source>
+<target>${jdk-version}</target>
 <encoding>${project-sourceEncoding}</encoding>
 </configuration>
 </plugin>
......
...@@ -52,8 +52,8 @@
 <plugin>
 <artifactId>maven-compiler-plugin</artifactId>
 <configuration>
-<source>1.6</source>
-<target>1.6</target>
+<source>${jdk-version}</source>
+<target>${jdk-version}</target>
 <encoding>${project-sourceEncoding}</encoding>
 </configuration>
 </plugin>
......
...@@ -50,8 +50,8 @@
 <plugin>
 <artifactId>maven-compiler-plugin</artifactId>
 <configuration>
-<source>1.6</source>
-<target>1.6</target>
+<source>${jdk-version}</source>
+<target>${jdk-version}</target>
 <encoding>${project-sourceEncoding}</encoding>
 </configuration>
 </plugin>
......
...@@ -56,8 +56,8 @@
 <plugin>
 <artifactId>maven-compiler-plugin</artifactId>
 <configuration>
-<source>1.6</source>
-<target>1.6</target>
+<source>${jdk-version}</source>
+<target>${jdk-version}</target>
 <encoding>${project-sourceEncoding}</encoding>
 </configuration>
 <version>3.2</version>
......
...@@ -115,8 +115,8 @@
 <plugin>
 <artifactId>maven-compiler-plugin</artifactId>
 <configuration>
-<source>1.6</source>
-<target>1.6</target>
+<source>${jdk-version}</source>
+<target>${jdk-version}</target>
 <encoding>${project-sourceEncoding}</encoding>
 </configuration>
 </plugin>
......
...@@ -78,8 +78,8 @@
 <plugin>
 <artifactId>maven-compiler-plugin</artifactId>
 <configuration>
-<source>1.6</source>
-<target>1.6</target>
+<source>${jdk-version}</source>
+<target>${jdk-version}</target>
 <encoding>${project-sourceEncoding}</encoding>
 </configuration>
 </plugin>
......
...@@ -126,8 +126,8 @@
 <plugin>
 <artifactId>maven-compiler-plugin</artifactId>
 <configuration>
-<source>1.6</source>
-<target>1.6</target>
+<source>${jdk-version}</source>
+<target>${jdk-version}</target>
 <encoding>${project-sourceEncoding}</encoding>
 </configuration>
 </plugin>
......
...@@ -56,8 +56,8 @@
 <plugin>
 <artifactId>maven-compiler-plugin</artifactId>
 <configuration>
-<source>1.6</source>
-<target>1.6</target>
+<source>${jdk-version}</source>
+<target>${jdk-version}</target>
 <encoding>${project-sourceEncoding}</encoding>
 </configuration>
 </plugin>
......
...@@ -53,8 +53,8 @@
 <plugin>
 <artifactId>maven-compiler-plugin</artifactId>
 <configuration>
-<source>1.6</source>
-<target>1.6</target>
+<source>${jdk-version}</source>
+<target>${jdk-version}</target>
 <encoding>${project-sourceEncoding}</encoding>
 </configuration>
 </plugin>
......
...@@ -58,8 +58,8 @@
 <plugin>
 <artifactId>maven-compiler-plugin</artifactId>
 <configuration>
-<source>1.6</source>
-<target>1.6</target>
+<source>${jdk-version}</source>
+<target>${jdk-version}</target>
 <encoding>${project-sourceEncoding}</encoding>
 </configuration>
 </plugin>
......
...@@ -52,8 +52,8 @@
 <plugin>
 <artifactId>maven-compiler-plugin</artifactId>
 <configuration>
-<source>1.6</source>
-<target>1.6</target>
+<source>${jdk-version}</source>
+<target>${jdk-version}</target>
 <encoding>${project-sourceEncoding}</encoding>
 </configuration>
 </plugin>
......
...@@ -47,8 +47,8 @@
 <plugin>
 <artifactId>maven-compiler-plugin</artifactId>
 <configuration>
-<source>1.6</source>
-<target>1.6</target>
+<source>${jdk-version}</source>
+<target>${jdk-version}</target>
 <encoding>${project-sourceEncoding}</encoding>
 </configuration>
 </plugin>
......
...@@ -61,8 +61,8 @@
 <plugin>
 <artifactId>maven-compiler-plugin</artifactId>
 <configuration>
-<source>1.6</source>
-<target>1.6</target>
+<source>${jdk-version}</source>
+<target>${jdk-version}</target>
 <encoding>${project-sourceEncoding}</encoding>
 </configuration>
 </plugin>
......
...@@ -47,8 +47,8 @@
 <plugin>
 <artifactId>maven-compiler-plugin</artifactId>
 <configuration>
-<source>1.6</source>
-<target>1.6</target>
+<source>${jdk-version}</source>
+<target>${jdk-version}</target>
 <encoding>${project-sourceEncoding}</encoding>
 </configuration>
 </plugin>
......
...@@ -166,6 +166,13 @@
 </includes>
 <outputDirectory>datax</outputDirectory>
 </fileSet>
+<fileSet>
+<directory>cassandrareader/target/datax/</directory>
+<includes>
+<include>**/*.*</include>
+</includes>
+<outputDirectory>datax</outputDirectory>
+</fileSet>
 <!-- writer -->
 <fileSet>
...@@ -343,5 +350,12 @@
 </includes>
 <outputDirectory>datax</outputDirectory>
 </fileSet>
+<fileSet>
+<directory>cassandrawriter/target/datax/</directory>
+<includes>
+<include>**/*.*</include>
+</includes>
+<outputDirectory>datax</outputDirectory>
+</fileSet>
 </fileSets>
 </assembly>
...@@ -17,6 +17,7 @@
 <packaging>pom</packaging>
 <properties>
+<jdk-version>1.8</jdk-version>
 <datax-project-version>0.0.1-SNAPSHOT</datax-project-version>
 <commons-lang3-version>3.3.2</commons-lang3-version>
 <commons-configuration-version>1.10</commons-configuration-version>
...@@ -62,7 +63,9 @@
 <module>rdbmsreader</module>
 <module>hbase11xreader</module>
 <module>hbase094xreader</module>
+<module>tsdbreader</module>
 <module>opentsdbreader</module>
+<module>cassandrareader</module>
 <!-- writer -->
 <module>mysqlwriter</module>
...@@ -89,7 +92,7 @@
 <module>tsdbwriter</module>
 <module>adbpgwriter</module>
 <module>gdbwriter</module>
+<module>cassandrawriter</module>
 <!-- common support module -->
 <module>plugin-rdbms-util</module>
 <module>plugin-unstructured-storage-util</module>
...@@ -223,8 +226,8 @@
 <artifactId>maven-compiler-plugin</artifactId>
 <version>2.3.2</version>
 <configuration>
-<source>1.6</source>
-<target>1.6</target>
+<source>${jdk-version}</source>
+<target>${jdk-version}</target>
 <encoding>${project-sourceEncoding}</encoding>
 </configuration>
 </plugin>
......
...@@ -56,8 +56,8 @@ ...@@ -56,8 +56,8 @@
<plugin> <plugin>
<artifactId>maven-compiler-plugin</artifactId> <artifactId>maven-compiler-plugin</artifactId>
<configuration> <configuration>
<source>1.6</source> <source>${jdk-version}</source>
<target>1.6</target> <target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding> <encoding>${project-sourceEncoding}</encoding>
</configuration> </configuration>
</plugin> </plugin>
......
...@@ -53,8 +53,8 @@ ...@@ -53,8 +53,8 @@
<plugin> <plugin>
<artifactId>maven-compiler-plugin</artifactId> <artifactId>maven-compiler-plugin</artifactId>
<configuration> <configuration>
<source>1.6</source> <source>${jdk-version}</source>
<target>1.6</target> <target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding> <encoding>${project-sourceEncoding}</encoding>
</configuration> </configuration>
</plugin> </plugin>
......
...@@ -70,8 +70,8 @@ ...@@ -70,8 +70,8 @@
<plugin> <plugin>
<artifactId>maven-compiler-plugin</artifactId> <artifactId>maven-compiler-plugin</artifactId>
<configuration> <configuration>
<source>1.6</source> <source>${jdk-version}</source>
<target>1.6</target> <target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding> <encoding>${project-sourceEncoding}</encoding>
</configuration> </configuration>
</plugin> </plugin>
......
...@@ -71,8 +71,8 @@ ...@@ -71,8 +71,8 @@
<plugin> <plugin>
<artifactId>maven-compiler-plugin</artifactId> <artifactId>maven-compiler-plugin</artifactId>
<configuration> <configuration>
<source>1.6</source> <source>${jdk-version}</source>
<target>1.6</target> <target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding> <encoding>${project-sourceEncoding}</encoding>
</configuration> </configuration>
</plugin> </plugin>
......
...@@ -48,8 +48,8 @@ ...@@ -48,8 +48,8 @@
<plugin> <plugin>
<artifactId>maven-compiler-plugin</artifactId> <artifactId>maven-compiler-plugin</artifactId>
<configuration> <configuration>
<source>1.6</source> <source>${jdk-version}</source>
<target>1.6</target> <target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding> <encoding>${project-sourceEncoding}</encoding>
</configuration> </configuration>
</plugin> </plugin>
......
...@@ -50,8 +50,8 @@ ...@@ -50,8 +50,8 @@
<plugin> <plugin>
<artifactId>maven-compiler-plugin</artifactId> <artifactId>maven-compiler-plugin</artifactId>
<configuration> <configuration>
<source>1.6</source> <source>${jdk-version}</source>
<target>1.6</target> <target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding> <encoding>${project-sourceEncoding}</encoding>
</configuration> </configuration>
</plugin> </plugin>
......
...@@ -44,8 +44,8 @@ ...@@ -44,8 +44,8 @@
<plugin> <plugin>
<artifactId>maven-compiler-plugin</artifactId> <artifactId>maven-compiler-plugin</artifactId>
<configuration> <configuration>
<source>1.6</source> <source>${jdk-version}</source>
<target>1.6</target> <target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding> <encoding>${project-sourceEncoding}</encoding>
</configuration> </configuration>
</plugin> </plugin>
......
...@@ -39,8 +39,8 @@ ...@@ -39,8 +39,8 @@
<plugin> <plugin>
<artifactId>maven-compiler-plugin</artifactId> <artifactId>maven-compiler-plugin</artifactId>
<configuration> <configuration>
<source>1.6</source> <source>${jdk-version}</source>
<target>1.6</target> <target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding> <encoding>${project-sourceEncoding}</encoding>
</configuration> </configuration>
</plugin> </plugin>
......
...@@ -38,8 +38,8 @@ ...@@ -38,8 +38,8 @@
<plugin> <plugin>
<artifactId>maven-compiler-plugin</artifactId> <artifactId>maven-compiler-plugin</artifactId>
<configuration> <configuration>
<source>1.6</source> <source>${jdk-version}</source>
<target>1.6</target> <target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding> <encoding>${project-sourceEncoding}</encoding>
</configuration> </configuration>
</plugin> </plugin>
......
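All of the module hunks above apply one pattern: the compiler level is lifted out of each module's maven-compiler-plugin block into a single `jdk-version` property, set to 1.8 in the parent pom, so a future JDK bump touches one line instead of every module. The resulting shape:

```xml
<!-- parent pom.xml: one place to change the JDK level -->
<properties>
    <jdk-version>1.8</jdk-version>
</properties>

<!-- each module's compiler plugin reads the inherited property -->
<plugin>
    <artifactId>maven-compiler-plugin</artifactId>
    <configuration>
        <source>${jdk-version}</source>
        <target>${jdk-version}</target>
        <encoding>${project-sourceEncoding}</encoding>
    </configuration>
</plugin>
```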
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-all</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<artifactId>tsdbreader</artifactId>
<name>tsdbreader</name>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!-- common -->
<commons-lang3.version>3.3.2</commons-lang3.version>
<!-- http -->
<httpclient.version>4.4</httpclient.version>
<commons-io.version>2.4</commons-io.version>
<!-- json -->
<fastjson.version>1.2.28</fastjson.version>
<!-- test -->
<junit4.version>4.12</junit4.version>
<!-- time -->
<joda-time.version>2.9.9</joda-time.version>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>${datax-project-version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>fastjson</artifactId>
<groupId>com.alibaba</groupId>
</exclusion>
<exclusion>
<artifactId>commons-math3</artifactId>
<groupId>org.apache.commons</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<!-- common -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3.version}</version>
</dependency>
<!-- http -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>${httpclient.version}</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>${commons-io.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>fluent-hc</artifactId>
<version>${httpclient.version}</version>
</dependency>
<!-- json -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<!-- time -->
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>${joda-time.version}</version>
</dependency>
<!-- test -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit4.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${jdk-version}</source>
<target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
<!-- assembly plugin -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<finalName>datax</finalName>
</configuration>
<executions>
<execution>
<id>dwzip</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id></id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>plugin.json</include>
<include>plugin_job_template.json</include>
</includes>
<outputDirectory>plugin/reader/tsdbreader</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>tsdbreader-0.0.1-SNAPSHOT.jar</include>
</includes>
<outputDirectory>plugin/reader/tsdbreader</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/reader/tsdbreader/libs</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>
package com.alibaba.datax.plugin.reader.tsdbreader;
import java.util.HashSet;
import java.util.Set;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:Constant
*
* @author Benedict Jin
* @since 2019-10-21
*/
public final class Constant {
static final String DEFAULT_DATA_FORMAT = "yyyy-MM-dd HH:mm:ss";
public static final String METRIC_SPECIFY_KEY = "__metric__";
public static final String TS_SPECIFY_KEY = "__ts__";
public static final String VALUE_SPECIFY_KEY = "__value__";
static final Set<String> MUST_CONTAINED_SPECIFY_KEYS = new HashSet<>();
static {
MUST_CONTAINED_SPECIFY_KEYS.add(METRIC_SPECIFY_KEY);
MUST_CONTAINED_SPECIFY_KEYS.add(TS_SPECIFY_KEY);
        // __value__ may be omitted in the multi-field scenario
}
}
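The two mandatory keys above drive column validation when the reader builds records: a user-supplied column list must contain `__metric__` and `__ts__`, while `__value__` is optional in the multi-field case. A minimal standalone sketch of that check (the class and method names here are illustrative, not from the plugin):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SpecifyKeyCheck {

    // Mirrors Constant.MUST_CONTAINED_SPECIFY_KEYS: __metric__ and __ts__
    // are mandatory; __value__ may be omitted in the multi-field scenario.
    static final Set<String> MUST_CONTAINED =
            new HashSet<>(Arrays.asList("__metric__", "__ts__"));

    /** Returns true iff the column list contains every mandatory specify key. */
    public static boolean containsMandatoryKeys(List<String> columns) {
        return columns.containsAll(MUST_CONTAINED);
    }

    public static void main(String[] args) {
        // Single-value layout: metric, timestamp, value -> valid
        System.out.println(containsMandatoryKeys(
                Arrays.asList("__metric__", "__ts__", "__value__")));
        // Missing __ts__ -> invalid
        System.out.println(containsMandatoryKeys(
                Arrays.asList("__metric__", "__value__")));
    }
}
```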
package com.alibaba.datax.plugin.reader.tsdbreader;
import java.util.HashSet;
import java.util.Set;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:Key
*
* @author Benedict Jin
* @since 2019-10-21
*/
public class Key {
// TSDB for OpenTSDB / InfluxDB / TimeScale / Prometheus etc.
// RDB for MySQL / ADB etc.
static final String SINK_DB_TYPE = "sinkDbType";
static final String ENDPOINT = "endpoint";
static final String COLUMN = "column";
static final String METRIC = "metric";
static final String FIELD = "field";
static final String TAG = "tag";
static final String INTERVAL_DATE_TIME = "splitIntervalMs";
static final String BEGIN_DATE_TIME = "beginDateTime";
static final String END_DATE_TIME = "endDateTime";
static final Integer INTERVAL_DATE_TIME_DEFAULT_VALUE = 60;
static final String TYPE_DEFAULT_VALUE = "TSDB";
static final Set<String> TYPE_SET = new HashSet<>();
static {
TYPE_SET.add("TSDB");
TYPE_SET.add("RDB");
}
}
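The keys above correspond to entries in a tsdbreader job configuration. A hypothetical minimal fragment for the TSDB sink type (values are illustrative and not taken from the plugin's documentation):

```json
{
  "reader": {
    "name": "tsdbreader",
    "parameter": {
      "sinkDbType": "TSDB",
      "endpoint": "http://localhost:8242",
      "column": ["__metric__", "__ts__", "__value__"],
      "beginDateTime": "2019-10-21 00:00:00",
      "endDateTime": "2019-10-21 01:00:00"
    }
  }
}
```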
package com.alibaba.datax.plugin.reader.tsdbreader;
import com.alibaba.datax.common.spi.ErrorCode;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:TSDB Reader Error Code
*
* @author Benedict Jin
* @since 2019-10-21
*/
public enum TSDBReaderErrorCode implements ErrorCode {
    REQUIRED_VALUE("TSDBReader-00", "missing required value"),
    ILLEGAL_VALUE("TSDBReader-01", "illegal value");
private final String code;
private final String description;
TSDBReaderErrorCode(String code, String description) {
this.code = code;
this.description = description;
}
@Override
public String getCode() {
return this.code;
}
@Override
public String getDescription() {
return this.description;
}
@Override
public String toString() {
return String.format("Code:[%s], Description:[%s]. ", this.code, this.description);
}
}
package com.alibaba.datax.plugin.reader.tsdbreader.conn;
import com.alibaba.datax.common.plugin.RecordSender;
import java.util.List;
import java.util.Map;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:Connection for TSDB-like databases
*
* @author Benedict Jin
* @since 2019-10-21
*/
public interface Connection4TSDB {
/**
* Get the address of Database.
*
     * @return host and port
*/
String address();
/**
* Get the version of Database.
*
* @return version
*/
String version();
/**
* Get these configurations.
*
* @return configs
*/
String config();
/**
     * Get the list of supported version prefixes.
     *
     * @return version prefix list
*/
String[] getSupportVersionPrefix();
/**
* Send data points for TSDB with single field.
*/
void sendDPs(String metric, Map<String, String> tags, Long start, Long end, RecordSender recordSender) throws Exception;
/**
* Send data points for TSDB with multi fields.
*/
void sendDPs(String metric, List<String> fields, Map<String, String> tags, Long start, Long end, RecordSender recordSender) throws Exception;
/**
* Send data points for RDB with single field.
*/
void sendRecords(String metric, Map<String, String> tags, Long start, Long end, List<String> columns4RDB, RecordSender recordSender) throws Exception;
/**
* Send data points for RDB with multi fields.
*/
void sendRecords(String metric, List<String> fields, Map<String, String> tags, Long start, Long end, List<String> columns4RDB, RecordSender recordSender) throws Exception;
/**
* Put data point.
*
* @param dp data point
* @return whether the data point is written successfully
*/
boolean put(DataPoint4TSDB dp);
/**
* Put data points.
*
* @param dps data points
     * @return whether the data points are written successfully
*/
boolean put(List<DataPoint4TSDB> dps);
/**
* Whether current version is supported.
*
* @return true: supported; false: not yet!
*/
boolean isSupported();
}
package com.alibaba.datax.plugin.reader.tsdbreader.conn;
import com.alibaba.fastjson.JSON;
import java.util.Map;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:DataPoint for TSDB with Multi Fields
*
* @author Benedict Jin
* @since 2019-10-21
*/
public class DataPoint4MultiFieldsTSDB {
private long timestamp;
private String metric;
private Map<String, Object> tags;
private Map<String, Object> fields;
public DataPoint4MultiFieldsTSDB() {
}
public DataPoint4MultiFieldsTSDB(long timestamp, String metric, Map<String, Object> tags, Map<String, Object> fields) {
this.timestamp = timestamp;
this.metric = metric;
this.tags = tags;
this.fields = fields;
}
public long getTimestamp() {
return timestamp;
}
public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}
public String getMetric() {
return metric;
}
public void setMetric(String metric) {
this.metric = metric;
}
public Map<String, Object> getTags() {
return tags;
}
public void setTags(Map<String, Object> tags) {
this.tags = tags;
}
public Map<String, Object> getFields() {
return fields;
}
public void setFields(Map<String, Object> fields) {
this.fields = fields;
}
@Override
public String toString() {
return JSON.toJSONString(this);
}
}
package com.alibaba.datax.plugin.reader.tsdbreader.conn;
import com.alibaba.fastjson.JSON;
import java.util.Map;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:DataPoint for TSDB
*
* @author Benedict Jin
* @since 2019-10-21
*/
public class DataPoint4TSDB {
private long timestamp;
private String metric;
private Map<String, Object> tags;
private Object value;
public DataPoint4TSDB() {
}
public DataPoint4TSDB(long timestamp, String metric, Map<String, Object> tags, Object value) {
this.timestamp = timestamp;
this.metric = metric;
this.tags = tags;
this.value = value;
}
public long getTimestamp() {
return timestamp;
}
public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}
public String getMetric() {
return metric;
}
public void setMetric(String metric) {
this.metric = metric;
}
public Map<String, Object> getTags() {
return tags;
}
public void setTags(Map<String, Object> tags) {
this.tags = tags;
}
public Object getValue() {
return value;
}
public void setValue(Object value) {
this.value = value;
}
@Override
public String toString() {
return JSON.toJSONString(this);
}
}
package com.alibaba.datax.plugin.reader.tsdbreader.conn;
import java.util.List;
import java.util.Map;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:Multi Field Query Result
*
* @author Benedict Jin
* @since 2019-10-22
*/
public class MultiFieldQueryResult {
private String metric;
private Map<String, Object> tags;
private List<String> aggregatedTags;
private List<String> columns;
private List<List<Object>> values;
public MultiFieldQueryResult() {
}
public String getMetric() {
return metric;
}
public void setMetric(String metric) {
this.metric = metric;
}
public Map<String, Object> getTags() {
return tags;
}
public void setTags(Map<String, Object> tags) {
this.tags = tags;
}
public List<String> getAggregatedTags() {
return aggregatedTags;
}
public void setAggregatedTags(List<String> aggregatedTags) {
this.aggregatedTags = aggregatedTags;
}
public List<String> getColumns() {
return columns;
}
public void setColumns(List<String> columns) {
this.columns = columns;
}
public List<List<Object>> getValues() {
return values;
}
public void setValues(List<List<Object>> values) {
this.values = values;
}
}
package com.alibaba.datax.plugin.reader.tsdbreader.conn;
import java.util.List;
import java.util.Map;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:Query Result
*
* @author Benedict Jin
* @since 2019-09-19
*/
public class QueryResult {
private String metricName;
private Map<String, Object> tags;
private List<String> groupByTags;
private List<String> aggregatedTags;
private Map<String, Object> dps;
public QueryResult() {
}
public String getMetricName() {
return metricName;
}
public void setMetricName(String metricName) {
this.metricName = metricName;
}
public Map<String, Object> getTags() {
return tags;
}
public void setTags(Map<String, Object> tags) {
this.tags = tags;
}
public List<String> getGroupByTags() {
return groupByTags;
}
public void setGroupByTags(List<String> groupByTags) {
this.groupByTags = groupByTags;
}
public List<String> getAggregatedTags() {
return aggregatedTags;
}
public void setAggregatedTags(List<String> aggregatedTags) {
this.aggregatedTags = aggregatedTags;
}
public Map<String, Object> getDps() {
return dps;
}
public void setDps(Map<String, Object> dps) {
this.dps = dps;
}
}
package com.alibaba.datax.plugin.reader.tsdbreader.conn;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.plugin.reader.tsdbreader.util.TSDBUtils;
import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils;
import java.util.List;
import java.util.Map;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:TSDB Connection
*
* @author Benedict Jin
* @since 2019-10-21
*/
public class TSDBConnection implements Connection4TSDB {
private String address;
public TSDBConnection(String address) {
this.address = address;
}
@Override
public String address() {
return address;
}
@Override
public String version() {
return TSDBUtils.version(address);
}
@Override
public String config() {
return TSDBUtils.config(address);
}
@Override
public String[] getSupportVersionPrefix() {
return new String[]{"2.4", "2.5"};
}
@Override
public void sendDPs(String metric, Map<String, String> tags, Long start, Long end, RecordSender recordSender) throws Exception {
TSDBDump.dump4TSDB(this, metric, tags, start, end, recordSender);
}
@Override
public void sendDPs(String metric, List<String> fields, Map<String, String> tags, Long start, Long end, RecordSender recordSender) throws Exception {
TSDBDump.dump4TSDB(this, metric, fields, tags, start, end, recordSender);
}
@Override
public void sendRecords(String metric, Map<String, String> tags, Long start, Long end, List<String> columns4RDB, RecordSender recordSender) throws Exception {
TSDBDump.dump4RDB(this, metric, tags, start, end, columns4RDB, recordSender);
}
@Override
public void sendRecords(String metric, List<String> fields, Map<String, String> tags, Long start, Long end, List<String> columns4RDB, RecordSender recordSender) throws Exception {
TSDBDump.dump4RDB(this, metric, fields, tags, start, end, columns4RDB, recordSender);
}
@Override
public boolean put(DataPoint4TSDB dp) {
return false;
}
@Override
public boolean put(List<DataPoint4TSDB> dps) {
return false;
}
@Override
public boolean isSupported() {
String versionJson = version();
if (StringUtils.isBlank(versionJson)) {
throw new RuntimeException("Cannot get the version!");
}
String version = JSON.parseObject(versionJson).getString("version");
if (StringUtils.isBlank(version)) {
return false;
}
for (String prefix : getSupportVersionPrefix()) {
if (version.startsWith(prefix)) {
return true;
}
}
return false;
}
}
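`isSupported()` above gates the connection on a simple prefix match against the `version` field returned by the TSDB endpoint. The core of that check, extracted into a standalone sketch (class name illustrative):

```java
public class VersionGate {

    // Same prefixes TSDBConnection#getSupportVersionPrefix returns.
    static final String[] SUPPORTED_PREFIXES = {"2.4", "2.5"};

    /** Prefix match, as in TSDBConnection#isSupported. */
    public static boolean isSupported(String version) {
        if (version == null || version.trim().isEmpty()) {
            return false;
        }
        for (String prefix : SUPPORTED_PREFIXES) {
            if (version.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(isSupported("2.4.1")); // supported
        System.out.println(isSupported("2.3.9")); // not supported
    }
}
```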
package com.alibaba.datax.plugin.reader.tsdbreader.util;
import com.alibaba.fastjson.JSON;
import org.apache.http.client.fluent.Content;
import org.apache.http.client.fluent.Request;
import org.apache.http.entity.ContentType;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:HttpUtils
*
* @author Benedict Jin
* @since 2019-10-21
*/
public final class HttpUtils {
public final static int CONNECT_TIMEOUT_DEFAULT_IN_MILL = (int) TimeUnit.SECONDS.toMillis(60);
public final static int SOCKET_TIMEOUT_DEFAULT_IN_MILL = (int) TimeUnit.SECONDS.toMillis(60);
private HttpUtils() {
}
public static String get(String url) throws Exception {
Content content = Request.Get(url)
.connectTimeout(CONNECT_TIMEOUT_DEFAULT_IN_MILL)
.socketTimeout(SOCKET_TIMEOUT_DEFAULT_IN_MILL)
.execute()
.returnContent();
if (content == null) {
return null;
}
return content.asString(StandardCharsets.UTF_8);
}
public static String post(String url, Map<String, Object> params) throws Exception {
return post(url, JSON.toJSONString(params), CONNECT_TIMEOUT_DEFAULT_IN_MILL, SOCKET_TIMEOUT_DEFAULT_IN_MILL);
}
public static String post(String url, String params) throws Exception {
return post(url, params, CONNECT_TIMEOUT_DEFAULT_IN_MILL, SOCKET_TIMEOUT_DEFAULT_IN_MILL);
}
public static String post(String url, Map<String, Object> params,
int connectTimeoutInMill, int socketTimeoutInMill) throws Exception {
return post(url, JSON.toJSONString(params), connectTimeoutInMill, socketTimeoutInMill);
}
public static String post(String url, String params,
int connectTimeoutInMill, int socketTimeoutInMill) throws Exception {
Content content = Request.Post(url)
.connectTimeout(connectTimeoutInMill)
.socketTimeout(socketTimeoutInMill)
.addHeader("Content-Type", "application/json")
.bodyString(params, ContentType.APPLICATION_JSON)
.execute()
.returnContent();
if (content == null) {
return null;
}
return content.asString(StandardCharsets.UTF_8);
}
}
package com.alibaba.datax.plugin.reader.tsdbreader.util;
import java.util.concurrent.TimeUnit;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:TimeUtils
*
* @author Benedict Jin
* @since 2019-10-21
*/
public final class TimeUtils {
private TimeUtils() {
}
private static final long SECOND_MASK = 0xFFFFFFFF00000000L;
private static final long HOUR_IN_MILL = TimeUnit.HOURS.toMillis(1);
/**
     * Whether the timestamp is in seconds.
*
* @param ts timestamp
*/
public static boolean isSecond(long ts) {
return (ts & SECOND_MASK) == 0;
}
/**
     * Truncate the timestamp down to the whole hour.
*
* @param ms time in millisecond
*/
public static long getTimeInHour(long ms) {
return ms - ms % HOUR_IN_MILL;
}
}
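The bit trick in `isSecond` above relies on second-resolution Unix timestamps fitting in the low 32 bits (until 2038), so any value with bits set in the upper 32 is taken to be in milliseconds. A standalone sketch of both helpers with concrete values:

```java
import java.util.concurrent.TimeUnit;

public class TimeUtilsSketch {

    // A second-resolution Unix timestamp fits in the low 32 bits until 2038,
    // so non-zero high bits imply a millisecond timestamp.
    private static final long SECOND_MASK = 0xFFFFFFFF00000000L;
    private static final long HOUR_IN_MILL = TimeUnit.HOURS.toMillis(1);

    public static boolean isSecond(long ts) {
        return (ts & SECOND_MASK) == 0;
    }

    /** Truncate a millisecond timestamp down to the whole hour. */
    public static long getTimeInHour(long ms) {
        return ms - ms % HOUR_IN_MILL;
    }

    public static void main(String[] args) {
        System.out.println(isSecond(1571616000L));    // seconds -> true
        System.out.println(isSecond(1571616000000L)); // millis  -> false
        System.out.println(getTimeInHour(3725000L));  // 3600000 (1h02m05s -> 1h)
    }
}
```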
{
"name": "tsdbreader",
"class": "com.alibaba.datax.plugin.reader.tsdbreader.TSDBReader",
"description": {
    "useScene": "ingest data points from a TSDB",
    "mechanism": "query matching data points via the /api/query endpoint",
    "warn": "the given start and end times are truncated to the whole hour (minutes and seconds are ignored); e.g. [3:35, 4:55) on 2019-4-18 becomes [3:00, 4:00)"
},
"developer": "Benedict Jin"
}