DataX · commit 10371104 (unverified)

Authored Oct 11, 2019 by Trafalgar; committed by GitHub on Oct 11, 2019.
Merge pull request #471 from sufism/cassandra
cassandra plugins
Parents: 57c8dd86, 2f8cc74b
Showing 33 changed files with 2321 additions and 0 deletions (+2321, -0):
| File | Additions | Deletions |
| --- | ---: | ---: |
| README.md | +1 | -0 |
| cassandrareader/doc/cassandrareader.md | +217 | -0 |
| cassandrareader/pom.xml | +133 | -0 |
| cassandrareader/src/main/assembly/package.xml | +35 | -0 |
| .../datax/plugin/reader/cassandrareader/CassandraReader.java | +123 | -0 |
| ...ugin/reader/cassandrareader/CassandraReaderErrorCode.java | +32 | -0 |
| .../plugin/reader/cassandrareader/CassandraReaderHelper.java | +607 | -0 |
| .../com/alibaba/datax/plugin/reader/cassandrareader/Key.java | +39 | -0 |
| ...tax/plugin/reader/cassandrareader/LocalStrings.properties | +1 | -0 |
| ...ugin/reader/cassandrareader/LocalStrings_en_US.properties | +0 | -0 |
| ...ugin/reader/cassandrareader/LocalStrings_ja_JP.properties | +1 | -0 |
| ...ugin/reader/cassandrareader/LocalStrings_zh_CN.properties | +1 | -0 |
| ...ugin/reader/cassandrareader/LocalStrings_zh_HK.properties | +1 | -0 |
| ...ugin/reader/cassandrareader/LocalStrings_zh_TW.properties | +1 | -0 |
| cassandrareader/src/main/resources/plugin.json | +6 | -0 |
| cassandrareader/src/main/resources/plugin_job_template.json | +15 | -0 |
| cassandrawriter/doc/cassandrawriter.md | +227 | -0 |
| cassandrawriter/pom.xml | +125 | -0 |
| cassandrawriter/src/main/assembly/package.xml | +35 | -0 |
| .../datax/plugin/writer/cassandrawriter/CassandraWriter.java | +242 | -0 |
| ...ugin/writer/cassandrawriter/CassandraWriterErrorCode.java | +35 | -0 |
| .../plugin/writer/cassandrawriter/CassandraWriterHelper.java | +351 | -0 |
| .../com/alibaba/datax/plugin/writer/cassandrawriter/Key.java | +43 | -0 |
| ...tax/plugin/writer/cassandrawriter/LocalStrings.properties | +2 | -0 |
| ...ugin/writer/cassandrawriter/LocalStrings_en_US.properties | +2 | -0 |
| ...ugin/writer/cassandrawriter/LocalStrings_ja_JP.properties | +2 | -0 |
| ...ugin/writer/cassandrawriter/LocalStrings_zh_CN.properties | +2 | -0 |
| ...ugin/writer/cassandrawriter/LocalStrings_zh_HK.properties | +2 | -0 |
| ...ugin/writer/cassandrawriter/LocalStrings_zh_TW.properties | +2 | -0 |
| cassandrawriter/src/main/resources/plugin.json | +7 | -0 |
| cassandrawriter/src/main/resources/plugin_job_template.json | +15 | -0 |
| package.xml | +14 | -0 |
| pom.xml | +2 | -0 |
`README.md` (modified)

```
@@ -51,6 +51,7 @@ DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、N
 | | Phoenix5.x | √ | √ |[读](https://github.com/alibaba/DataX/blob/master/hbase20xsqlreader/doc/hbase20xsqlreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hbase20xsqlwriter/doc/hbase20xsqlwriter.md)|
 | | MongoDB | √ | √ |[读](https://github.com/alibaba/DataX/blob/master/mongoreader/doc/mongoreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/mongowriter/doc/mongowriter.md)|
 | | Hive | √ | √ |[读](https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md)|
+| | Cassandra | √ | √ |[读](https://github.com/alibaba/DataX/blob/master/cassandrareader/doc/cassandrareader.md) 、[写](https://github.com/alibaba/DataX/blob/master/cassandrawriter/doc/cassandrawriter.md)|
 | 无结构化数据存储 | TxtFile | √ | √ |[读](https://github.com/alibaba/DataX/blob/master/txtfilereader/doc/txtfilereader.md) 、[写](https://github.com/alibaba/DataX/blob/master/txtfilewriter/doc/txtfilewriter.md)|
 | | FTP | √ | √ |[读](https://github.com/alibaba/DataX/blob/master/ftpreader/doc/ftpreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/ftpwriter/doc/ftpwriter.md)|
 | | HDFS | √ | √ |[读](https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md)|
```
`cassandrareader/doc/cassandrareader.md` (new file, mode 100644)

# CassandraReader Plugin Documentation
___
## 1 Quick Introduction
The CassandraReader plugin reads data from Cassandra. Under the hood, CassandraReader connects to a Cassandra instance through the DataStax Java driver and executes the corresponding CQL statements to SELECT data out of Cassandra.
## 2 Implementation Principle
In short, CassandraReader connects to a Cassandra instance through the Java driver, generates a SELECT CQL statement from the user-configured information, sends it to Cassandra, assembles the query result into an abstract dataset using DataX's own data types, and passes it on to the downstream Writer.
From the user-configured Table and Column information, CassandraReader builds the CQL statement that is sent to Cassandra; a sketch of such a statement is shown below.
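For illustration only, a minimal sketch of the statement this produces, assuming the sample job below (table `datax_src`, one plain column plus one writetime() column):

```
SELECT textCol,writetime(blobCol),intCol FROM datax_src;
```

Column expressions are joined with commas exactly as configured; WHERE and token-range conditions are appended only when the corresponding parameters are set.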
## 3 Function Description
### 3.1 Sample Configuration
* Configure a job that extracts data from Cassandra and writes it to local output:
```
{
    "job": {
        "setting": {
            "speed": {
                "channel": 3
            }
        },
        "content": [
            {
                "reader": {
                    "name": "cassandrareader",
                    "parameter": {
                        "host": "localhost",
                        "port": 9042,
                        "useSSL": false,
                        "keyspace": "test",
                        "table": "datax_src",
                        "column": [
                            "textCol",
                            "blobCol",
                            "writetime(blobCol)",
                            "boolCol",
                            "smallintCol",
                            "tinyintCol",
                            "intCol",
                            "bigintCol",
                            "varintCol",
                            "floatCol",
                            "doubleCol",
                            "decimalCol",
                            "dateCol",
                            "timeCol",
                            "timeStampCol",
                            "uuidCol",
                            "inetCol",
                            "durationCol",
                            "listCol",
                            "mapCol",
                            "setCol",
                            "tupleCol",
                            "udtCol"
                        ]
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print": true
                    }
                }
            }
        ]
    }
}
```
### 3.2 Parameter Description
* **host**
	* Description: domain name or IP of the Cassandra contact points; separate multiple nodes with commas. <br />
	* Required: yes <br />
	* Default: none <br />
* **port**
	* Description: Cassandra port. <br />
	* Required: yes <br />
	* Default: 9042 <br />
* **username**
	* Description: username for the data source <br />
	* Required: no <br />
	* Default: none <br />
* **password**
	* Description: password for the specified username <br />
	* Required: no <br />
	* Default: none <br />
* **useSSL**
	* Description: whether to connect over SSL.<br />
	* Required: no <br />
	* Default: false <br />
* **keyspace**
	* Description: keyspace containing the table to be synchronized.<br />
	* Required: yes <br />
	* Default: none <br />
* **table**
	* Description: the table to be synchronized.<br />
	* Required: yes <br />
	* Default: none <br />
* **column**
	* Description: the set of columns in the table to be synchronized.<br />
	  Each element can be a column name or writetime(column_name); the latter reads the write timestamp of column_name instead of its data.
	* Required: yes <br />
	* Default: none <br />
* **where**
	* Description: CQL expression used to filter data, for example:<br />
```
"where":"textcol='a'"
```
	* Required: no <br />
	* Default: none <br />
* **allowFiltering**
	* Description: whether to filter data on the server side. See the description of the ALLOW FILTERING keyword in the Cassandra documentation.<br />
	* Required: no <br />
	* Default: none <br />
* **consistancyLevel**
	* Description: consistency level. One of ONE|QUORUM|LOCAL_QUORUM|EACH_QUORUM|ALL|ANY|TWO|THREE|LOCAL_ONE<br />
	* Required: no <br />
	* Default: LOCAL_QUORUM <br />
### 3.3 Type Conversion
CassandraReader currently supports all types except the counter and custom types.
The type conversion table CassandraReader uses for Cassandra types is listed below:

| DataX internal type | Cassandra data type |
| -------- | ----- |
| Long | int, tinyint, smallint, varint, bigint, time |
| Double | float, double, decimal |
| String | ascii, varchar, text, uuid, timeuuid, duration, list, map, set, tuple, udt, inet |
| Date | date, timestamp |
| Boolean | bool |
| Bytes | blob |

Note:
* The counter and custom types are not currently supported. Collection types (list, map, set, tuple, udt) are read into the DataX String type as JSON text; see the example below.
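For example (hypothetical column name), a column declared as `mapCol map<text,int>` holding `{'a': 1, 'b': 2}` is emitted by the reader as the JSON string `{"a":1,"b":2}`.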
## 4 Performance Report
Omitted.
## 5 Constraints
### 5.1 Data recovery after primary/standby synchronization
Omitted.
## 6 FAQ
`cassandrareader/pom.xml` (new file, mode 100644)

```
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>com.alibaba.datax</groupId>
    <artifactId>datax-all</artifactId>
    <version>0.0.1-SNAPSHOT</version>
  </parent>
  <artifactId>cassandrareader</artifactId>
  <name>cassandrareader</name>
  <packaging>jar</packaging>

  <dependencies>
    <dependency>
      <groupId>com.alibaba.datax</groupId>
      <artifactId>datax-common</artifactId>
      <version>${datax-project-version}</version>
      <exclusions>
        <exclusion>
          <artifactId>slf4j-log4j12</artifactId>
          <groupId>org.slf4j</groupId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
    </dependency>
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-classic</artifactId>
    </dependency>
    <dependency>
      <groupId>com.datastax.cassandra</groupId>
      <artifactId>cassandra-driver-core</artifactId>
      <version>3.7.2</version>
      <classifier>shaded</classifier>
      <exclusions>
        <exclusion>
          <groupId>com.google.guava</groupId>
          <artifactId>guava</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>16.0.1</version>
    </dependency>
    <dependency>
      <groupId>commons-codec</groupId>
      <artifactId>commons-codec</artifactId>
      <version>1.9</version>
    </dependency>

    <!-- for test -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>com.alibaba.datax</groupId>
      <artifactId>datax-core</artifactId>
      <version>${datax-project-version}</version>
      <exclusions>
        <exclusion>
          <groupId>com.alibaba.datax</groupId>
          <artifactId>datax-service-face</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-common</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.hive</groupId>
          <artifactId>hive-exec</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.hive</groupId>
          <artifactId>hive-serde</artifactId>
        </exclusion>
        <exclusion>
          <groupId>javolution</groupId>
          <artifactId>javolution</artifactId>
        </exclusion>
      </exclusions>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.mockito</groupId>
      <artifactId>mockito-all</artifactId>
      <version>1.9.5</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <!-- compiler plugin -->
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.6</source>
          <target>1.6</target>
          <encoding>${project-sourceEncoding}</encoding>
        </configuration>
      </plugin>
      <!-- assembly plugin -->
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptors>
            <descriptor>src/main/assembly/package.xml</descriptor>
          </descriptors>
          <finalName>datax</finalName>
        </configuration>
        <executions>
          <execution>
            <id>dwzip</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
```
`cassandrareader/src/main/assembly/package.xml` (new file, mode 100644)

```
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
  <id></id>
  <formats>
    <format>dir</format>
  </formats>
  <includeBaseDirectory>false</includeBaseDirectory>
  <fileSets>
    <fileSet>
      <directory>src/main/resources</directory>
      <includes>
        <include>plugin.json</include>
        <include>plugin_job_template.json</include>
      </includes>
      <outputDirectory>plugin/reader/cassandrareader</outputDirectory>
    </fileSet>
    <fileSet>
      <directory>target/</directory>
      <includes>
        <include>cassandrareader-0.0.1-SNAPSHOT.jar</include>
      </includes>
      <outputDirectory>plugin/reader/cassandrareader</outputDirectory>
    </fileSet>
  </fileSets>
  <dependencySets>
    <dependencySet>
      <useProjectArtifact>false</useProjectArtifact>
      <outputDirectory>plugin/reader/cassandrareader/libs</outputDirectory>
      <scope>runtime</scope>
    </dependencySet>
  </dependencySets>
</assembly>
```
`cassandrareader/src/main/java/com/alibaba/datax/plugin/reader/cassandrareader/CassandraReader.java` (new file, mode 100644)

```
package com.alibaba.datax.plugin.reader.cassandrareader;

import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

public class CassandraReader extends Reader {
  private static final Logger LOG = LoggerFactory.getLogger(CassandraReader.class);

  public static class Job extends Reader.Job {
    private Configuration jobConfig = null;
    private Cluster cluster = null;

    @Override
    public void init() {
      this.jobConfig = super.getPluginJobConf();
      this.jobConfig = super.getPluginJobConf();
      String username = jobConfig.getString(Key.USERNAME);
      String password = jobConfig.getString(Key.PASSWORD);
      String hosts = jobConfig.getString(Key.HOST);
      Integer port = jobConfig.getInt(Key.PORT, 9042);
      boolean useSSL = jobConfig.getBool(Key.USESSL);

      if ((username != null) && !username.isEmpty()) {
        Cluster.Builder clusterBuilder = Cluster.builder().withCredentials(username, password)
            .withPort(Integer.valueOf(port)).addContactPoints(hosts.split(","));
        if (useSSL) {
          clusterBuilder = clusterBuilder.withSSL();
        }
        cluster = clusterBuilder.build();
      } else {
        cluster = Cluster.builder().withPort(Integer.valueOf(port))
            .addContactPoints(hosts.split(",")).build();
      }
      CassandraReaderHelper.checkConfig(jobConfig, cluster);
    }

    @Override
    public void destroy() {
    }

    @Override
    public List<Configuration> split(int adviceNumber) {
      List<Configuration> splittedConfigs = CassandraReaderHelper.splitJob(adviceNumber, jobConfig, cluster);
      return splittedConfigs;
    }
  }

  public static class Task extends Reader.Task {
    private Configuration taskConfig;
    private Cluster cluster = null;
    private Session session = null;
    private String queryString = null;
    private ConsistencyLevel consistencyLevel;
    private int columnNumber = 0;
    private List<String> columnMeta = null;

    @Override
    public void init() {
      this.taskConfig = super.getPluginJobConf();
      String username = taskConfig.getString(Key.USERNAME);
      String password = taskConfig.getString(Key.PASSWORD);
      String hosts = taskConfig.getString(Key.HOST);
      Integer port = taskConfig.getInt(Key.PORT);
      boolean useSSL = taskConfig.getBool(Key.USESSL);
      String keyspace = taskConfig.getString(Key.KEYSPACE);
      this.columnMeta = taskConfig.getList(Key.COLUMN, String.class);
      columnNumber = columnMeta.size();

      if ((username != null) && !username.isEmpty()) {
        Cluster.Builder clusterBuilder = Cluster.builder().withCredentials(username, password)
            .withPort(Integer.valueOf(port)).addContactPoints(hosts.split(","));
        if (useSSL) {
          clusterBuilder = clusterBuilder.withSSL();
        }
        cluster = clusterBuilder.build();
      } else {
        cluster = Cluster.builder().withPort(Integer.valueOf(port))
            .addContactPoints(hosts.split(",")).build();
      }

      session = cluster.connect(keyspace);
      String cl = taskConfig.getString(Key.CONSITANCY_LEVEL);
      if (cl != null && !cl.isEmpty()) {
        consistencyLevel = ConsistencyLevel.valueOf(cl);
      } else {
        consistencyLevel = ConsistencyLevel.LOCAL_QUORUM;
      }

      queryString = CassandraReaderHelper.getQueryString(taskConfig, cluster);
      LOG.info("query = " + queryString);
    }

    @Override
    public void startRead(RecordSender recordSender) {
      ResultSet r = session.execute(new SimpleStatement(queryString).setConsistencyLevel(consistencyLevel));
      for (Row row : r) {
        Record record = recordSender.createRecord();
        record = CassandraReaderHelper.buildRecord(record, row, r.getColumnDefinitions(), columnNumber,
            super.getTaskPluginCollector());
        if (record != null)
          recordSender.sendToWriter(record);
      }
    }

    @Override
    public void destroy() {
    }
  }
}
```
`cassandrareader/src/main/java/com/alibaba/datax/plugin/reader/cassandrareader/CassandraReaderErrorCode.java` (new file, mode 100644)

```
package com.alibaba.datax.plugin.reader.cassandrareader;

import com.alibaba.datax.common.spi.ErrorCode;

public enum CassandraReaderErrorCode implements ErrorCode {
  CONF_ERROR("CassandraReader-00", "配置错误."),
  ;

  private final String code;
  private final String description;

  private CassandraReaderErrorCode(String code, String description) {
    this.code = code;
    this.description = description;
  }

  @Override
  public String getCode() {
    return this.code;
  }

  @Override
  public String getDescription() {
    return this.description;
  }

  @Override
  public String toString() {
    return String.format("Code:[%s], Description:[%s]. ", this.code, this.description);
  }
}
```
`cassandrareader/src/main/java/com/alibaba/datax/plugin/reader/cassandrareader/CassandraReaderHelper.java` (new file, mode 100644)

```
package com.alibaba.datax.plugin.reader.cassandrareader;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import com.alibaba.datax.common.element.BoolColumn;
import com.alibaba.datax.common.element.BytesColumn;
import com.alibaba.datax.common.element.DateColumn;
import com.alibaba.datax.common.element.DoubleColumn;
import com.alibaba.datax.common.element.LongColumn;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.fastjson.JSON;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.CodecRegistry;
import com.datastax.driver.core.ColumnDefinitions;
import com.datastax.driver.core.ColumnMetadata;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.Duration;
import com.datastax.driver.core.LocalDate;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.TableMetadata;
import com.datastax.driver.core.TupleType;
import com.datastax.driver.core.TupleValue;
import com.datastax.driver.core.UDTValue;
import com.datastax.driver.core.UserType;
import com.google.common.reflect.TypeToken;
import org.apache.commons.codec.binary.Base64;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Created by mazhenlin on 2019/8/21.
 */
public class CassandraReaderHelper {
  static CodecRegistry registry = new CodecRegistry();
  private static final Logger LOG = LoggerFactory.getLogger(CassandraReader.class);

  static class TypeNotSupported extends Exception {}

  static String toJSonString(Object o, DataType type) throws Exception {
    if (o == null) return JSON.toJSONString(null);
    switch (type.getName()) {
    case LIST: case MAP: case SET: case TUPLE: case UDT:
      return JSON.toJSONString(transferObjectForJson(o, type));
    default:
      return JSON.toJSONString(o);
    }
  }

  static Object transferObjectForJson(Object o, DataType type) throws TypeNotSupported {
    if (o == null) return o;
    switch (type.getName()) {
    case ASCII: case TEXT: case VARCHAR: case BOOLEAN: case SMALLINT: case TINYINT:
    case INT: case BIGINT: case VARINT: case FLOAT: case DOUBLE: case DECIMAL:
    case UUID: case TIMEUUID: case TIME:
      return o;
    case BLOB:
      ByteBuffer byteBuffer = (ByteBuffer) o;
      String s = Base64.encodeBase64String(
          Arrays.copyOfRange(byteBuffer.array(), byteBuffer.position(), byteBuffer.limit()));
      return s;
    case DATE:
      return ((LocalDate) o).getMillisSinceEpoch();
    case TIMESTAMP:
      return ((Date) o).getTime();
    case DURATION:
      return o.toString();
    case INET:
      return ((InetAddress) o).getHostAddress();
    case LIST: {
      return transferListForJson((List) o, type.getTypeArguments().get(0));
    }
    case MAP: {
      DataType keyType = type.getTypeArguments().get(0);
      DataType valType = type.getTypeArguments().get(1);
      return transferMapForJson((Map) o, keyType, valType);
    }
    case SET: {
      return transferSetForJson((Set) o, type.getTypeArguments().get(0));
    }
    case TUPLE: {
      return transferTupleForJson((TupleValue) o, ((TupleType) type).getComponentTypes());
    }
    case UDT: {
      return transferUDTForJson((UDTValue) o);
    }
    default:
      throw new TypeNotSupported();
    }
  }

  static List transferListForJson(List clist, DataType eleType) throws TypeNotSupported {
    List result = new ArrayList();
    switch (eleType.getName()) {
    case ASCII: case TEXT: case VARCHAR: case BOOLEAN: case SMALLINT: case TINYINT:
    case INT: case BIGINT: case VARINT: case FLOAT: case DOUBLE: case DECIMAL:
    case TIME: case UUID: case TIMEUUID:
      return clist;
    case BLOB: case DATE: case TIMESTAMP: case DURATION: case INET:
    case LIST: case MAP: case SET: case TUPLE: case UDT:
      for (Object item : clist) {
        Object newItem = transferObjectForJson(item, eleType);
        result.add(newItem);
      }
      break;
    default:
      throw new TypeNotSupported();
    }
    return result;
  }

  static Set transferSetForJson(Set cset, DataType eleType) throws TypeNotSupported {
    Set result = new HashSet();
    switch (eleType.getName()) {
    case ASCII: case TEXT: case VARCHAR: case BOOLEAN: case SMALLINT: case TINYINT:
    case INT: case BIGINT: case VARINT: case FLOAT: case DOUBLE: case DECIMAL:
    case TIME: case UUID: case TIMEUUID:
      return cset;
    case BLOB: case DATE: case TIMESTAMP: case DURATION: case INET:
    case LIST: case MAP: case SET: case TUPLE: case UDT:
      for (Object item : cset) {
        Object newItem = transferObjectForJson(item, eleType);
        result.add(newItem);
      }
      break;
    default:
      throw new TypeNotSupported();
    }
    return result;
  }

  static Map transferMapForJson(Map cmap, DataType keyType, DataType valueType) throws TypeNotSupported {
    Map newMap = new HashMap();
    for (Object e : cmap.entrySet()) {
      Object k = ((Map.Entry) e).getKey();
      Object v = ((Map.Entry) e).getValue();
      Object newKey = transferObjectForJson(k, keyType);
      Object newValue = transferObjectForJson(v, valueType);
      if (!(newKey instanceof String)) {
        newKey = JSON.toJSONString(newKey);
      }
      newMap.put(newKey, newValue);
    }
    return newMap;
  }

  static List transferTupleForJson(TupleValue tupleValue, List<DataType> componentTypes) throws TypeNotSupported {
    List l = new ArrayList();
    for (int j = 0; j < componentTypes.size(); j++) {
      DataType dataType = componentTypes.get(j);
      TypeToken<?> eltClass = registry.codecFor(dataType).getJavaType();
      Object ele = tupleValue.get(j, eltClass);
      l.add(transferObjectForJson(ele, dataType));
    }
    return l;
  }

  static Map transferUDTForJson(UDTValue udtValue) throws TypeNotSupported {
    Map<String, Object> newMap = new HashMap();
    int j = 0;
    for (UserType.Field f : udtValue.getType()) {
      DataType dataType = f.getType();
      TypeToken<?> eltClass = registry.codecFor(dataType).getJavaType();
      Object ele = udtValue.get(j, eltClass);
      newMap.put(f.getName(), transferObjectForJson(ele, dataType));
      j++;
    }
    return newMap;
  }

  static Record buildRecord(Record record, Row rs, ColumnDefinitions metaData, int columnNumber,
      TaskPluginCollector taskPluginCollector) {
    try {
      for (int i = 0; i < columnNumber; i++)
        try {
          if (rs.isNull(i)) {
            record.addColumn(new StringColumn());
            continue;
          }
          switch (metaData.getType(i).getName()) {
          case ASCII: case TEXT: case VARCHAR:
            record.addColumn(new StringColumn(rs.getString(i)));
            break;
          case BLOB:
            record.addColumn(new BytesColumn(rs.getBytes(i).array()));
            break;
          case BOOLEAN:
            record.addColumn(new BoolColumn(rs.getBool(i)));
            break;
          case SMALLINT:
            record.addColumn(new LongColumn((int) rs.getShort(i)));
            break;
          case TINYINT:
            record.addColumn(new LongColumn((int) rs.getByte(i)));
            break;
          case INT:
            record.addColumn(new LongColumn(rs.getInt(i)));
            break;
          case BIGINT:
            record.addColumn(new LongColumn(rs.getLong(i)));
            break;
          case VARINT:
            record.addColumn(new LongColumn(rs.getVarint(i)));
            break;
          case FLOAT:
            record.addColumn(new DoubleColumn(rs.getFloat(i)));
            break;
          case DOUBLE:
            record.addColumn(new DoubleColumn(rs.getDouble(i)));
            break;
          case DECIMAL:
            record.addColumn(new DoubleColumn(rs.getDecimal(i)));
            break;
          case DATE:
            record.addColumn(new DateColumn(rs.getDate(i).getMillisSinceEpoch()));
            break;
          case TIME:
            record.addColumn(new LongColumn(rs.getTime(i)));
            break;
          case TIMESTAMP:
            record.addColumn(new DateColumn(rs.getTimestamp(i)));
            break;
          case UUID: case TIMEUUID:
            record.addColumn(new StringColumn(rs.getUUID(i).toString()));
            break;
          case INET:
            record.addColumn(new StringColumn(rs.getInet(i).getHostAddress()));
            break;
          case DURATION:
            record.addColumn(new StringColumn(rs.get(i, Duration.class).toString()));
            break;
          case LIST: {
            TypeToken listEltClass = registry.codecFor(metaData.getType(i).getTypeArguments().get(0)).getJavaType();
            List<?> l = rs.getList(i, listEltClass);
            record.addColumn(new StringColumn(toJSonString(l, metaData.getType(i))));
          }
            break;
          case MAP: {
            DataType keyType = metaData.getType(i).getTypeArguments().get(0);
            DataType valType = metaData.getType(i).getTypeArguments().get(1);
            TypeToken<?> keyEltClass = registry.codecFor(keyType).getJavaType();
            TypeToken<?> valEltClass = registry.codecFor(valType).getJavaType();
            Map<?, ?> m = rs.getMap(i, keyEltClass, valEltClass);
            record.addColumn(new StringColumn(toJSonString(m, metaData.getType(i))));
          }
            break;
          case SET: {
            TypeToken<?> setEltClass = registry.codecFor(metaData.getType(i).getTypeArguments().get(0))
                .getJavaType();
            Set<?> set = rs.getSet(i, setEltClass);
            record.addColumn(new StringColumn(toJSonString(set, metaData.getType(i))));
          }
            break;
          case TUPLE: {
            TupleValue t = rs.getTupleValue(i);
            record.addColumn(new StringColumn(toJSonString(t, metaData.getType(i))));
          }
            break;
          case UDT: {
            UDTValue t = rs.getUDTValue(i);
            record.addColumn(new StringColumn(toJSonString(t, metaData.getType(i))));
          }
            break;
          default:
            throw DataXException.asDataXException(CassandraReaderErrorCode.CONF_ERROR,
                String.format("您的配置文件中的列配置信息有误. 因为DataX 不支持数据库读取这种字段类型. 字段名:[%s], 字段类型:[%s]. ",
                    metaData.getName(i), metaData.getType(i)));
          }
        } catch (TypeNotSupported t) {
          throw DataXException.asDataXException(CassandraReaderErrorCode.CONF_ERROR,
              String.format("您的配置文件中的列配置信息有误. 因为DataX 不支持数据库读取这种字段类型. 字段名:[%s], 字段类型:[%s]. ",
                  metaData.getName(i), metaData.getType(i)));
        }
    } catch (Exception e) {
      //TODO is treating this row as dirty data reliable?
      taskPluginCollector.collectDirtyRecord(record, e);
      if (e instanceof DataXException) {
        throw (DataXException) e;
      }
      return null;
    }
    return record;
  }

  public static List<Configuration> splitJob(int adviceNumber, Configuration jobConfig, Cluster cluster) {
    List<Configuration> splittedConfigs = new ArrayList<Configuration>();
    if (adviceNumber <= 1) {
      splittedConfigs.add(jobConfig);
      return splittedConfigs;
    }
    String where = jobConfig.getString(Key.WHERE);
    if (where != null && where.toLowerCase().contains("token(")) {
      splittedConfigs.add(jobConfig);
      return splittedConfigs;
    }
    String partitioner = cluster.getMetadata().getPartitioner();
    if (partitioner.endsWith("RandomPartitioner")) {
      BigDecimal minToken = BigDecimal.valueOf(-1);
      BigDecimal maxToken = new BigDecimal(new BigInteger("2").pow(127));
      BigDecimal step = maxToken.subtract(minToken)
          .divide(BigDecimal.valueOf(adviceNumber), 2, BigDecimal.ROUND_HALF_EVEN);
      for (int i = 0; i < adviceNumber; i++) {
        BigInteger l = minToken.add(step.multiply(BigDecimal.valueOf(i))).toBigInteger();
        BigInteger r = minToken.add(step.multiply(BigDecimal.valueOf(i + 1))).toBigInteger();
        if (i == adviceNumber - 1) {
          r = maxToken.toBigInteger();
        }
        Configuration taskConfig = jobConfig.clone();
        taskConfig.set(Key.MIN_TOKEN, l.toString());
        taskConfig.set(Key.MAX_TOKEN, r.toString());
        splittedConfigs.add(taskConfig);
      }
    } else if (partitioner.endsWith("Murmur3Partitioner")) {
      BigDecimal minToken = BigDecimal.valueOf(Long.MIN_VALUE);
      BigDecimal maxToken = BigDecimal.valueOf(Long.MAX_VALUE);
      BigDecimal step = maxToken.subtract(minToken)
          .divide(BigDecimal.valueOf(adviceNumber), 2, BigDecimal.ROUND_HALF_EVEN);
      for (int i = 0; i < adviceNumber; i++) {
        long l = minToken.add(step.multiply(BigDecimal.valueOf(i))).longValue();
        long r = minToken.add(step.multiply(BigDecimal.valueOf(i + 1))).longValue();
        if (i == adviceNumber - 1) {
          r = maxToken.longValue();
        }
        Configuration taskConfig = jobConfig.clone();
        taskConfig.set(Key.MIN_TOKEN, String.valueOf(l));
        taskConfig.set(Key.MAX_TOKEN, String.valueOf(r));
        splittedConfigs.add(taskConfig);
      }
    } else {
      splittedConfigs.add(jobConfig);
    }
    return splittedConfigs;
  }

  public static String getQueryString(Configuration taskConfig, Cluster cluster) {
    List<String> columnMeta = taskConfig.getList(Key.COLUMN, String.class);
    String keyspace = taskConfig.getString(Key.KEYSPACE);
    String table = taskConfig.getString(Key.TABLE);

    StringBuilder columns = new StringBuilder();
    for (String column : columnMeta) {
      if (columns.length() > 0) {
        columns.append(",");
      }
      columns.append(column);
    }

    StringBuilder where = new StringBuilder();
    String whereString = taskConfig.getString(Key.WHERE);
    if (whereString != null && !whereString.isEmpty()) {
      where.append(whereString);
    }
    String minToken = taskConfig.getString(Key.MIN_TOKEN);
    String maxToken = taskConfig.getString(Key.MAX_TOKEN);
    if (minToken != null || maxToken != null) {
      LOG.info("range:" + minToken + "~" + maxToken);
      List<ColumnMetadata> pks = cluster.getMetadata().getKeyspace(keyspace).getTable(table).getPartitionKey();
      StringBuilder sb = new StringBuilder();
      for (ColumnMetadata pk : pks) {
        if (sb.length() > 0) {
          sb.append(",");
        }
        sb.append(pk.getName());
      }
      String s = sb.toString();
      if (minToken != null && !minToken.isEmpty()) {
        if (where.length() > 0) {
          where.append(" AND ");
        }
        where.append("token(").append(s).append(")").append(" > ").append(minToken);
      }
      if (maxToken != null && !maxToken.isEmpty()) {
        if (where.length() > 0) {
          where.append(" AND ");
        }
        where.append("token(").append(s).append(")").append(" <= ").append(maxToken);
      }
    }

    boolean allowFiltering = taskConfig.getBool(Key.ALLOW_FILTERING, false);
    StringBuilder select = new StringBuilder();
    select.append("SELECT ").append(columns.toString()).append(" FROM ").append(table);
    if (where.length() > 0) {
      select.append(" where ").append(where.toString());
    }
    if (allowFiltering) {
      select.append(" ALLOW FILTERING");
    }
    select.append(";");
    return select.toString();
  }

  public static void checkConfig(Configuration jobConfig, Cluster cluster) {
    ensureStringExists(jobConfig, Key.HOST);
    ensureStringExists(jobConfig, Key.KEYSPACE);
    ensureStringExists(jobConfig, Key.TABLE);
    ensureExists(jobConfig, Key.COLUMN);

    /// check that the keyspace and table exist
    String keyspace = jobConfig.getString(Key.KEYSPACE);
    if (cluster.getMetadata().getKeyspace(keyspace) == null) {
      throw DataXException.asDataXException(CassandraReaderErrorCode.CONF_ERROR,
          String.format("配置信息有错误.keyspace'%s'不存在 .", keyspace));
    }
    String table = jobConfig.getString(Key.TABLE);
    TableMetadata tableMetadata = cluster.getMetadata().getKeyspace(keyspace).getTable(table);
    if (tableMetadata == null) {
      throw DataXException.asDataXException(CassandraReaderErrorCode.CONF_ERROR,
          String.format("配置信息有错误.表'%s'不存在 .", table));
    }

    List<String> columns = jobConfig.getList(Key.COLUMN, String.class);
    for (String name : columns) {
      if (name == null || name.isEmpty()) {
        throw DataXException.asDataXException(CassandraReaderErrorCode.CONF_ERROR,
            String.format("配置信息有错误.列信息中需要包含'%s'字段 .", Key.COLUMN_NAME));
      }
      if (name.startsWith(Key.WRITE_TIME)) {
        String colName = name.substring(Key.WRITE_TIME.length(), name.length() - 1);
        ColumnMetadata col = tableMetadata.getColumn(colName);
        if (col == null) {
          throw DataXException.asDataXException(CassandraReaderErrorCode.CONF_ERROR,
              String.format("配置信息有错误.列'%s'不存在 .", colName));
        }
      } else {
        ColumnMetadata col = tableMetadata.getColumn(name);
        if (col == null) {
          throw DataXException.asDataXException(CassandraReaderErrorCode.CONF_ERROR,
              String.format("配置信息有错误.列'%s'不存在 .", name));
        }
      }
    }
  }

  static void ensureExists(Configuration jobConfig, String keyword) {
    if (jobConfig.get(keyword) == null) {
      throw DataXException.asDataXException(CassandraReaderErrorCode.CONF_ERROR,
          String.format("配置信息有错误.参数'%s'为必填项 .", keyword));
    }
  }

  static void ensureStringExists(Configuration jobConfig, String keyword) {
    ensureExists(jobConfig, keyword);
    if (jobConfig.getString(keyword).isEmpty()) {
      throw DataXException.asDataXException(CassandraReaderErrorCode.CONF_ERROR,
          String.format("配置信息有错误.参数'%s'不能为空 .", keyword));
    }
  }
}
```
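To make the splitJob/getQueryString logic above concrete, here is a sketch with hypothetical table, column, and token values: on a Murmur3Partitioner cluster split into two tasks, with partition key `id`, the first task would run roughly

```
SELECT id,textCol FROM datax_src where token(id) > -9223372036854775808 AND token(id) <= 0;
```

Each task receives its own minToken/maxToken range, and the last task's upper bound is clamped to the partitioner's maximum token.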
`cassandrareader/src/main/java/com/alibaba/datax/plugin/reader/cassandrareader/Key.java` (new file, mode 100644)

```
package com.alibaba.datax.plugin.reader.cassandrareader;

/**
 * Created by mazhenlin on 2019/8/19.
 */
public class Key {
  public final static String USERNAME = "username";
  public final static String PASSWORD = "password";

  public final static String HOST = "host";
  public final static String PORT = "port";
  public final static String USESSL = "useSSL";

  public final static String KEYSPACE = "keyspace";
  public final static String TABLE = "table";
  public final static String COLUMN = "column";
  public final static String WHERE = "where";
  public final static String ALLOW_FILTERING = "allowFiltering";
  public final static String CONSITANCY_LEVEL = "consistancyLevel";
  public final static String MIN_TOKEN = "minToken";
  public final static String MAX_TOKEN = "maxToken";

  /**
   * Name of each column.
   */
  public static final String COLUMN_NAME = "name";
  /**
   * Column separator.
   */
  public static final String COLUMN_SPLITTER = "format";

  public static final String WRITE_TIME = "writetime(";

  public static final String ELEMENT_SPLITTER = "splitter";
  public static final String ENTRY_SPLITTER = "entrySplitter";
  public static final String KV_SPLITTER = "kvSplitter";
  public static final String ELEMENT_CONFIG = "element";
  public static final String TUPLE_CONNECTOR = "_";
  public static final String KEY_CONFIG = "key";
  public static final String VALUE_CONFIG = "value";
}
```
`cassandrareader/src/main/java/com/alibaba/datax/plugin/reader/cassandrareader/LocalStrings.properties` (new file, mode 100644)

```
errorcode.config_invalid_exception=\u914D\u7F6E\u9519\u8BEF
```
`cassandrareader/src/main/java/com/alibaba/datax/plugin/reader/cassandrareader/LocalStrings_en_US.properties` (new empty file, mode 100644)
`cassandrareader/src/main/java/com/alibaba/datax/plugin/reader/cassandrareader/LocalStrings_ja_JP.properties` (new file, mode 100644)

```
errorcode.config_invalid_exception=\u914D\u7F6E\u9519\u8BEF
```
`cassandrareader/src/main/java/com/alibaba/datax/plugin/reader/cassandrareader/LocalStrings_zh_CN.properties` (new file, mode 100644)

```
errorcode.config_invalid_exception=\u914D\u7F6E\u9519\u8BEF
```
`cassandrareader/src/main/java/com/alibaba/datax/plugin/reader/cassandrareader/LocalStrings_zh_HK.properties` (new file, mode 100644)

```
errorcode.config_invalid_exception=\u914D\u7F6E\u9519\u8BEF
```
`cassandrareader/src/main/java/com/alibaba/datax/plugin/reader/cassandrareader/LocalStrings_zh_TW.properties` (new file, mode 100644)

```
errorcode.config_invalid_exception=\u914D\u7F6E\u9519\u8BEF
```
`cassandrareader/src/main/resources/plugin.json` (new file, mode 100644)

```
{
    "name": "cassandrareader",
    "class": "com.alibaba.datax.plugin.reader.cassandrareader.CassandraReader",
    "description": "useScene: prod. mechanism: execute select cql, retrieve data from the ResultSet. warn: The more you know about the database, the less problems you encounter.",
    "developer": "alibaba"
}
```
`cassandrareader/src/main/resources/plugin_job_template.json` (new file, mode 100644)

```
{
    "name": "cassandrareader",
    "parameter": {
        "username": "",
        "password": "",
        "host": "",
        "port": "",
        "useSSL": false,
        "keyspace": "",
        "table": "",
        "column": [
            "c1",
            "c2",
            "c3"
        ]
    }
}
```
`cassandrawriter/doc/cassandrawriter.md` (new file, mode 100644)

# CassandraWriter Plugin Documentation
___
## 1 Quick Introduction
The CassandraWriter plugin writes data into Cassandra. Under the hood, CassandraWriter connects to a Cassandra instance through the DataStax Java driver and executes the corresponding CQL statements to write data into Cassandra.
## 2 Implementation Principle
In short, CassandraWriter connects to a Cassandra instance through the Java driver, generates an INSERT CQL statement from the user-configured information, and sends it to Cassandra.
From the user-configured Table and Column information, CassandraWriter builds the CQL statement that is sent to Cassandra; a sketch is shown below.
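As a minimal sketch only (hypothetical table and column names; the plugin actually prepares the statement once and binds values per record), the generated insert has the form:

```
INSERT INTO dst (name,ival) VALUES (?,?) USING TIMESTAMP ?;
```

The trailing USING TIMESTAMP bind marker is added only when one of the configured columns is writetime().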
## 3 Function Description
### 3.1 Sample Configuration
* Configure a job that generates data in memory and imports it into Cassandra:
```
{
    "job": {
        "setting": {
            "speed": {
                "channel": 5
            }
        },
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "column": [
                            {"value": "name", "type": "string"},
                            {"value": "false", "type": "bool"},
                            {"value": "1988-08-08 08:08:08", "type": "date"},
                            {"value": "addr", "type": "bytes"},
                            {"value": 1.234, "type": "double"},
                            {"value": 12345678, "type": "long"},
                            {"value": 2.345, "type": "double"},
                            {"value": 3456789, "type": "long"},
                            {"value": "4a0ef8c0-4d97-11d0-db82-ebecdb03ffa5", "type": "string"},
                            {"value": "value", "type": "bytes"},
                            {"value": "-838383838,37377373,-383883838,27272772,393993939,-38383883,83883838,-1350403181,817650816,1630642337,251398784,-622020148", "type": "string"}
                        ],
                        "sliceRecordCount": 10000000
                    }
                },
                "writer": {
                    "name": "cassandrawriter",
                    "parameter": {
                        "host": "localhost",
                        "port": 9042,
                        "useSSL": false,
                        "keyspace": "stresscql",
                        "table": "dst",
                        "batchSize": 10,
                        "column": [
                            "name",
                            "choice",
                            "date",
                            "address",
                            "dbl",
                            "lval",
                            "fval",
                            "ival",
                            "uid",
                            "value",
                            "listval"
                        ]
                    }
                }
            }
        ]
    }
}
```
### 3.2 Parameter Description
* **host**
	* Description: domain name or IP of the Cassandra contact points; separate multiple nodes with commas. <br />
	* Required: yes <br />
	* Default: none <br />
* **port**
	* Description: Cassandra port. <br />
	* Required: yes <br />
	* Default: 9042 <br />
* **username**
	* Description: username for the data source <br />
	* Required: no <br />
	* Default: none <br />
* **password**
	* Description: password for the specified username <br />
	* Required: no <br />
	* Default: none <br />
* **useSSL**
	* Description: whether to connect over SSL.<br />
	* Required: no <br />
	* Default: false <br />
* **connectionsPerHost**
	* Description: client connection pool setting: how many connections to open to each server node.<br />
	* Required: no <br />
	* Default: 8 <br />
* **maxPendingPerConnection**
	* Description: client connection pool setting: maximum number of in-flight requests per connection.<br />
	* Required: no <br />
	* Default: 128 <br />
* **keyspace**
	* Description: keyspace containing the table to be synchronized.<br />
	* Required: yes <br />
	* Default: none <br />
* **table**
	* Description: the table to be synchronized.<br />
	* Required: yes <br />
	* Default: none <br />
* **column**
	* Description: the set of columns in the table to be synchronized.<br />
	  Each element can be a column name or "writetime()". If a column is configured as writetime(), the value of that column is used as the write timestamp.
	* Required: yes <br />
	* Default: none <br />
* **consistancyLevel**
	* Description: consistency level. One of ONE|QUORUM|LOCAL_QUORUM|EACH_QUORUM|ALL|ANY|TWO|THREE|LOCAL_ONE<br />
	* Required: no <br />
	* Default: LOCAL_QUORUM <br />
* **batchSize**
	* Description: number of records submitted in one batch (UNLOGGED BATCH); a sketch of such a batch is shown after this list. Note the following limits on batch size:<br />
	  (1) It must not exceed 65535.<br />
	  (2) The byte size of a batch is limited by the server-side batch_size_fail_threshold_in_kb setting.<br />
	  (3) If a batch exceeds batch_size_warn_threshold_in_kb, a warning is logged; this does not affect the write and can be ignored.<br />
	  If a batch submission fails, every record in that batch is rewritten one by one.
	* Required: no <br />
	* Default: 1 <br />
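As an illustrative sketch only (hypothetical table and values), the records buffered up to batchSize are submitted as the equivalent of an unlogged batch:

```
BEGIN UNLOGGED BATCH
INSERT INTO dst (name,ival) VALUES ('a',1);
INSERT INTO dst (name,ival) VALUES ('b',2);
APPLY BATCH;
```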
### 3.3 Type Conversion
CassandraWriter currently supports all types except the counter and custom types.
The type conversion table used for Cassandra types is listed below:

| DataX internal type | Cassandra data type |
| -------- | ----- |
| Long | int, tinyint, smallint, varint, bigint, time |
| Double | float, double, decimal |
| String | ascii, varchar, text, uuid, timeuuid, duration, list, map, set, tuple, udt, inet |
| Date | date, timestamp |
| Boolean | bool |
| Bytes | blob |

Note:
* The counter and custom types are not currently supported.
## 4 Performance Report
Omitted.
## 5 Constraints
### 5.1 Data recovery after primary/standby synchronization
Omitted.
## 6 FAQ
`cassandrawriter/pom.xml` (new file, mode 100644)

```
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <parent>
    <artifactId>datax-all</artifactId>
    <groupId>com.alibaba.datax</groupId>
    <version>0.0.1-SNAPSHOT</version>
  </parent>
  <modelVersion>4.0.0</modelVersion>

  <artifactId>cassandrawriter</artifactId>
  <name>cassandrawriter</name>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <properties>
  </properties>

  <dependencies>
    <dependency>
      <groupId>com.alibaba.datax</groupId>
      <artifactId>datax-common</artifactId>
      <version>${datax-project-version}</version>
      <exclusions>
        <exclusion>
          <artifactId>slf4j-log4j12</artifactId>
          <groupId>org.slf4j</groupId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>com.datastax.cassandra</groupId>
      <artifactId>cassandra-driver-core</artifactId>
      <version>3.7.2</version>
    </dependency>
    <dependency>
      <groupId>commons-codec</groupId>
      <artifactId>commons-codec</artifactId>
      <version>1.9</version>
    </dependency>

    <!-- for test -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>com.alibaba.datax</groupId>
      <artifactId>datax-core</artifactId>
      <version>${datax-project-version}</version>
      <exclusions>
        <exclusion>
          <groupId>com.alibaba.datax</groupId>
          <artifactId>datax-service-face</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-common</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.hive</groupId>
          <artifactId>hive-exec</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.hive</groupId>
          <artifactId>hive-serde</artifactId>
        </exclusion>
        <exclusion>
          <groupId>javolution</groupId>
          <artifactId>javolution</artifactId>
        </exclusion>
      </exclusions>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.mockito</groupId>
      <artifactId>mockito-all</artifactId>
      <version>1.9.5</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <resources>
      <resource>
        <directory>src/main/java</directory>
        <includes>
          <include>**/*.properties</include>
        </includes>
      </resource>
    </resources>
    <plugins>
      <!-- compiler plugin -->
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.6</source>
          <target>1.6</target>
          <encoding>${project-sourceEncoding}</encoding>
        </configuration>
      </plugin>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptors>
            <descriptor>src/main/assembly/package.xml</descriptor>
          </descriptors>
          <finalName>datax</finalName>
        </configuration>
        <executions>
          <execution>
            <id>dwzip</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
```
`cassandrawriter/src/main/assembly/package.xml` (new file, mode 100644)

```
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
  <id></id>
  <formats>
    <format>dir</format>
  </formats>
  <includeBaseDirectory>false</includeBaseDirectory>
  <fileSets>
    <fileSet>
      <directory>src/main/resources</directory>
      <includes>
        <include>plugin.json</include>
        <include>plugin_job_template.json</include>
      </includes>
      <outputDirectory>plugin/writer/cassandrawriter</outputDirectory>
    </fileSet>
    <fileSet>
      <directory>target/</directory>
      <includes>
        <include>cassandrawriter-0.0.1-SNAPSHOT.jar</include>
      </includes>
      <outputDirectory>plugin/writer/cassandrawriter</outputDirectory>
    </fileSet>
  </fileSets>
  <dependencySets>
    <dependencySet>
      <useProjectArtifact>false</useProjectArtifact>
      <outputDirectory>plugin/writer/cassandrawriter/libs</outputDirectory>
      <scope>runtime</scope>
    </dependencySet>
  </dependencySets>
</assembly>
```
`cassandrawriter/src/main/java/com/alibaba/datax/plugin/writer/cassandrawriter/CassandraWriter.java` (new file, mode 100644)

```
package com.alibaba.datax.plugin.writer.cassandrawriter;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.BatchStatement.Type;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ColumnMetadata;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.TableMetadata;
import com.datastax.driver.core.querybuilder.Insert;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.datastax.driver.core.querybuilder.QueryBuilder.timestamp;

/**
 * Created by mazhenlin on 2019/8/19.
 */
public class CassandraWriter extends Writer {
  private static final Logger LOG = LoggerFactory.getLogger(CassandraWriter.class);

  public static class Job extends Writer.Job {
    private Configuration originalConfig = null;

    @Override
    public List<Configuration> split(int mandatoryNumber) {
      List<Configuration> splitResultConfigs = new ArrayList<Configuration>();
      for (int j = 0; j < mandatoryNumber; j++) {
        splitResultConfigs.add(originalConfig.clone());
      }
      return splitResultConfigs;
    }

    @Override
    public void init() {
      originalConfig = getPluginJobConf();
    }

    @Override
    public void destroy() {
    }
  }

  public static class Task extends Writer.Task {
    private Configuration taskConfig;
    private Cluster cluster = null;
    private Session session = null;
    private PreparedStatement statement = null;
    private int columnNumber = 0;
    private List<DataType> columnTypes;
    private List<String> columnMeta = null;
    private int writeTimeCol = -1;
    private boolean asyncWrite = false;
    private long batchSize = 1;
    private List<ResultSetFuture> unConfirmedWrite;
    private List<BoundStatement> bufferedWrite;

    @Override
    public void startWrite(RecordReceiver lineReceiver) {
      try {
        Record record;
        while ((record = lineReceiver.getFromReader()) != null) {
          if (record.getColumnNumber() != columnNumber) {
            // The number of columns read from the source does not match the number of
            // columns to write to the destination table; fail immediately.
            throw DataXException.asDataXException(CassandraWriterErrorCode.CONF_ERROR,
                String.format("列配置信息有错误. 因为您配置的任务中,源头读取字段数:%s 与 目的表要写入的字段数:%s 不相等. 请检查您的配置并作出修改.",
                    record.getColumnNumber(), this.columnNumber));
          }
          BoundStatement boundStmt = statement.bind();
          for (int i = 0; i < columnNumber; i++) {
            if (writeTimeCol != -1 && i == writeTimeCol) {
              continue;
            }
            Column col = record.getColumn(i);
            int pos = i;
            if (writeTimeCol != -1 && pos > writeTimeCol) {
              pos = i - 1;
            }
            CassandraWriterHelper.setupColumn(boundStmt, pos, columnTypes.get(pos), col);
          }
          if (writeTimeCol != -1) {
            Column col = record.getColumn(writeTimeCol);
            boundStmt.setLong(columnNumber - 1, col.asLong());
          }
          if (batchSize <= 1) {
            session.execute(boundStmt);
          } else {
            if (asyncWrite) {
              unConfirmedWrite.add(session.executeAsync(boundStmt));
              if (unConfirmedWrite.size() >= batchSize) {
                for (ResultSetFuture write : unConfirmedWrite) {
                  write.getUninterruptibly(10000, TimeUnit.MILLISECONDS);
                }
                unConfirmedWrite.clear();
              }
            } else {
              bufferedWrite.add(boundStmt);
              if (bufferedWrite.size() >= batchSize) {
                BatchStatement batchStatement = new BatchStatement(Type.UNLOGGED);
                batchStatement.addAll(bufferedWrite);
                try {
                  session.execute(batchStatement);
                } catch (Exception e) {
                  LOG.error("batch写入失败,尝试逐条写入.", e);
                  for (BoundStatement stmt : bufferedWrite) {
                    session.execute(stmt);
                  }
                }
                ///LOG.info("batch finished. size = " + bufferedWrite.size());
                bufferedWrite.clear();
              }
            }
          }
        }
        if (unConfirmedWrite != null && unConfirmedWrite.size() > 0) {
          for (ResultSetFuture write : unConfirmedWrite) {
            write.getUninterruptibly(10000, TimeUnit.MILLISECONDS);
          }
          unConfirmedWrite.clear();
        }
        if (bufferedWrite != null && bufferedWrite.size() > 0) {
          BatchStatement batchStatement = new BatchStatement(Type.UNLOGGED);
          batchStatement.addAll(bufferedWrite);
          session.execute(batchStatement);
          bufferedWrite.clear();
        }
      } catch (Exception e) {
        throw DataXException.asDataXException(CassandraWriterErrorCode.WRITE_DATA_ERROR, e);
      }
    }

    @Override
    public void init() {
      this.taskConfig = super.getPluginJobConf();
      String username = taskConfig.getString(Key.USERNAME);
      String password = taskConfig.getString(Key.PASSWORD);
      String hosts = taskConfig.getString(Key.HOST);
      Integer port = taskConfig.getInt(Key.PORT, 9042);
      boolean useSSL = taskConfig.getBool(Key.USESSL);
      String keyspace = taskConfig.getString(Key.KEYSPACE);
      String table = taskConfig.getString(Key.TABLE);
      batchSize = taskConfig.getLong(Key.BATCH_SIZE, 1);
      this.columnMeta = taskConfig.getList(Key.COLUMN, String.class);
      columnTypes = new ArrayList<DataType>(columnMeta.size());
      columnNumber = columnMeta.size();
      asyncWrite = taskConfig.getBool(Key.ASYNC_WRITE, false);

      int connectionsPerHost = taskConfig.getInt(Key.CONNECTIONS_PER_HOST, 8);
      int maxPendingPerConnection = taskConfig.getInt(Key.MAX_PENDING_CONNECTION, 128);
      PoolingOptions poolingOpts = new PoolingOptions()
          .setConnectionsPerHost(HostDistance.LOCAL, connectionsPerHost, connectionsPerHost)
          .setMaxRequestsPerConnection(HostDistance.LOCAL, maxPendingPerConnection)
          .setNewConnectionThreshold(HostDistance.LOCAL, 100);
      Cluster.Builder clusterBuilder = Cluster.builder().withPoolingOptions(poolingOpts);
      if ((username != null) && !username.isEmpty()) {
        clusterBuilder = clusterBuilder.withCredentials(username, password)
            .withPort(Integer.valueOf(port)).addContactPoints(hosts.split(","));
        if (useSSL) {
          clusterBuilder = clusterBuilder.withSSL();
        }
      } else {
        clusterBuilder = clusterBuilder.withPort(Integer.valueOf(port))
            .addContactPoints(hosts.split(","));
      }
      cluster = clusterBuilder.build();
      session = cluster.connect(keyspace);
      TableMetadata meta = cluster.getMetadata().getKeyspace(keyspace).getTable(table);

      Insert insertStmt = QueryBuilder.insertInto(table);
      for (String colunmnName : columnMeta) {
        if (colunmnName.toLowerCase().equals(Key.WRITE_TIME)) {
          if (writeTimeCol != -1) {
            throw DataXException.asDataXException(CassandraWriterErrorCode.CONF_ERROR,
                "列配置信息有错误. 只能有一个时间戳列(writetime())");
          }
          writeTimeCol = columnTypes.size();
          continue;
        }
        insertStmt.value(colunmnName, QueryBuilder.bindMarker());
        ColumnMetadata col = meta.getColumn(colunmnName);
        if (col == null) {
          throw DataXException.asDataXException(CassandraWriterErrorCode.CONF_ERROR,
              String.format("列配置信息有错误. 表中未找到列名 '%s' .", colunmnName));
        }
        columnTypes.add(col.getType());
      }
      if (writeTimeCol != -1) {
        insertStmt.using(timestamp(QueryBuilder.bindMarker()));
      }
      String cl = taskConfig.getString(Key.CONSITANCY_LEVEL);
      if (cl != null && !cl.isEmpty()) {
        insertStmt.setConsistencyLevel(ConsistencyLevel.valueOf(cl));
      } else {
        insertStmt.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
      }
      statement = session.prepare(insertStmt);

      if (batchSize > 1) {
        if (asyncWrite) {
          unConfirmedWrite = new ArrayList<ResultSetFuture>();
        } else {
          bufferedWrite = new ArrayList<BoundStatement>();
        }
      }
    }

    @Override
    public void destroy() {
    }
  }
}
```
`cassandrawriter/src/main/java/com/alibaba/datax/plugin/writer/cassandrawriter/CassandraWriterErrorCode.java` (new file, mode 100644)

```
package com.alibaba.datax.plugin.writer.cassandrawriter;

import com.alibaba.datax.common.spi.ErrorCode;

/**
 * Created by mazhenlin on 2019/8/19.
 */
public enum CassandraWriterErrorCode implements ErrorCode {
  CONF_ERROR("CassandraWriter-00", "配置错误."),
  WRITE_DATA_ERROR("CassandraWriter-01", "写入数据时失败."),
  ;

  private final String code;
  private final String description;

  private CassandraWriterErrorCode(String code, String description) {
    this.code = code;
    this.description = description;
  }

  @Override
  public String getCode() {
    return this.code;
  }

  @Override
  public String getDescription() {
    return this.description;
  }

  @Override
  public String toString() {
    return String.format("Code:[%s], Description:[%s].", this.code, this.description);
  }
}
```
cassandrawriter/src/main/java/com/alibaba/datax/plugin/writer/cassandrawriter/CassandraWriterHelper.java
0 → 100644
View file @
10371104
package com.alibaba.datax.plugin.writer.cassandrawriter;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.CodecRegistry;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.DataType.Name;
import com.datastax.driver.core.Duration;
import com.datastax.driver.core.LocalDate;
import com.datastax.driver.core.TupleType;
import com.datastax.driver.core.TupleValue;
import com.datastax.driver.core.UDTValue;
import com.datastax.driver.core.UserType;
import com.datastax.driver.core.UserType.Field;
import com.google.common.base.Splitter;
import org.apache.commons.codec.binary.Base64;

/**
 * Created by mazhenlin on 2019/8/21.
 */
public class CassandraWriterHelper {
  static CodecRegistry registry = new CodecRegistry();

  public static Object parseFromString(String s, DataType sqlType) throws Exception {
    if (s == null || s.isEmpty()) {
      if (sqlType.getName() == Name.ASCII || sqlType.getName() == Name.TEXT
          || sqlType.getName() == Name.VARCHAR) {
        return s;
      } else {
        return null;
      }
    }
    switch (sqlType.getName()) {
    case ASCII:
    case TEXT:
    case VARCHAR:
      return s;

    case BLOB:
      if (s.length() == 0) {
        return new byte[0];
      }
      byte[] byteArray = new byte[s.length() / 2];
      for (int i = 0; i < byteArray.length; i++) {
        String subStr = s.substring(2 * i, 2 * i + 2);
        byteArray[i] = ((byte) Integer.parseInt(subStr, 16));
      }
      return ByteBuffer.wrap(byteArray);

    case BOOLEAN:
      return Boolean.valueOf(s);

    case TINYINT:
      return Byte.valueOf(s);

    case SMALLINT:
      return Short.valueOf(s);

    case INT:
      return Integer.valueOf(s);

    case BIGINT:
      return Long.valueOf(s);

    case VARINT:
      return new BigInteger(s, 10);

    case FLOAT:
      return Float.valueOf(s);

    case DOUBLE:
      return Double.valueOf(s);

    case DECIMAL:
      return new BigDecimal(s);

    case DATE: {
      String[] a = s.split("-");
      if (a.length != 3) {
        throw new Exception(String.format("DATE类型数据 '%s' 格式不正确,必须为yyyy-mm-dd格式", s));
      }
      return LocalDate.fromYearMonthDay(Integer.valueOf(a[0]), Integer.valueOf(a[1]),
          Integer.valueOf(a[2]));
    }

    case TIME:
      return Long.valueOf(s);

    case TIMESTAMP:
      return new Date(Long.valueOf(s));

    case UUID:
    case TIMEUUID:
      return UUID.fromString(s);

    case INET:
      String[] b = s.split("/");
      if (b.length < 2) {
        return InetAddress.getByName(s);
      }
      byte[] addr = InetAddress.getByName(b[1]).getAddress();
      return InetAddress.getByAddress(b[0], addr);

    case DURATION:
      return Duration.from(s);

    case LIST:
    case MAP:
    case SET:
    case TUPLE:
    case UDT:
      Object jsonObject = JSON.parse(s);
      return parseFromJson(jsonObject, sqlType);

    default:
      throw DataXException.asDataXException(CassandraWriterErrorCode.CONF_ERROR,
          "不支持您配置的列类型:" + sqlType + ", 请检查您的配置 或者 联系 管理员.");

    } // end switch
  }

  public static Object parseFromJson(Object jsonObject, DataType type) throws Exception {
    if (jsonObject == null) {
      return null;
    }
    switch (type.getName()) {
    case ASCII:
    case TEXT:
    case VARCHAR:
    case BOOLEAN:
    case TIME:
      return jsonObject;

    case TINYINT:
      return ((Number) jsonObject).byteValue();

    case SMALLINT:
      return ((Number) jsonObject).shortValue();

    case INT:
      return ((Number) jsonObject).intValue();

    case BIGINT:
      return ((Number) jsonObject).longValue();

    case VARINT:
      return new BigInteger(jsonObject.toString());

    case FLOAT:
      return ((Number) jsonObject).floatValue();

    case DOUBLE:
      return ((Number) jsonObject).doubleValue();

    case DECIMAL:
      return new BigDecimal(jsonObject.toString());

    case BLOB:
      return ByteBuffer.wrap(Base64.decodeBase64((String) jsonObject));

    case DATE:
      return LocalDate.fromMillisSinceEpoch(((Number) jsonObject).longValue());

    case TIMESTAMP:
      return new Date(((Number) jsonObject).longValue());

    case DURATION:
      return Duration.from(jsonObject.toString());

    case UUID:
    case TIMEUUID:
      return UUID.fromString(jsonObject.toString());

    case INET:
      return InetAddress.getByName((String) jsonObject);

    case LIST:
      List l = new ArrayList();
      for (Object o : (JSONArray) jsonObject) {
        l.add(parseFromJson(o, type.getTypeArguments().get(0)));
      }
      return l;

    case MAP: {
      Map m = new HashMap();
      for (JSONObject.Entry e : ((JSONObject) jsonObject).entrySet()) {
        Object k = parseFromString((String) e.getKey(), type.getTypeArguments().get(0));
        Object v = parseFromJson(e.getValue(), type.getTypeArguments().get(1));
        m.put(k, v);
      }
      return m;
    }

    case SET:
      Set s = new HashSet();
      for (Object o : (JSONArray) jsonObject) {
        s.add(parseFromJson(o, type.getTypeArguments().get(0)));
      }
      return s;

    case TUPLE: {
      TupleValue t = ((TupleType) type).newValue();
      int j = 0;
      for (Object e : (JSONArray) jsonObject) {
        DataType eleType = ((TupleType) type).getComponentTypes().get(j);
        t.set(j, parseFromJson(e, eleType), registry.codecFor(eleType).getJavaType());
        j++;
      }
      return t;
    }

    case UDT: {
      UDTValue t = ((UserType) type).newValue();
      UserType userType = t.getType();
      for (JSONObject.Entry e : ((JSONObject) jsonObject).entrySet()) {
        DataType eleType = userType.getFieldType((String) e.getKey());
        t.set((String) e.getKey(), parseFromJson(e.getValue(), eleType),
            registry.codecFor(eleType).getJavaType());
      }
      return t;
    }
    }
    return null;
  }

  public static void setupColumn(BoundStatement ps, int pos, DataType sqlType, Column col)
      throws Exception {
    if (col.getRawData() != null) {
      switch (sqlType.getName()) {
      case ASCII:
      case TEXT:
      case VARCHAR:
        ps.setString(pos, col.asString());
        break;

      case BLOB:
        ps.setBytes(pos, ByteBuffer.wrap(col.asBytes()));
        break;

      case BOOLEAN:
        ps.setBool(pos, col.asBoolean());
        break;

      case TINYINT:
        ps.setByte(pos, col.asLong().byteValue());
        break;

      case SMALLINT:
        ps.setShort(pos, col.asLong().shortValue());
        break;

      case INT:
        ps.setInt(pos, col.asLong().intValue());
        break;

      case BIGINT:
        ps.setLong(pos, col.asLong());
        break;

      case VARINT:
        ps.setVarint(pos, col.asBigInteger());
        break;

      case FLOAT:
        ps.setFloat(pos, col.asDouble().floatValue());
        break;

      case DOUBLE:
        ps.setDouble(pos, col.asDouble());
        break;

      case DECIMAL:
        ps.setDecimal(pos, col.asBigDecimal());
        break;

      case DATE:
        ps.setDate(pos, LocalDate.fromMillisSinceEpoch(col.asDate().getTime()));
        break;

      case TIME:
        ps.setTime(pos, col.asLong());
        break;

      case TIMESTAMP:
        ps.setTimestamp(pos, col.asDate());
        break;

      case UUID:
      case TIMEUUID:
        ps.setUUID(pos, UUID.fromString(col.asString()));
        break;

      case INET:
        ps.setInet(pos, InetAddress.getByName(col.asString()));
        break;

      case DURATION:
        ps.set(pos, Duration.from(col.asString()), Duration.class);
        break;

      case LIST:
        ps.setList(pos, (List<?>) parseFromString(col.asString(), sqlType));
        break;

      case MAP:
        ps.setMap(pos, (Map) parseFromString(col.asString(), sqlType));
        break;

      case SET:
        ps.setSet(pos, (Set) parseFromString(col.asString(), sqlType));
        break;

      case TUPLE:
        ps.setTupleValue(pos, (TupleValue) parseFromString(col.asString(), sqlType));
        break;

      case UDT:
        ps.setUDTValue(pos, (UDTValue) parseFromString(col.asString(), sqlType));
        break;

      default:
        throw DataXException.asDataXException(CassandraWriterErrorCode.CONF_ERROR,
            "不支持您配置的列类型:" + sqlType + ", 请检查您的配置 或者 联系 管理员.");

      } // end switch
    } else {
      ps.setToNull(pos);
    }
  }
}
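As a rough usage sketch of the helper above (assuming the standard DataType factory methods of the DataStax 3.x driver), collection columns arrive as JSON text in a DataX string column and come back as the Java values the driver expects:

import java.util.List;
import java.util.Map;

import com.alibaba.datax.plugin.writer.cassandrawriter.CassandraWriterHelper;
import com.datastax.driver.core.DataType;

public class ParseSketch {
  public static void main(String[] args) throws Exception {
    // A list<int> column fed as a JSON array string.
    List<?> ints = (List<?>) CassandraWriterHelper.parseFromString(
        "[1, 2, 3]", DataType.list(DataType.cint()));

    // A map<text, bigint> column fed as a JSON object string.
    Map<?, ?> counts = (Map<?, ?>) CassandraWriterHelper.parseFromString(
        "{\"a\": 1, \"b\": 2}", DataType.map(DataType.text(), DataType.bigint()));

    System.out.println(ints);   // [1, 2, 3]
    System.out.println(counts); // e.g. {a=1, b=2} (HashMap iteration order not guaranteed)
  }
}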
cassandrawriter/src/main/java/com/alibaba/datax/plugin/writer/cassandrawriter/Key.java
0 → 100644
View file @
10371104
package com.alibaba.datax.plugin.writer.cassandrawriter;

/**
 * Created by mazhenlin on 2019/8/19.
 */
public class Key {
  public final static String USERNAME = "username";
  public final static String PASSWORD = "password";

  public final static String HOST = "host";
  public final static String PORT = "port";
  public final static String USESSL = "useSSL";

  public final static String KEYSPACE = "keyspace";
  public final static String TABLE = "table";
  public final static String COLUMN = "column";
  public final static String WRITE_TIME = "writetime()";
  public final static String ASYNC_WRITE = "asyncWrite";
  public final static String CONSITANCY_LEVEL = "consistancyLevel";
  public final static String CONNECTIONS_PER_HOST = "connectionsPerHost";
  public final static String MAX_PENDING_CONNECTION = "maxPendingPerConnection";

  /**
   * Batch size for asynchronous writes; defaults to 1 (no asynchronous batching).
   */
  public final static String BATCH_SIZE = "batchSize";

  /**
   * Name of each column.
   */
  public static final String COLUMN_NAME = "name";

  /**
   * Column separator.
   */
  public static final String COLUMN_SPLITTER = "format";
  public static final String ELEMENT_SPLITTER = "splitter";
  public static final String ENTRY_SPLITTER = "entrySplitter";
  public static final String KV_SPLITTER = "kvSplitter";
  public static final String ELEMENT_CONFIG = "element";
  public static final String TUPLE_CONNECTOR = "_";
  public static final String KEY_CONFIG = "key";
  public static final String VALUE_CONFIG = "value";
}
cassandrawriter/src/main/java/com/alibaba/datax/plugin/writer/cassandrawriter/LocalStrings.properties
0 → 100644
View file @
10371104
errorcode.config_invalid_exception=\u914D\u7F6E\u9519\u8BEF.
errorcode.write_failed_exception=\u5199\u5165\u6570\u636E\u65F6\u5931\u8D25
\ No newline at end of file
cassandrawriter/src/main/java/com/alibaba/datax/plugin/writer/cassandrawriter/LocalStrings_en_US.properties
0 → 100644
View file @
10371104
errorcode.config_invalid_exception=Error in parameter configuration.
errorcode.write_failed_exception=\u5199\u5165\u6570\u636E\u65F6\u5931\u8D25
\ No newline at end of file
cassandrawriter/src/main/java/com/alibaba/datax/plugin/writer/cassandrawriter/LocalStrings_ja_JP.properties
0 → 100644
View file @
10371104
errorcode.config_invalid_exception=\u914D\u7F6E\u9519\u8BEF.
errorcode.write_failed_exception=\u5199\u5165\u6570\u636E\u65F6\u5931\u8D25
\ No newline at end of file
cassandrawriter/src/main/java/com/alibaba/datax/plugin/writer/cassandrawriter/LocalStrings_zh_CN.properties
0 → 100644
View file @
10371104
errorcode.config_invalid_exception=\u914D\u7F6E\u9519\u8BEF.
errorcode.write_failed_exception=\u5199\u5165\u6570\u636E\u65F6\u5931\u8D25
\ No newline at end of file
cassandrawriter/src/main/java/com/alibaba/datax/plugin/writer/cassandrawriter/LocalStrings_zh_HK.properties
0 → 100644
View file @
10371104
errorcode.config_invalid_exception=\u914D\u7F6E\u9519\u8BEF.
errorcode.write_failed_exception=\u5199\u5165\u6570\u636E\u65F6\u5931\u8D25
\ No newline at end of file
cassandrawriter/src/main/java/com/alibaba/datax/plugin/writer/cassandrawriter/LocalStrings_zh_TW.properties
0 → 100644
View file @
10371104
errorcode.config_invalid_exception=\u914D\u7F6E\u9519\u8BEF.
errorcode.write_failed_exception=\u5199\u5165\u6570\u636E\u65F6\u5931\u8D25
\ No newline at end of file
cassandrawriter/src/main/resources/plugin.json
0 → 100644
View file @
10371104
{
    "name": "cassandrawriter",
    "class": "com.alibaba.datax.plugin.writer.cassandrawriter.CassandraWriter",
    "description": "useScene: prod. mechanism: use datax driver, execute insert sql.",
    "developer": "alibaba"
}
cassandrawriter/src/main/resources/plugin_job_template.json
0 → 100644
View file @
10371104
{
    "name": "cassandrawriter",
    "parameter": {
        "username": "",
        "password": "",
        "host": "",
        "port": "",
        "useSSL": false,
        "keyspace": "",
        "table": "",
        "column": [
            "c1",
            "c2",
            "c3"
        ]
    }
}
\ No newline at end of file
package.xml
View file @
10371104
...
@@ -166,6 +166,13 @@
            </includes>
            <outputDirectory>datax</outputDirectory>
        </fileSet>
        <fileSet>
            <directory>cassandrareader/target/datax/</directory>
            <includes>
                <include>**/*.*</include>
            </includes>
            <outputDirectory>datax</outputDirectory>
        </fileSet>
        <!-- writer -->
        <fileSet>
...
@@ -343,5 +350,12 @@
            </includes>
            <outputDirectory>datax</outputDirectory>
        </fileSet>
        <fileSet>
            <directory>cassandrawriter/target/datax/</directory>
            <includes>
                <include>**/*.*</include>
            </includes>
            <outputDirectory>datax</outputDirectory>
        </fileSet>
    </fileSets>
</assembly>
pom.xml
View file @
10371104
...
@@ -63,6 +63,7 @@
        <module>hbase11xreader</module>
        <module>hbase094xreader</module>
        <module>opentsdbreader</module>
        <module>cassandrareader</module>
        <!-- writer -->
        <module>mysqlwriter</module>
...
@@ -89,6 +90,7 @@
        <module>tsdbwriter</module>
        <module>adbpgwriter</module>
        <module>gdbwriter</module>
        <module>cassandrawriter</module>
        <!-- common support module -->
        <module>plugin-rdbms-util</module>
...