DataX / Commits / 2f8cc74b

Commit 2f8cc74b, authored Oct 11, 2019 by mazhenlin

    cassandra plugins

Parent: 57c8dd86
Showing 33 changed files with 2321 additions and 0 deletions (+2321, -0)
Changed files:

    README.md (+1)
    cassandrareader/doc/cassandrareader.md (+217)
    cassandrareader/pom.xml (+133)
    cassandrareader/src/main/assembly/package.xml (+35)
    cassandrareader/src/main/java/com/alibaba/datax/plugin/reader/cassandrareader/CassandraReader.java (+123)
    cassandrareader/src/main/java/com/alibaba/datax/plugin/reader/cassandrareader/CassandraReaderErrorCode.java (+32)
    cassandrareader/src/main/java/com/alibaba/datax/plugin/reader/cassandrareader/CassandraReaderHelper.java (+607)
    cassandrareader/src/main/java/com/alibaba/datax/plugin/reader/cassandrareader/Key.java (+39)
    cassandrareader/src/main/java/com/alibaba/datax/plugin/reader/cassandrareader/LocalStrings.properties (+1)
    cassandrareader/src/main/java/com/alibaba/datax/plugin/reader/cassandrareader/LocalStrings_en_US.properties (+0)
    cassandrareader/src/main/java/com/alibaba/datax/plugin/reader/cassandrareader/LocalStrings_ja_JP.properties (+1)
    cassandrareader/src/main/java/com/alibaba/datax/plugin/reader/cassandrareader/LocalStrings_zh_CN.properties (+1)
    cassandrareader/src/main/java/com/alibaba/datax/plugin/reader/cassandrareader/LocalStrings_zh_HK.properties (+1)
    cassandrareader/src/main/java/com/alibaba/datax/plugin/reader/cassandrareader/LocalStrings_zh_TW.properties (+1)
    cassandrareader/src/main/resources/plugin.json (+6)
    cassandrareader/src/main/resources/plugin_job_template.json (+15)
    cassandrawriter/doc/cassandrawriter.md (+227)
    cassandrawriter/pom.xml (+125)
    cassandrawriter/src/main/assembly/package.xml (+35)
    cassandrawriter/src/main/java/com/alibaba/datax/plugin/writer/cassandrawriter/CassandraWriter.java (+242)
    cassandrawriter/src/main/java/com/alibaba/datax/plugin/writer/cassandrawriter/CassandraWriterErrorCode.java (+35)
    cassandrawriter/src/main/java/com/alibaba/datax/plugin/writer/cassandrawriter/CassandraWriterHelper.java (+351)
    cassandrawriter/src/main/java/com/alibaba/datax/plugin/writer/cassandrawriter/Key.java (+43)
    cassandrawriter/src/main/java/com/alibaba/datax/plugin/writer/cassandrawriter/LocalStrings.properties (+2)
    cassandrawriter/src/main/java/com/alibaba/datax/plugin/writer/cassandrawriter/LocalStrings_en_US.properties (+2)
    cassandrawriter/src/main/java/com/alibaba/datax/plugin/writer/cassandrawriter/LocalStrings_ja_JP.properties (+2)
    cassandrawriter/src/main/java/com/alibaba/datax/plugin/writer/cassandrawriter/LocalStrings_zh_CN.properties (+2)
    cassandrawriter/src/main/java/com/alibaba/datax/plugin/writer/cassandrawriter/LocalStrings_zh_HK.properties (+2)
    cassandrawriter/src/main/java/com/alibaba/datax/plugin/writer/cassandrawriter/LocalStrings_zh_TW.properties (+2)
    cassandrawriter/src/main/resources/plugin.json (+7)
    cassandrawriter/src/main/resources/plugin_job_template.json (+15)
    package.xml (+14)
    pom.xml (+2)
README.md
@@ -51,6 +51,7 @@ DataX already has a fairly complete plugin ecosystem: mainstream RDBMS databases, N…

 | | Phoenix5.x | √ | √ | [Read](https://github.com/alibaba/DataX/blob/master/hbase20xsqlreader/doc/hbase20xsqlreader.md), [Write](https://github.com/alibaba/DataX/blob/master/hbase20xsqlwriter/doc/hbase20xsqlwriter.md) |
 | | MongoDB | √ | √ | [Read](https://github.com/alibaba/DataX/blob/master/mongoreader/doc/mongoreader.md), [Write](https://github.com/alibaba/DataX/blob/master/mongowriter/doc/mongowriter.md) |
 | | Hive | √ | √ | [Read](https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md), [Write](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) |
+| | Cassandra | √ | √ | [Read](https://github.com/alibaba/DataX/blob/master/cassandrareader/doc/cassandrareader.md), [Write](https://github.com/alibaba/DataX/blob/master/cassandrawriter/doc/cassandrawriter.md) |
 | Unstructured data storage | TxtFile | √ | √ | [Read](https://github.com/alibaba/DataX/blob/master/txtfilereader/doc/txtfilereader.md), [Write](https://github.com/alibaba/DataX/blob/master/txtfilewriter/doc/txtfilewriter.md) |
 | | FTP | √ | √ | [Read](https://github.com/alibaba/DataX/blob/master/ftpreader/doc/ftpreader.md), [Write](https://github.com/alibaba/DataX/blob/master/ftpwriter/doc/ftpwriter.md) |
 | | HDFS | √ | √ | [Read](https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md), [Write](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) |
cassandrareader/doc/cassandrareader.md (new file, mode 100644)
# CassandraReader Plugin Documentation

___

## 1 Quick Introduction

The CassandraReader plugin reads data from Cassandra. Under the hood, CassandraReader connects to a Cassandra instance through DataStax's Java driver and executes the corresponding CQL statements to SELECT the data out of Cassandra.

## 2 Implementation Principle

In short, CassandraReader connects to a Cassandra instance via the Java driver, generates a SELECT CQL statement from the user-configured information, sends it to Cassandra, assembles the query results into an abstract dataset using DataX's own data types, and passes that dataset to the downstream Writer.

From the user-configured table and column information, CassandraReader assembles the CQL statement sent to Cassandra.
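
In driver terms, each Task boils down to roughly the following. This is a condensed, illustrative sketch against the DataStax Java driver 3.x, not the plugin's full code: the keyspace, table, and column names are borrowed from the sample configuration in section 3.1, and error handling and the helper calls are omitted.

```
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;

public class ReadSketch {
    public static void main(String[] args) {
        Cluster cluster = Cluster.builder()
            .addContactPoints("localhost").withPort(9042).build();
        Session session = cluster.connect("test"); // keyspace from the sample below
        // The reader assembles a SELECT over the configured columns in this shape,
        // optionally appending a WHERE clause and ALLOW FILTERING.
        ResultSet rs = session.execute(
            new SimpleStatement("SELECT textCol, writetime(blobCol) FROM datax_src")
                .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM));
        for (Row row : rs) {
            // each row is converted into one DataX Record and sent downstream
            System.out.println(row.getString("textCol"));
        }
        cluster.close();
    }
}
```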
## 3 Features

### 3.1 Sample Configuration

* Configure a job that extracts data from Cassandra and syncs it to a local target:
```
{
  "job": {
    "setting": {
      "speed": {
        "channel": 3
      }
    },
    "content": [
      {
        "reader": {
          "name": "cassandrareader",
          "parameter": {
            "host": "localhost",
            "port": 9042,
            "useSSL": false,
            "keyspace": "test",
            "table": "datax_src",
            "column": [
              "textCol",
              "blobCol",
              "writetime(blobCol)",
              "boolCol",
              "smallintCol",
              "tinyintCol",
              "intCol",
              "bigintCol",
              "varintCol",
              "floatCol",
              "doubleCol",
              "decimalCol",
              "dateCol",
              "timeCol",
              "timeStampCol",
              "uuidCol",
              "inetCol",
              "durationCol",
              "listCol",
              "mapCol",
              "setCol",
              "tupleCol",
              "udtCol"
            ]
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "print": true
          }
        }
      }
    ]
  }
}
```
### 3.2 Parameter Description

* **host**
    * Description: Domain name or IP of the Cassandra contact points; separate multiple nodes with commas. <br />
    * Required: yes <br />
    * Default: none <br />
* **port**
    * Description: Cassandra port. <br />
    * Required: yes <br />
    * Default: 9042 <br />
* **username**
    * Description: Username for the data source. <br />
    * Required: no <br />
    * Default: none <br />
* **password**
    * Description: Password for the configured username. <br />
    * Required: no <br />
    * Default: none <br />
* **useSSL**
    * Description: Whether to connect over SSL. <br />
    * Required: no <br />
    * Default: false <br />
* **keyspace**
    * Description: Keyspace containing the table to synchronize. <br />
    * Required: yes <br />
    * Default: none <br />
* **table**
    * Description: The table to synchronize. <br />
    * Required: yes <br />
    * Default: none <br />
* **column**
    * Description: The set of columns to synchronize from the configured table. <br />
      Each element is either a column name or writetime(column_name); the latter form reads the write timestamp of column_name instead of its data. <br />
    * Required: yes <br />
    * Default: none <br />
* **where**
    * Description: CQL expression used to filter rows, for example: <br />
    ```
    "where":"textcol='a'"
    ```
    * Required: no <br />
    * Default: none <br />
* **allowFiltering**
    * Description: Whether to filter data on the server side. See the description of the ALLOW FILTERING keyword in the Cassandra documentation. <br />
    * Required: no <br />
    * Default: none <br />
* **consistancyLevel**
    * Description: Consistency level. One of ONE|QUORUM|LOCAL_QUORUM|EACH_QUORUM|ALL|ANY|TWO|THREE|LOCAL_ONE. <br />
    * Required: no <br />
    * Default: LOCAL_QUORUM <br />
### 3.3 Type Conversion

Currently CassandraReader supports all Cassandra types except counter and custom types.

The type mapping CassandraReader applies to Cassandra types:

| DataX internal type | Cassandra data type |
| -------- | ----- |
| Long | int, tinyint, smallint, varint, bigint, time |
| Double | float, double, decimal |
| String | ascii, varchar, text, uuid, timeuuid, duration, list, map, set, tuple, udt, inet |
| Date | date, timestamp |
| Boolean | bool |
| Bytes | blob |

Please note:

* counter and custom types are currently not supported.

## 4 Performance Report

Omitted.

## 5 Constraints and Limitations

### 5.1 Primary/Replica Synchronization and Data Recovery

Omitted.

## 6 FAQ
cassandrareader/pom.xml (new file, mode 100644)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.alibaba.datax</groupId>
        <artifactId>datax-all</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <artifactId>cassandrareader</artifactId>
    <name>cassandrareader</name>
    <packaging>jar</packaging>

    <dependencies>
        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>datax-common</artifactId>
            <version>${datax-project-version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
        </dependency>
        <dependency>
            <groupId>com.datastax.cassandra</groupId>
            <artifactId>cassandra-driver-core</artifactId>
            <version>3.7.2</version>
            <classifier>shaded</classifier>
            <exclusions>
                <exclusion>
                    <groupId>com.google.guava</groupId>
                    <artifactId>guava</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>16.0.1</version>
        </dependency>
        <dependency>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
            <version>1.9</version>
        </dependency>
        <!-- for test -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>datax-core</artifactId>
            <version>${datax-project-version}</version>
            <exclusions>
                <exclusion>
                    <groupId>com.alibaba.datax</groupId>
                    <artifactId>datax-service-face</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-common</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.hive</groupId>
                    <artifactId>hive-exec</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.hive</groupId>
                    <artifactId>hive-serde</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>javolution</groupId>
                    <artifactId>javolution</artifactId>
                </exclusion>
            </exclusions>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-all</artifactId>
            <version>1.9.5</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- compiler plugin -->
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                    <encoding>${project-sourceEncoding}</encoding>
                </configuration>
            </plugin>
            <!-- assembly plugin -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptors>
                        <descriptor>src/main/assembly/package.xml</descriptor>
                    </descriptors>
                    <finalName>datax</finalName>
                </configuration>
                <executions>
                    <execution>
                        <id>dwzip</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
cassandrareader/src/main/assembly/package.xml (new file, mode 100644)
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
    <id></id>
    <formats>
        <format>dir</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>
    <fileSets>
        <fileSet>
            <directory>src/main/resources</directory>
            <includes>
                <include>plugin.json</include>
                <include>plugin_job_template.json</include>
            </includes>
            <outputDirectory>plugin/reader/cassandrareader</outputDirectory>
        </fileSet>
        <fileSet>
            <directory>target/</directory>
            <includes>
                <include>cassandrareader-0.0.1-SNAPSHOT.jar</include>
            </includes>
            <outputDirectory>plugin/reader/cassandrareader</outputDirectory>
        </fileSet>
    </fileSets>
    <dependencySets>
        <dependencySet>
            <useProjectArtifact>false</useProjectArtifact>
            <outputDirectory>plugin/reader/cassandrareader/libs</outputDirectory>
            <scope>runtime</scope>
        </dependencySet>
    </dependencySets>
</assembly>
cassandrareader/src/main/java/com/alibaba/datax/plugin/reader/cassandrareader/CassandraReader.java (new file, mode 100644)
package com.alibaba.datax.plugin.reader.cassandrareader;

import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

public class CassandraReader extends Reader {
    private static final Logger LOG = LoggerFactory.getLogger(CassandraReader.class);

    public static class Job extends Reader.Job {
        private Configuration jobConfig = null;
        private Cluster cluster = null;

        @Override
        public void init() {
            this.jobConfig = super.getPluginJobConf();
            String username = jobConfig.getString(Key.USERNAME);
            String password = jobConfig.getString(Key.PASSWORD);
            String hosts = jobConfig.getString(Key.HOST);
            Integer port = jobConfig.getInt(Key.PORT, 9042);
            boolean useSSL = jobConfig.getBool(Key.USESSL);

            // build a Cluster, with credentials and SSL only when configured
            if ((username != null) && !username.isEmpty()) {
                Cluster.Builder clusterBuilder = Cluster.builder().withCredentials(username, password)
                        .withPort(Integer.valueOf(port)).addContactPoints(hosts.split(","));
                if (useSSL) {
                    clusterBuilder = clusterBuilder.withSSL();
                }
                cluster = clusterBuilder.build();
            } else {
                cluster = Cluster.builder().withPort(Integer.valueOf(port))
                        .addContactPoints(hosts.split(",")).build();
            }
            CassandraReaderHelper.checkConfig(jobConfig, cluster);
        }

        @Override
        public void destroy() {
        }

        @Override
        public List<Configuration> split(int adviceNumber) {
            List<Configuration> splittedConfigs = CassandraReaderHelper.splitJob(adviceNumber, jobConfig, cluster);
            return splittedConfigs;
        }
    }

    public static class Task extends Reader.Task {
        private Configuration taskConfig;
        private Cluster cluster = null;
        private Session session = null;
        private String queryString = null;
        private ConsistencyLevel consistencyLevel;
        private int columnNumber = 0;
        private List<String> columnMeta = null;

        @Override
        public void init() {
            this.taskConfig = super.getPluginJobConf();
            String username = taskConfig.getString(Key.USERNAME);
            String password = taskConfig.getString(Key.PASSWORD);
            String hosts = taskConfig.getString(Key.HOST);
            Integer port = taskConfig.getInt(Key.PORT);
            boolean useSSL = taskConfig.getBool(Key.USESSL);
            String keyspace = taskConfig.getString(Key.KEYSPACE);
            this.columnMeta = taskConfig.getList(Key.COLUMN, String.class);
            columnNumber = columnMeta.size();

            if ((username != null) && !username.isEmpty()) {
                Cluster.Builder clusterBuilder = Cluster.builder().withCredentials(username, password)
                        .withPort(Integer.valueOf(port)).addContactPoints(hosts.split(","));
                if (useSSL) {
                    clusterBuilder = clusterBuilder.withSSL();
                }
                cluster = clusterBuilder.build();
            } else {
                cluster = Cluster.builder().withPort(Integer.valueOf(port))
                        .addContactPoints(hosts.split(",")).build();
            }
            session = cluster.connect(keyspace);

            String cl = taskConfig.getString(Key.CONSITANCY_LEVEL);
            if (cl != null && !cl.isEmpty()) {
                consistencyLevel = ConsistencyLevel.valueOf(cl);
            } else {
                consistencyLevel = ConsistencyLevel.LOCAL_QUORUM;
            }

            queryString = CassandraReaderHelper.getQueryString(taskConfig, cluster);
            LOG.info("query = " + queryString);
        }

        @Override
        public void startRead(RecordSender recordSender) {
            ResultSet r = session.execute(
                    new SimpleStatement(queryString).setConsistencyLevel(consistencyLevel));
            for (Row row : r) {
                Record record = recordSender.createRecord();
                record = CassandraReaderHelper.buildRecord(record, row, r.getColumnDefinitions(),
                        columnNumber, super.getTaskPluginCollector());
                if (record != null)
                    recordSender.sendToWriter(record);
            }
        }

        @Override
        public void destroy() {
        }
    }
}
cassandrareader/src/main/java/com/alibaba/datax/plugin/reader/cassandrareader/CassandraReaderErrorCode.java (new file, mode 100644)
package com.alibaba.datax.plugin.reader.cassandrareader;

import com.alibaba.datax.common.spi.ErrorCode;

public enum CassandraReaderErrorCode implements ErrorCode {
    CONF_ERROR("CassandraReader-00", "配置错误."),
    ;

    private final String code;
    private final String description;

    private CassandraReaderErrorCode(String code, String description) {
        this.code = code;
        this.description = description;
    }

    @Override
    public String getCode() {
        return this.code;
    }

    @Override
    public String getDescription() {
        return this.description;
    }

    @Override
    public String toString() {
        return String.format("Code:[%s], Description:[%s]. ", this.code, this.description);
    }
}
cassandrareader/src/main/java/com/alibaba/datax/plugin/reader/cassandrareader/CassandraReaderHelper.java (new file, mode 100644)

(This diff is collapsed in the page; the file's 607 added lines are not shown.)
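The collapsed helper supplies the checkConfig, splitJob, getQueryString, and buildRecord methods used by CassandraReader above. Its internals are not visible here, but given the minToken/maxToken keys declared in Key.java below, splitJob plausibly divides the Murmur3 token ring into one sub-range per task. The following is only an illustrative sketch of that idea, not the actual implementation:

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

public class TokenRangeSplitSketch {
    // Split the full Murmur3 token range [Long.MIN_VALUE, Long.MAX_VALUE]
    // into adviceNumber contiguous sub-ranges; each range would presumably
    // be stored under Key.MIN_TOKEN / Key.MAX_TOKEN in one task config and
    // turned into "WHERE token(pk) > lo AND token(pk) <= hi".
    public static List<BigInteger[]> split(int adviceNumber) {
        BigInteger min = BigInteger.valueOf(Long.MIN_VALUE);
        BigInteger max = BigInteger.valueOf(Long.MAX_VALUE);
        BigInteger step = max.subtract(min).divide(BigInteger.valueOf(adviceNumber));
        List<BigInteger[]> ranges = new ArrayList<BigInteger[]>();
        for (int i = 0; i < adviceNumber; i++) {
            BigInteger lo = min.add(step.multiply(BigInteger.valueOf(i)));
            BigInteger hi = (i == adviceNumber - 1) ? max : lo.add(step);
            ranges.add(new BigInteger[] { lo, hi });
        }
        return ranges;
    }
}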
cassandrareader/src/main/java/com/alibaba/datax/plugin/reader/cassandrareader/Key.java (new file, mode 100644)
package com.alibaba.datax.plugin.reader.cassandrareader;

/**
 * Created by mazhenlin on 2019/8/19.
 */
public class Key {
    public final static String USERNAME = "username";
    public final static String PASSWORD = "password";
    public final static String HOST = "host";
    public final static String PORT = "port";
    public final static String USESSL = "useSSL";
    public final static String KEYSPACE = "keyspace";
    public final static String TABLE = "table";
    public final static String COLUMN = "column";
    public final static String WHERE = "where";
    public final static String ALLOW_FILTERING = "allowFiltering";
    public final static String CONSITANCY_LEVEL = "consistancyLevel";
    public final static String MIN_TOKEN = "minToken";
    public final static String MAX_TOKEN = "maxToken";

    /** Name of each column. */
    public static final String COLUMN_NAME = "name";
    /** Column separator. */
    public static final String COLUMN_SPLITTER = "format";
    public static final String WRITE_TIME = "writetime(";
    public static final String ELEMENT_SPLITTER = "splitter";
    public static final String ENTRY_SPLITTER = "entrySplitter";
    public static final String KV_SPLITTER = "kvSplitter";
    public static final String ELEMENT_CONFIG = "element";
    public static final String TUPLE_CONNECTOR = "_";
    public static final String KEY_CONFIG = "key";
    public static final String VALUE_CONFIG = "value";
}
cassandrareader/src/main/java/com/alibaba/datax/plugin/reader/cassandrareader/LocalStrings.properties (new file, mode 100644)

errorcode.config_invalid_exception=\u914D\u7F6E\u9519\u8BEF
\ No newline at end of file
cassandrareader/src/main/java/com/alibaba/datax/plugin/reader/cassandrareader/LocalStrings_en_US.properties (new file, mode 100644, empty)
cassandrareader/src/main/java/com/alibaba/datax/plugin/reader/cassandrareader/LocalStrings_ja_JP.properties (new file, mode 100644)

errorcode.config_invalid_exception=\u914D\u7F6E\u9519\u8BEF
\ No newline at end of file
cassandrareader/src/main/java/com/alibaba/datax/plugin/reader/cassandrareader/LocalStrings_zh_CN.properties (new file, mode 100644)

errorcode.config_invalid_exception=\u914D\u7F6E\u9519\u8BEF
\ No newline at end of file
cassandrareader/src/main/java/com/alibaba/datax/plugin/reader/cassandrareader/LocalStrings_zh_HK.properties (new file, mode 100644)

errorcode.config_invalid_exception=\u914D\u7F6E\u9519\u8BEF
\ No newline at end of file
cassandrareader/src/main/java/com/alibaba/datax/plugin/reader/cassandrareader/LocalStrings_zh_TW.properties (new file, mode 100644)

errorcode.config_invalid_exception=\u914D\u7F6E\u9519\u8BEF
\ No newline at end of file
cassandrareader/src/main/resources/plugin.json (new file, mode 100644)

{
    "name": "cassandrareader",
    "class": "com.alibaba.datax.plugin.reader.cassandrareader.CassandraReader",
    "description": "useScene: prod. mechanism: execute select cql, retrieve data from the ResultSet. warn: The more you know about the database, the less problems you encounter.",
    "developer": "alibaba"
}
\ No newline at end of file
cassandrareader/src/main/resources/plugin_job_template.json (new file, mode 100644)

{
    "name": "cassandrareader",
    "parameter": {
        "username": "",
        "password": "",
        "host": "",
        "port": "",
        "useSSL": false,
        "keyspace": "",
        "table": "",
        "column": ["c1", "c2", "c3"]
    }
}
\ No newline at end of file
cassandrawriter/doc/cassandrawriter.md (new file, mode 100644)

# CassandraWriter Plugin Documentation

___

## 1 Quick Introduction

The CassandraWriter plugin writes data into Cassandra. Under the hood, CassandraWriter connects to a Cassandra instance through DataStax's Java driver and executes the corresponding CQL statements to write the data into Cassandra.

## 2 Implementation Principle

In short, CassandraWriter connects to a Cassandra instance via the Java driver, generates INSERT CQL statements from the user-configured information, and sends them to Cassandra.

From the user-configured table and column information, CassandraWriter assembles the CQL statements sent to Cassandra.
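
As the init() code later in this commit shows, the INSERT is built once with the driver's QueryBuilder and prepared, then bound per record. The following is a condensed, illustrative sketch against the DataStax Java driver 3.x; the keyspace ("test"), table ("datax_dst"), and column names are placeholders, not values from this plugin:

```
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.querybuilder.Insert;
import com.datastax.driver.core.querybuilder.QueryBuilder;

public class InsertSketch {
    public static void main(String[] args) {
        Cluster cluster = Cluster.builder()
            .addContactPoints("localhost").withPort(9042).build();
        Session session = cluster.connect("test");            // placeholder keyspace
        Insert insert = QueryBuilder.insertInto("datax_dst"); // placeholder table
        for (String col : new String[] {"textCol", "intCol"}) {
            insert.value(col, QueryBuilder.bindMarker());     // one bind marker per configured column
        }
        PreparedStatement ps = session.prepare(insert);       // prepared once, bound per record
        session.execute(ps.bind("hello", 42));
        cluster.close();
    }
}
```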
## 3 Features

### 3.1 Sample Configuration

* Configure a job that generates data in memory and imports it into Cassandra:
```
{
  "job": {
    "setting": {
      "speed": {
        "channel": 5
      }
    },
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "column": [
              {"value": "name", "type": "string"},
              {"value": "false", "type": "bool"},
              {"value": "1988-08-08 08:08:08", "type": "date"},
              {"value": "addr", "type": "bytes"},
              {"value": 1.234, "type": "double"},
              {"value": 12345678, "type": "long"},
              {"value": 2.345, "type": "double"},
              {"value": 3456789, "type": "long"},
              {"value": "4a0ef8c0-4d97-11d0-db82-ebecdb03ffa5", "type": "string"},
              {"value": "value", "type": "bytes"},
              {"value": "-838383838,37377373,-383883838,27272772,393993939,-38383883,83883838,-1350403181,817650816,1630642337,251398784,-622020148", "type": "string"}
            ],
            "sliceRecordCount": 10000000
          }
        },
        "writer": {
          "name": "cassandrawriter",
          "parameter": {
            "host": "localhost",
            "port": 9042,
            "useSSL": false,
            "keyspace": "stresscql",
            "table": "dst",
            "batchSize": 10,
            "column": [
              "name",
              "choice",
              "date",
              "address",
              "dbl",
              "lval",
              "fval",
              "ival",
              "uid",
              "value",
              "listval"
            ]
          }
        }
      }
    ]
  }
}
```
### 3.2 Parameter Description

* **host**
    * Description: Domain name or IP of the Cassandra contact points; separate multiple nodes with commas. <br />
    * Required: yes <br />
    * Default: none <br />
* **port**
    * Description: Cassandra port. <br />
    * Required: yes <br />
    * Default: 9042 <br />
* **username**
    * Description: Username for the data source. <br />
    * Required: no <br />
    * Default: none <br />
* **password**
    * Description: Password for the configured username. <br />
    * Required: no <br />
    * Default: none <br />
* **useSSL**
    * Description: Whether to connect over SSL. <br />
    * Required: no <br />
    * Default: false <br />
* **connectionsPerHost**
    * Description: Client connection-pool setting: how many connections to open to each server node. <br />
    * Required: no <br />
    * Default: 8 <br />
* **maxPendingPerConnection**
    * Description: Client connection-pool setting: maximum in-flight requests per connection. <br />
    * Required: no <br />
    * Default: 128 <br />
* **keyspace**
    * Description: Keyspace containing the table to synchronize. <br />
    * Required: yes <br />
    * Default: none <br />
* **table**
    * Description: The table to synchronize. <br />
    * Required: yes <br />
    * Default: none <br />
* **column**
    * Description: The set of columns to synchronize in the configured table. <br />
      Each entry is either a column name or "writetime()". If a column is configured as writetime(), the content of that column is used as the write timestamp. <br />
    * Required: yes <br />
    * Default: none <br />
* **consistancyLevel**
    * Description: Consistency level. One of ONE|QUORUM|LOCAL_QUORUM|EACH_QUORUM|ALL|ANY|TWO|THREE|LOCAL_ONE. <br />
    * Required: no <br />
    * Default: LOCAL_QUORUM <br />
* **batchSize**
    * Description: Number of records submitted in one batch (UNLOGGED BATCH); see the sketch after this parameter list. Note the following limits on batch size: <br />
      (1) It cannot exceed 65535. <br />
      (2) The batch payload is limited by the server-side batch_size_fail_threshold_in_kb setting. <br />
      (3) If the batch payload exceeds batch_size_warn_threshold_in_kb, a warning is logged, but the write still succeeds and the warning can be ignored. <br />
      If a batch submission fails, every record in that batch is rewritten individually. <br />
    * Required: no <br />
    * Default: 1 <br />
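
The buffered batch path described above corresponds to this condensed sketch of the plugin's startWrite() logic (shown in full later in this commit); the session, the bound statements, and batchSize are assumed to be set up elsewhere, as in init():

```
import java.util.List;

import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.Session;

public class BatchWriteSketch {
    // Flush buffered statements as one UNLOGGED batch; on failure,
    // replay them one by one, as the batchSize notes above describe.
    static void flush(Session session, List<BoundStatement> buffered) {
        BatchStatement batch = new BatchStatement(BatchStatement.Type.UNLOGGED);
        batch.addAll(buffered);
        try {
            session.execute(batch);
        } catch (Exception e) {
            for (BoundStatement stmt : buffered) {
                session.execute(stmt); // per-row fallback
            }
        }
        buffered.clear();
    }
}
```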
### 3.3 Type Conversion

Currently CassandraWriter supports all Cassandra types except counter and custom types.

The type mapping CassandraWriter applies to Cassandra types:

| DataX internal type | Cassandra data type |
| -------- | ----- |
| Long | int, tinyint, smallint, varint, bigint, time |
| Double | float, double, decimal |
| String | ascii, varchar, text, uuid, timeuuid, duration, list, map, set, tuple, udt, inet |
| Date | date, timestamp |
| Boolean | bool |
| Bytes | blob |

Please note:

* counter and custom types are currently not supported.

## 4 Performance Report

Omitted.

## 5 Constraints and Limitations

### 5.1 Primary/Replica Synchronization and Data Recovery

Omitted.

## 6 FAQ
cassandrawriter/pom.xml (new file, mode 100644)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>datax-all</artifactId>
        <groupId>com.alibaba.datax</groupId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>cassandrawriter</artifactId>
    <name>cassandrawriter</name>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <properties>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>datax-common</artifactId>
            <version>${datax-project-version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.datastax.cassandra</groupId>
            <artifactId>cassandra-driver-core</artifactId>
            <version>3.7.2</version>
        </dependency>
        <dependency>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
            <version>1.9</version>
        </dependency>
        <!-- for test -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>datax-core</artifactId>
            <version>${datax-project-version}</version>
            <exclusions>
                <exclusion>
                    <groupId>com.alibaba.datax</groupId>
                    <artifactId>datax-service-face</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-common</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.hive</groupId>
                    <artifactId>hive-exec</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.hive</groupId>
                    <artifactId>hive-serde</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>javolution</groupId>
                    <artifactId>javolution</artifactId>
                </exclusion>
            </exclusions>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-all</artifactId>
            <version>1.9.5</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <resources>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.properties</include>
                </includes>
            </resource>
        </resources>
        <plugins>
            <!-- compiler plugin -->
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                    <encoding>${project-sourceEncoding}</encoding>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptors>
                        <descriptor>src/main/assembly/package.xml</descriptor>
                    </descriptors>
                    <finalName>datax</finalName>
                </configuration>
                <executions>
                    <execution>
                        <id>dwzip</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
\ No newline at end of file
cassandrawriter/src/main/assembly/package.xml (new file, mode 100644)
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
    <id></id>
    <formats>
        <format>dir</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>
    <fileSets>
        <fileSet>
            <directory>src/main/resources</directory>
            <includes>
                <include>plugin.json</include>
                <include>plugin_job_template.json</include>
            </includes>
            <outputDirectory>plugin/writer/cassandrawriter</outputDirectory>
        </fileSet>
        <fileSet>
            <directory>target/</directory>
            <includes>
                <include>cassandrawriter-0.0.1-SNAPSHOT.jar</include>
            </includes>
            <outputDirectory>plugin/writer/cassandrawriter</outputDirectory>
        </fileSet>
    </fileSets>
    <dependencySets>
        <dependencySet>
            <useProjectArtifact>false</useProjectArtifact>
            <outputDirectory>plugin/writer/cassandrawriter/libs</outputDirectory>
            <scope>runtime</scope>
        </dependencySet>
    </dependencySets>
</assembly>
cassandrawriter/src/main/java/com/alibaba/datax/plugin/writer/cassandrawriter/CassandraWriter.java (new file, mode 100644)
package com.alibaba.datax.plugin.writer.cassandrawriter;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.BatchStatement.Type;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ColumnMetadata;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.TableMetadata;
import com.datastax.driver.core.querybuilder.Insert;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.datastax.driver.core.querybuilder.QueryBuilder.timestamp;

/**
 * Created by mazhenlin on 2019/8/19.
 */
public class CassandraWriter extends Writer {
    private static final Logger LOG = LoggerFactory.getLogger(CassandraWriter.class);

    public static class Job extends Writer.Job {
        private Configuration originalConfig = null;

        @Override
        public List<Configuration> split(int mandatoryNumber) {
            List<Configuration> splitResultConfigs = new ArrayList<Configuration>();
            for (int j = 0; j < mandatoryNumber; j++) {
                splitResultConfigs.add(originalConfig.clone());
            }
            return splitResultConfigs;
        }

        @Override
        public void init() {
            originalConfig = getPluginJobConf();
        }

        @Override
        public void destroy() {
        }
    }

    public static class Task extends Writer.Task {
        private Configuration taskConfig;
        private Cluster cluster = null;
        private Session session = null;
        private PreparedStatement statement = null;
        private int columnNumber = 0;
        private List<DataType> columnTypes;
        private List<String> columnMeta = null;
        private int writeTimeCol = -1;
        private boolean asyncWrite = false;
        private long batchSize = 1;
        private List<ResultSetFuture> unConfirmedWrite;
        private List<BoundStatement> bufferedWrite;

        @Override
        public void startWrite(RecordReceiver lineReceiver) {
            try {
                Record record;
                while ((record = lineReceiver.getFromReader()) != null) {
                    if (record.getColumnNumber() != columnNumber) {
                        // column counts from the source and the target table differ: fail fast
                        throw DataXException.asDataXException(CassandraWriterErrorCode.CONF_ERROR,
                                String.format(
                                        "列配置信息有错误. 因为您配置的任务中,源头读取字段数:%s 与 目的表要写入的字段数:%s 不相等. 请检查您的配置并作出修改.",
                                        record.getColumnNumber(), this.columnNumber));
                    }

                    BoundStatement boundStmt = statement.bind();
                    for (int i = 0; i < columnNumber; i++) {
                        if (writeTimeCol != -1 && i == writeTimeCol) {
                            continue;
                        }
                        Column col = record.getColumn(i);
                        int pos = i;
                        if (writeTimeCol != -1 && pos > writeTimeCol) {
                            pos = i - 1;
                        }
                        CassandraWriterHelper.setupColumn(boundStmt, pos, columnTypes.get(pos), col);
                    }
                    if (writeTimeCol != -1) {
                        // the writetime() column is bound last, as the USING TIMESTAMP marker
                        Column col = record.getColumn(writeTimeCol);
                        boundStmt.setLong(columnNumber - 1, col.asLong());
                    }

                    if (batchSize <= 1) {
                        session.execute(boundStmt);
                    } else {
                        if (asyncWrite) {
                            unConfirmedWrite.add(session.executeAsync(boundStmt));
                            if (unConfirmedWrite.size() >= batchSize) {
                                for (ResultSetFuture write : unConfirmedWrite) {
                                    write.getUninterruptibly(10000, TimeUnit.MILLISECONDS);
                                }
                                unConfirmedWrite.clear();
                            }
                        } else {
                            bufferedWrite.add(boundStmt);
                            if (bufferedWrite.size() >= batchSize) {
                                BatchStatement batchStatement = new BatchStatement(Type.UNLOGGED);
                                batchStatement.addAll(bufferedWrite);
                                try {
                                    session.execute(batchStatement);
                                } catch (Exception e) {
                                    LOG.error("batch写入失败,尝试逐条写入.", e);
                                    for (BoundStatement stmt : bufferedWrite) {
                                        session.execute(stmt);
                                    }
                                }
                                ///LOG.info("batch finished. size = " + bufferedWrite.size());
                                bufferedWrite.clear();
                            }
                        }
                    }
                }

                // flush whatever is still pending or buffered at end of stream
                if (unConfirmedWrite != null && unConfirmedWrite.size() > 0) {
                    for (ResultSetFuture write : unConfirmedWrite) {
                        write.getUninterruptibly(10000, TimeUnit.MILLISECONDS);
                    }
                    unConfirmedWrite.clear();
                }
                if (bufferedWrite != null && bufferedWrite.size() > 0) {
                    BatchStatement batchStatement = new BatchStatement(Type.UNLOGGED);
                    batchStatement.addAll(bufferedWrite);
                    session.execute(batchStatement);
                    bufferedWrite.clear();
                }
            } catch (Exception e) {
                throw DataXException.asDataXException(CassandraWriterErrorCode.WRITE_DATA_ERROR, e);
            }
        }

        @Override
        public void init() {
            this.taskConfig = super.getPluginJobConf();
            String username = taskConfig.getString(Key.USERNAME);
            String password = taskConfig.getString(Key.PASSWORD);
            String hosts = taskConfig.getString(Key.HOST);
            Integer port = taskConfig.getInt(Key.PORT, 9042);
            boolean useSSL = taskConfig.getBool(Key.USESSL);
            String keyspace = taskConfig.getString(Key.KEYSPACE);
            String table = taskConfig.getString(Key.TABLE);
            batchSize = taskConfig.getLong(Key.BATCH_SIZE, 1);
            this.columnMeta = taskConfig.getList(Key.COLUMN, String.class);
            columnTypes = new ArrayList<DataType>(columnMeta.size());
            columnNumber = columnMeta.size();
            asyncWrite = taskConfig.getBool(Key.ASYNC_WRITE, false);

            int connectionsPerHost = taskConfig.getInt(Key.CONNECTIONS_PER_HOST, 8);
            int maxPendingPerConnection = taskConfig.getInt(Key.MAX_PENDING_CONNECTION, 128);
            PoolingOptions poolingOpts = new PoolingOptions()
                    .setConnectionsPerHost(HostDistance.LOCAL, connectionsPerHost, connectionsPerHost)
                    .setMaxRequestsPerConnection(HostDistance.LOCAL, maxPendingPerConnection)
                    .setNewConnectionThreshold(HostDistance.LOCAL, 100);
            Cluster.Builder clusterBuilder = Cluster.builder().withPoolingOptions(poolingOpts);
            if ((username != null) && !username.isEmpty()) {
                clusterBuilder = clusterBuilder.withCredentials(username, password)
                        .withPort(Integer.valueOf(port)).addContactPoints(hosts.split(","));
                if (useSSL) {
                    clusterBuilder = clusterBuilder.withSSL();
                }
            } else {
                clusterBuilder = clusterBuilder.withPort(Integer.valueOf(port))
                        .addContactPoints(hosts.split(","));
            }
            cluster = clusterBuilder.build();
            session = cluster.connect(keyspace);

            // build the prepared INSERT from the target table's metadata
            TableMetadata meta = cluster.getMetadata().getKeyspace(keyspace).getTable(table);
            Insert insertStmt = QueryBuilder.insertInto(table);
            for (String colunmnName : columnMeta) {
                if (colunmnName.toLowerCase().equals(Key.WRITE_TIME)) {
                    if (writeTimeCol != -1) {
                        throw DataXException.asDataXException(CassandraWriterErrorCode.CONF_ERROR,
                                "列配置信息有错误. 只能有一个时间戳列(writetime())");
                    }
                    writeTimeCol = columnTypes.size();
                    continue;
                }
                insertStmt.value(colunmnName, QueryBuilder.bindMarker());
                ColumnMetadata col = meta.getColumn(colunmnName);
                if (col == null) {
                    throw DataXException.asDataXException(CassandraWriterErrorCode.CONF_ERROR,
                            String.format("列配置信息有错误. 表中未找到列名 '%s' .", colunmnName));
                }
                columnTypes.add(col.getType());
            }
            if (writeTimeCol != -1) {
                insertStmt.using(timestamp(QueryBuilder.bindMarker()));
            }

            String cl = taskConfig.getString(Key.CONSITANCY_LEVEL);
            if (cl != null && !cl.isEmpty()) {
                insertStmt.setConsistencyLevel(ConsistencyLevel.valueOf(cl));
            } else {
                insertStmt.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
            }

            statement = session.prepare(insertStmt);
            if (batchSize > 1) {
                if (asyncWrite) {
                    unConfirmedWrite = new ArrayList<ResultSetFuture>();
                } else {
                    bufferedWrite = new ArrayList<BoundStatement>();
                }
            }
        }

        @Override
        public void destroy() {
        }
    }
}
cassandrawriter/src/main/java/com/alibaba/datax/plugin/writer/cassandrawriter/CassandraWriterErrorCode.java (new file, mode 100644)
package com.alibaba.datax.plugin.writer.cassandrawriter;

import com.alibaba.datax.common.spi.ErrorCode;

/**
 * Created by mazhenlin on 2019/8/19.
 */
public enum CassandraWriterErrorCode implements ErrorCode {
    CONF_ERROR("CassandraWriter-00", "配置错误."),
    WRITE_DATA_ERROR("CassandraWriter-01", "写入数据时失败."),
    ;

    private final String code;
    private final String description;

    private CassandraWriterErrorCode(String code, String description) {
        this.code = code;
        this.description = description;
    }

    @Override
    public String getCode() {
        return this.code;
    }

    @Override
    public String getDescription() {
        return this.description;
    }

    @Override
    public String toString() {
        return String.format("Code:[%s], Description:[%s].", this.code, this.description);
    }
}
cassandrawriter/src/main/java/com/alibaba/datax/plugin/writer/cassandrawriter/CassandraWriterHelper.java (new file, mode 100644)
package com.alibaba.datax.plugin.writer.cassandrawriter;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.CodecRegistry;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.DataType.Name;
import com.datastax.driver.core.Duration;
import com.datastax.driver.core.LocalDate;
import com.datastax.driver.core.TupleType;
import com.datastax.driver.core.TupleValue;
import com.datastax.driver.core.UDTValue;
import com.datastax.driver.core.UserType;
import com.datastax.driver.core.UserType.Field;
import com.google.common.base.Splitter;
import org.apache.commons.codec.binary.Base64;

/**
 * Created by mazhenlin on 2019/8/21.
 */
public class CassandraWriterHelper {
    static CodecRegistry registry = new CodecRegistry();

    public static Object parseFromString(String s, DataType sqlType) throws Exception {
        if (s == null || s.isEmpty()) {
            if (sqlType.getName() == Name.ASCII || sqlType.getName() == Name.TEXT
                    || sqlType.getName() == Name.VARCHAR) {
                return s;
            } else {
                return null;
            }
        }
        switch (sqlType.getName()) {
        case ASCII:
        case TEXT:
        case VARCHAR:
            return s;

        case BLOB:
            if (s.length() == 0) {
                return new byte[0];
            }
            // interpret the string as hex-encoded bytes, two characters per byte
            byte[] byteArray = new byte[s.length() / 2];
            for (int i = 0; i < byteArray.length; i++) {
                String subStr = s.substring(2 * i, 2 * i + 2);
                byteArray[i] = ((byte) Integer.parseInt(subStr, 16));
            }
            return ByteBuffer.wrap(byteArray);

        case BOOLEAN:
            return Boolean.valueOf(s);

        case TINYINT:
            return Byte.valueOf(s);

        case SMALLINT:
            return Short.valueOf(s);

        case INT:
            return Integer.valueOf(s);

        case BIGINT:
            return Long.valueOf(s);

        case VARINT:
            return new BigInteger(s, 10);

        case FLOAT:
            return Float.valueOf(s);

        case DOUBLE:
            return Double.valueOf(s);

        case DECIMAL:
            return new BigDecimal(s);

        case DATE: {
            String[] a = s.split("-");
            if (a.length != 3) {
                throw new Exception(String.format("DATE类型数据 '%s' 格式不正确,必须为yyyy-mm-dd格式", s));
            }
            return LocalDate.fromYearMonthDay(Integer.valueOf(a[0]), Integer.valueOf(a[1]),
                    Integer.valueOf(a[2]));
        }

        case TIME:
            return Long.valueOf(s);

        case TIMESTAMP:
            return new Date(Long.valueOf(s));

        case UUID:
        case TIMEUUID:
            return UUID.fromString(s);

        case INET:
            String[] b = s.split("/");
            if (b.length < 2) {
                return InetAddress.getByName(s);
            }
            byte[] addr = InetAddress.getByName(b[1]).getAddress();
            return InetAddress.getByAddress(b[0], addr);

        case DURATION:
            return Duration.from(s);

        case LIST:
        case MAP:
        case SET:
        case TUPLE:
        case UDT:
            // collection-like types arrive as JSON text and are parsed recursively
            Object jsonObject = JSON.parse(s);
            return parseFromJson(jsonObject, sqlType);

        default:
            throw DataXException.asDataXException(CassandraWriterErrorCode.CONF_ERROR,
                    "不支持您配置的列类型:" + sqlType + ", 请检查您的配置 或者 联系 管理员.");
        } // end switch
    }

    public static Object parseFromJson(Object jsonObject, DataType type) throws Exception {
        if (jsonObject == null)
            return null;
        switch (type.getName()) {
        case ASCII:
        case TEXT:
        case VARCHAR:
        case BOOLEAN:
        case TIME:
            return jsonObject;

        case TINYINT:
            return ((Number) jsonObject).byteValue();

        case SMALLINT:
            return ((Number) jsonObject).shortValue();

        case INT:
            return ((Number) jsonObject).intValue();

        case BIGINT:
            return ((Number) jsonObject).longValue();

        case VARINT:
            return new BigInteger(jsonObject.toString());

        case FLOAT:
            return ((Number) jsonObject).floatValue();

        case DOUBLE:
            return ((Number) jsonObject).doubleValue();

        case DECIMAL:
            return new BigDecimal(jsonObject.toString());

        case BLOB:
            return ByteBuffer.wrap(Base64.decodeBase64((String) jsonObject));

        case DATE:
            return LocalDate.fromMillisSinceEpoch(((Number) jsonObject).longValue());

        case TIMESTAMP:
            return new Date(((Number) jsonObject).longValue());

        case DURATION:
            return Duration.from(jsonObject.toString());

        case UUID:
        case TIMEUUID:
            return UUID.fromString(jsonObject.toString());

        case INET:
            return InetAddress.getByName((String) jsonObject);

        case LIST:
            List l = new ArrayList();
            for (Object o : (JSONArray) jsonObject) {
                l.add(parseFromJson(o, type.getTypeArguments().get(0)));
            }
            return l;

        case MAP: {
            Map m = new HashMap();
            for (JSONObject.Entry e : ((JSONObject) jsonObject).entrySet()) {
                Object k = parseFromString((String) e.getKey(), type.getTypeArguments().get(0));
                Object v = parseFromJson(e.getValue(), type.getTypeArguments().get(1));
                m.put(k, v);
            }
            return m;
        }

        case SET:
            Set s = new HashSet();
            for (Object o : (JSONArray) jsonObject) {
                s.add(parseFromJson(o, type.getTypeArguments().get(0)));
            }
            return s;

        case TUPLE: {
            TupleValue t = ((TupleType) type).newValue();
            int j = 0;
            for (Object e : (JSONArray) jsonObject) {
                DataType eleType = ((TupleType) type).getComponentTypes().get(j);
                t.set(j, parseFromJson(e, eleType), registry.codecFor(eleType).getJavaType());
                j++;
            }
            return t;
        }

        case UDT: {
            UDTValue t = ((UserType) type).newValue();
            UserType userType = t.getType();
            for (JSONObject.Entry e : ((JSONObject) jsonObject).entrySet()) {
                DataType eleType = userType.getFieldType((String) e.getKey());
                t.set((String) e.getKey(), parseFromJson(e.getValue(), eleType),
                        registry.codecFor(eleType).getJavaType());
            }
            return t;
        }
        }
        return null;
    }

    public static void setupColumn(BoundStatement ps, int pos, DataType sqlType, Column col)
            throws Exception {
        if (col.getRawData() != null) {
            switch (sqlType.getName()) {
            case ASCII:
            case TEXT:
            case VARCHAR:
                ps.setString(pos, col.asString());
                break;
            case BLOB:
                ps.setBytes(pos, ByteBuffer.wrap(col.asBytes()));
                break;
            case BOOLEAN:
                ps.setBool(pos, col.asBoolean());
                break;
            case TINYINT:
                ps.setByte(pos, col.asLong().byteValue());
                break;
            case SMALLINT:
                ps.setShort(pos, col.asLong().shortValue());
                break;
            case INT:
                ps.setInt(pos, col.asLong().intValue());
                break;
            case BIGINT:
                ps.setLong(pos, col.asLong());
                break;
            case VARINT:
                ps.setVarint(pos, col.asBigInteger());
                break;
            case FLOAT:
                ps.setFloat(pos, col.asDouble().floatValue());
                break;
            case DOUBLE:
                ps.setDouble(pos, col.asDouble());
                break;
            case DECIMAL:
                ps.setDecimal(pos, col.asBigDecimal());
                break;
            case DATE:
                ps.setDate(pos, LocalDate.fromMillisSinceEpoch(col.asDate().getTime()));
                break;
            case TIME:
                ps.setTime(pos, col.asLong());
                break;
            case TIMESTAMP:
                ps.setTimestamp(pos, col.asDate());
                break;
            case UUID:
            case TIMEUUID:
                ps.setUUID(pos, UUID.fromString(col.asString()));
                break;
            case INET:
                ps.setInet(pos, InetAddress.getByName(col.asString()));
                break;
            case DURATION:
                ps.set(pos, Duration.from(col.asString()), Duration.class);
                break;
            case LIST:
                ps.setList(pos, (List<?>) parseFromString(col.asString(), sqlType));
                break;
            case MAP:
                ps.setMap(pos, (Map) parseFromString(col.asString(), sqlType));
                break;
            case SET:
                ps.setSet(pos, (Set) parseFromString(col.asString(), sqlType));
                break;
            case TUPLE:
                ps.setTupleValue(pos, (TupleValue) parseFromString(col.asString(), sqlType));
                break;
            case UDT:
                ps.setUDTValue(pos, (UDTValue) parseFromString(col.asString(), sqlType));
                break;
            default:
                throw DataXException.asDataXException(CassandraWriterErrorCode.CONF_ERROR,
                        "不支持您配置的列类型:" + sqlType + ", 请检查您的配置 或者 联系 管理员.");
            } // end switch
        } else {
            ps.setToNull(pos);
        }
    }
}
cassandrawriter/src/main/java/com/alibaba/datax/plugin/writer/cassandrawriter/Key.java (new file, mode 100644)
package com.alibaba.datax.plugin.writer.cassandrawriter;

/**
 * Created by mazhenlin on 2019/8/19.
 */
public class Key {
    public final static String USERNAME = "username";
    public final static String PASSWORD = "password";
    public final static String HOST = "host";
    public final static String PORT = "port";
    public final static String USESSL = "useSSL";
    public final static String KEYSPACE = "keyspace";
    public final static String TABLE = "table";
    public final static String COLUMN = "column";
    public final static String WRITE_TIME = "writetime()";
    public final static String ASYNC_WRITE = "asyncWrite";
    public final static String CONSITANCY_LEVEL = "consistancyLevel";
    public final static String CONNECTIONS_PER_HOST = "connectionsPerHost";
    public final static String MAX_PENDING_CONNECTION = "maxPendingPerConnection";

    /** Batch size for async writes; default 1 (no async writes). */
    public final static String BATCH_SIZE = "batchSize";

    /** Name of each column. */
    public static final String COLUMN_NAME = "name";
    /** Column separator. */
    public static final String COLUMN_SPLITTER = "format";
    public static final String ELEMENT_SPLITTER = "splitter";
    public static final String ENTRY_SPLITTER = "entrySplitter";
    public static final String KV_SPLITTER = "kvSplitter";
    public static final String ELEMENT_CONFIG = "element";
    public static final String TUPLE_CONNECTOR = "_";
    public static final String KEY_CONFIG = "key";
    public static final String VALUE_CONFIG = "value";
}
cassandrawriter/src/main/java/com/alibaba/datax/plugin/writer/cassandrawriter/LocalStrings.properties (new file, mode 100644)

errorcode.config_invalid_exception=\u914D\u7F6E\u9519\u8BEF.
errorcode.write_failed_exception=\u5199\u5165\u6570\u636E\u65F6\u5931\u8D25
\ No newline at end of file
cassandrawriter/src/main/java/com/alibaba/datax/plugin/writer/cassandrawriter/LocalStrings_en_US.properties (new file, mode 100644)

errorcode.config_invalid_exception=Error in parameter configuration.
errorcode.write_failed_exception=\u5199\u5165\u6570\u636E\u65F6\u5931\u8D25
\ No newline at end of file
cassandrawriter/src/main/java/com/alibaba/datax/plugin/writer/cassandrawriter/LocalStrings_ja_JP.properties (new file, mode 100644)

errorcode.config_invalid_exception=\u914D\u7F6E\u9519\u8BEF.
errorcode.write_failed_exception=\u5199\u5165\u6570\u636E\u65F6\u5931\u8D25
\ No newline at end of file
cassandrawriter/src/main/java/com/alibaba/datax/plugin/writer/cassandrawriter/LocalStrings_zh_CN.properties (new file, mode 100644)

errorcode.config_invalid_exception=\u914D\u7F6E\u9519\u8BEF.
errorcode.write_failed_exception=\u5199\u5165\u6570\u636E\u65F6\u5931\u8D25
\ No newline at end of file
cassandrawriter/src/main/java/com/alibaba/datax/plugin/writer/cassandrawriter/LocalStrings_zh_HK.properties (new file, mode 100644)

errorcode.config_invalid_exception=\u914D\u7F6E\u9519\u8BEF.
errorcode.write_failed_exception=\u5199\u5165\u6570\u636E\u65F6\u5931\u8D25
\ No newline at end of file
cassandrawriter/src/main/java/com/alibaba/datax/plugin/writer/cassandrawriter/LocalStrings_zh_TW.properties (new file, mode 100644)

errorcode.config_invalid_exception=\u914D\u7F6E\u9519\u8BEF.
errorcode.write_failed_exception=\u5199\u5165\u6570\u636E\u65F6\u5931\u8D25
\ No newline at end of file
cassandrawriter/src/main/resources/plugin.json (new file, mode 100644)

{
    "name": "cassandrawriter",
    "class": "com.alibaba.datax.plugin.writer.cassandrawriter.CassandraWriter",
    "description": "useScene: prod. mechanism: use datax driver, execute insert sql.",
    "developer": "alibaba"
}
cassandrawriter/src/main/resources/plugin_job_template.json (new file, mode 100644)

{
    "name": "cassandrawriter",
    "parameter": {
        "username": "",
        "password": "",
        "host": "",
        "port": "",
        "useSSL": false,
        "keyspace": "",
        "table": "",
        "column": ["c1", "c2", "c3"]
    }
}
\ No newline at end of file
package.xml

@@ -166,6 +166,13 @@
         </includes>
         <outputDirectory>datax</outputDirectory>
     </fileSet>
+    <fileSet>
+        <directory>cassandrareader/target/datax/</directory>
+        <includes>
+            <include>**/*.*</include>
+        </includes>
+        <outputDirectory>datax</outputDirectory>
+    </fileSet>
     <!-- writer -->
     <fileSet>

@@ -343,5 +350,12 @@
         </includes>
         <outputDirectory>datax</outputDirectory>
     </fileSet>
+    <fileSet>
+        <directory>cassandrawriter/target/datax/</directory>
+        <includes>
+            <include>**/*.*</include>
+        </includes>
+        <outputDirectory>datax</outputDirectory>
+    </fileSet>
 </fileSets>
 </assembly>
pom.xml

@@ -63,6 +63,7 @@
     <module>hbase11xreader</module>
     <module>hbase094xreader</module>
     <module>opentsdbreader</module>
+    <module>cassandrareader</module>
     <!-- writer -->
     <module>mysqlwriter</module>

@@ -89,6 +90,7 @@
     <module>tsdbwriter</module>
     <module>adbpgwriter</module>
     <module>gdbwriter</module>
+    <module>cassandrawriter</module>
     <!-- common support module -->
     <module>plugin-rdbms-util</module>