Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
D
DataX
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
risk-feature
DataX
Commits
d9f2f4aa
Commit
d9f2f4aa
authored
Apr 14, 2020
by
jiye.tjy
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Add Clickhouse Writer
parent
643b6e9c
Changes
9
Hide whitespace changes
Inline
Side-by-side
Showing
9 changed files
with
531 additions
and
1 deletion
+531
-1
pom.xml
clickhousewriter/pom.xml
+94
-0
package.xml
clickhousewriter/src/main/assembly/package.xml
+35
-0
ClickhouseWriter.java
...atax/plugin/writer/clickhousewriter/ClickhouseWriter.java
+330
-0
ClickhouseWriterErrorCode.java
...in/writer/clickhousewriter/ClickhouseWriterErrorCode.java
+31
-0
plugin.json
clickhousewriter/src/main/resources/plugin.json
+6
-0
plugin_job_template.json
clickhousewriter/src/main/resources/plugin_job_template.json
+21
-0
package.xml
package.xml
+7
-0
DataBaseType.java
...ava/com/alibaba/datax/plugin/rdbms/util/DataBaseType.java
+6
-1
pom.xml
pom.xml
+1
-0
No files found.
clickhousewriter/pom.xml
0 → 100644
View file @
d9f2f4aa
<?xml version="1.0" encoding="UTF-8"?>
<project
xmlns=
"http://maven.apache.org/POM/4.0.0"
xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation=
"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
>
<parent>
<artifactId>
datax-all
</artifactId>
<groupId>
com.alibaba.datax
</groupId>
<version>
0.0.1-SNAPSHOT
</version>
</parent>
<modelVersion>
4.0.0
</modelVersion>
<artifactId>
clickhousewriter
</artifactId>
<name>
clickhousewriter
</name>
<packaging>
jar
</packaging>
<dependencies>
<dependency>
<groupId>
ru.yandex.clickhouse
</groupId>
<artifactId>
clickhouse-jdbc
</artifactId>
<version>
0.2.4
</version>
</dependency>
<dependency>
<groupId>
com.alibaba.datax
</groupId>
<artifactId>
datax-core
</artifactId>
<version>
${datax-project-version}
</version>
</dependency>
<dependency>
<groupId>
com.alibaba.datax
</groupId>
<artifactId>
datax-common
</artifactId>
<version>
${datax-project-version}
</version>
</dependency>
<dependency>
<groupId>
com.alibaba.datax
</groupId>
<artifactId>
simulator
</artifactId>
<version>
${datax-project-version}
</version>
<scope>
test
</scope>
</dependency>
<dependency>
<groupId>
org.slf4j
</groupId>
<artifactId>
slf4j-api
</artifactId>
</dependency>
<dependency>
<groupId>
ch.qos.logback
</groupId>
<artifactId>
logback-classic
</artifactId>
</dependency>
<dependency>
<groupId>
com.alibaba.datax
</groupId>
<artifactId>
plugin-rdbms-util
</artifactId>
<version>
${datax-project-version}
</version>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>
src/main/java
</directory>
<includes>
<include>
**/*.properties
</include>
</includes>
</resource>
</resources>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>
maven-compiler-plugin
</artifactId>
<configuration>
<source>
${jdk-version}
</source>
<target>
${jdk-version}
</target>
<encoding>
${project-sourceEncoding}
</encoding>
</configuration>
</plugin>
<!-- assembly plugin -->
<plugin>
<artifactId>
maven-assembly-plugin
</artifactId>
<configuration>
<descriptors>
<descriptor>
src/main/assembly/package.xml
</descriptor>
</descriptors>
<finalName>
datax
</finalName>
</configuration>
<executions>
<execution>
<id>
dwzip
</id>
<phase>
package
</phase>
<goals>
<goal>
single
</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
clickhousewriter/src/main/assembly/package.xml
0 → 100755
View file @
d9f2f4aa
<assembly
xmlns=
"http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation=
"http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd"
>
<id></id>
<formats>
<format>
dir
</format>
</formats>
<includeBaseDirectory>
false
</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>
src/main/resources
</directory>
<includes>
<include>
plugin.json
</include>
<include>
plugin_job_template.json
</include>
</includes>
<outputDirectory>
plugin/writer/clickhousewriter
</outputDirectory>
</fileSet>
<fileSet>
<directory>
target/
</directory>
<includes>
<include>
clickhousewriter-0.0.1-SNAPSHOT.jar
</include>
</includes>
<outputDirectory>
plugin/writer/clickhousewriter
</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>
false
</useProjectArtifact>
<outputDirectory>
plugin/writer/clickhousewriter/libs
</outputDirectory>
<scope>
runtime
</scope>
</dependencySet>
</dependencySets>
</assembly>
clickhousewriter/src/main/java/com/alibaba/datax/plugin/writer/clickhousewriter/ClickhouseWriter.java
0 → 100644
View file @
d9f2f4aa
package
com
.
alibaba
.
datax
.
plugin
.
writer
.
clickhousewriter
;
import
com.alibaba.datax.common.element.Column
;
import
com.alibaba.datax.common.element.StringColumn
;
import
com.alibaba.datax.common.exception.CommonErrorCode
;
import
com.alibaba.datax.common.exception.DataXException
;
import
com.alibaba.datax.common.plugin.RecordReceiver
;
import
com.alibaba.datax.common.spi.Writer
;
import
com.alibaba.datax.common.util.Configuration
;
import
com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode
;
import
com.alibaba.datax.plugin.rdbms.util.DataBaseType
;
import
com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter
;
import
com.alibaba.fastjson.JSON
;
import
com.alibaba.fastjson.JSONArray
;
import
ru.yandex.clickhouse.ClickHouseTuple
;
import
java.sql.Array
;
import
java.sql.Connection
;
import
java.sql.PreparedStatement
;
import
java.sql.SQLException
;
import
java.sql.Timestamp
;
import
java.sql.Types
;
import
java.util.List
;
import
java.util.regex.Pattern
;
/**
 * DataX writer plugin for ClickHouse.
 *
 * <p>Both the {@link Job} and the {@link Task} delegate their lifecycle to the shared
 * {@code CommonRdbmsWriter} implementation, configured with {@code DataBaseType.ClickHouse}.
 * The only ClickHouse-specific logic lives in the anonymous {@code CommonRdbmsWriter.Task}
 * subclass created in {@link Task#init()}, which customises how a DataX {@code Column} is bound
 * to a JDBC {@code PreparedStatement} parameter.
 */
public class ClickhouseWriter extends Writer {
    private static final DataBaseType DATABASE_TYPE = DataBaseType.ClickHouse;

    /**
     * Job-level half of the plugin: forwards every lifecycle call, together with the plugin job
     * configuration, to a {@code CommonRdbmsWriter.Job}.
     */
    public static class Job extends Writer.Job {
        private Configuration originalConfig = null;
        private CommonRdbmsWriter.Job commonRdbmsWriterMaster;

        /** Reads the plugin job configuration and initialises the delegate with it. */
        @Override
        public void init() {
            this.originalConfig = super.getPluginJobConf();
            this.commonRdbmsWriterMaster = new CommonRdbmsWriter.Job(DATABASE_TYPE);
            this.commonRdbmsWriterMaster.init(this.originalConfig);
        }

        @Override
        public void prepare() {
            this.commonRdbmsWriterMaster.prepare(this.originalConfig);
        }

        /**
         * Splits the job configuration into task configurations via the delegate.
         *
         * @param mandatoryNumber value passed through unchanged to the delegate's {@code split}
         * @return the configurations produced by the delegate
         */
        @Override
        public List<Configuration> split(int mandatoryNumber) {
            return this.commonRdbmsWriterMaster.split(this.originalConfig, mandatoryNumber);
        }

        @Override
        public void post() {
            this.commonRdbmsWriterMaster.post(this.originalConfig);
        }

        @Override
        public void destroy() {
            this.commonRdbmsWriterMaster.destroy(this.originalConfig);
        }
    }

    /**
     * Task-level half of the plugin: forwards every lifecycle call to a
     * {@code CommonRdbmsWriter.Task} whose parameter-binding logic is overridden in
     * {@link #init()}.
     */
    public static class Task extends Writer.Task {
        /**
         * Matches timestamp strings that carry a fractional-seconds part, e.g.
         * {@code 2017-07-12 14:39:00.123566}. Such strings are handed to
         * {@link Timestamp#valueOf(String)} directly so the sub-millisecond digits are not lost.
         * The dot is escaped on purpose: an unescaped dot would accept any separator and let
         * malformed input reach {@code Timestamp.valueOf}, which fails with an unchecked
         * exception. Compiled once because it is evaluated per column, per record.
         */
        private static final Pattern TIMESTAMP_WITH_FRACTION =
                Pattern.compile("^\\d+-\\d+-\\d+ \\d+:\\d+:\\d+\\.\\d+");

        private Configuration writerSliceConfig;
        private CommonRdbmsWriter.Task commonRdbmsWriterSlave;

        /**
         * Reads the plugin configuration and builds the delegate task with ClickHouse-specific
         * parameter binding.
         */
        @Override
        public void init() {
            this.writerSliceConfig = super.getPluginJobConf();
            this.commonRdbmsWriterSlave = new CommonRdbmsWriter.Task(DATABASE_TYPE) {

                /**
                 * Binds one column value to the prepared statement according to its SQL type.
                 *
                 * @param preparedStatement statement being populated
                 * @param columnIndex zero-based column position; JDBC parameter indexes are
                 *        one-based, hence the {@code + 1} on every setter call
                 * @param columnSqltype a {@link Types} constant describing the target column
                 * @param column the DataX value to write
                 * @return the same {@code preparedStatement}
                 * @throws SQLException if the driver rejects the value or a date/time value
                 *         cannot be converted (the conversion failure is attached as the cause)
                 */
                @Override
                protected PreparedStatement fillPreparedStatementColumnType(
                        PreparedStatement preparedStatement, int columnIndex,
                        int columnSqltype, Column column) throws SQLException {
                    try {
                        if (column.getRawData() == null) {
                            preparedStatement.setNull(columnIndex + 1, columnSqltype);
                            return preparedStatement;
                        }

                        switch (columnSqltype) {
                        case Types.CHAR:
                        case Types.NCHAR:
                        case Types.CLOB:
                        case Types.NCLOB:
                        case Types.VARCHAR:
                        case Types.LONGVARCHAR:
                        case Types.NVARCHAR:
                        case Types.LONGNVARCHAR:
                            preparedStatement.setString(columnIndex + 1, column.asString());
                            break;

                        case Types.TINYINT:
                        case Types.SMALLINT:
                        case Types.INTEGER:
                        case Types.BIGINT:
                        case Types.DECIMAL:
                        case Types.FLOAT:
                        case Types.REAL:
                        case Types.DOUBLE: {
                            String strValue = column.asString();
                            // emptyAsNull is inherited from the parent task: when set, an empty
                            // string is written as SQL NULL instead of being parsed as a number.
                            if (emptyAsNull && "".equals(strValue)) {
                                preparedStatement.setNull(columnIndex + 1, columnSqltype);
                            } else {
                                switch (columnSqltype) {
                                case Types.TINYINT:
                                case Types.SMALLINT:
                                case Types.INTEGER:
                                    preparedStatement.setInt(columnIndex + 1,
                                            column.asBigInteger().intValue());
                                    break;
                                case Types.BIGINT:
                                    preparedStatement.setLong(columnIndex + 1, column.asLong());
                                    break;
                                case Types.DECIMAL:
                                    preparedStatement.setBigDecimal(columnIndex + 1,
                                            column.asBigDecimal());
                                    break;
                                case Types.REAL:
                                case Types.FLOAT:
                                    preparedStatement.setFloat(columnIndex + 1,
                                            column.asDouble().floatValue());
                                    break;
                                case Types.DOUBLE:
                                    preparedStatement.setDouble(columnIndex + 1,
                                            column.asDouble());
                                    break;
                                }
                            }
                            break;
                        }

                        case Types.DATE:
                            // A column whose type name is "year" is written as an integer
                            // rather than as a date.
                            if (this.resultSetMetaData.getRight().get(columnIndex)
                                    .equalsIgnoreCase("year")) {
                                if (column.asBigInteger() == null) {
                                    preparedStatement.setString(columnIndex + 1, null);
                                } else {
                                    preparedStatement.setInt(columnIndex + 1,
                                            column.asBigInteger().intValue());
                                }
                            } else {
                                java.sql.Date sqlDate = null;
                                java.util.Date dateValue = convertToUtilDate(column);
                                if (null != dateValue) {
                                    sqlDate = new java.sql.Date(dateValue.getTime());
                                }
                                preparedStatement.setDate(columnIndex + 1, sqlDate);
                            }
                            break;

                        case Types.TIME: {
                            java.sql.Time sqlTime = null;
                            java.util.Date timeValue = convertToUtilDate(column);
                            if (null != timeValue) {
                                sqlTime = new java.sql.Time(timeValue.getTime());
                            }
                            preparedStatement.setTime(columnIndex + 1, sqlTime);
                            break;
                        }

                        case Types.TIMESTAMP: {
                            Timestamp sqlTimestamp = null;
                            if (column instanceof StringColumn && column.asString() != null) {
                                String timeStampStr = column.asString();
                                // Timestamp.valueOf requires input shaped like
                                // "2017-07-12 14:39:00.123566"; only take this path when the
                                // string really has that shape.
                                if (TIMESTAMP_WITH_FRACTION.matcher(timeStampStr).matches()) {
                                    sqlTimestamp = Timestamp.valueOf(timeStampStr);
                                    preparedStatement.setTimestamp(columnIndex + 1, sqlTimestamp);
                                    break;
                                }
                            }
                            java.util.Date timestampValue = convertToUtilDate(column);
                            if (null != timestampValue) {
                                sqlTimestamp = new Timestamp(timestampValue.getTime());
                            }
                            preparedStatement.setTimestamp(columnIndex + 1, sqlTimestamp);
                            break;
                        }

                        case Types.BINARY:
                        case Types.VARBINARY:
                        case Types.BLOB:
                        case Types.LONGVARBINARY:
                            preparedStatement.setBytes(columnIndex + 1, column.asBytes());
                            break;

                        case Types.BOOLEAN:
                            preparedStatement.setInt(columnIndex + 1,
                                    column.asBigInteger().intValue());
                            break;

                        // warn: bit(1) -> Types.BIT can use setBoolean
                        // warn: bit(>1) -> Types.VARBINARY can use setBytes
                        case Types.BIT:
                            // NOTE(review): this task is constructed with
                            // DataBaseType.ClickHouse, so the MySql branch looks unreachable
                            // unless the parent reassigns dataBaseType - confirm.
                            if (this.dataBaseType == DataBaseType.MySql) {
                                Boolean asBoolean = column.asBoolean();
                                if (asBoolean != null) {
                                    preparedStatement.setBoolean(columnIndex + 1, asBoolean);
                                } else {
                                    preparedStatement.setNull(columnIndex + 1, Types.BIT);
                                }
                            } else {
                                preparedStatement.setString(columnIndex + 1, column.asString());
                            }
                            break;

                        default:
                            // Give the ClickHouse-specific types (OTHER, ARRAY) a chance
                            // before rejecting the column.
                            boolean isHandled = fillPreparedStatementColumnType4CustomType(
                                    preparedStatement, columnIndex, columnSqltype, column);
                            if (isHandled) {
                                break;
                            }
                            throw DataXException.asDataXException(
                                    DBUtilErrorCode.UNSUPPORTED_TYPE,
                                    String.format(
                                            "您的配置文件中的列配置信息有误. 因为DataX 不支持数据库写入这种字段类型. 字段名:[%s], 字段类型:[%d], 字段Java类型:[%s]. 请修改表中该字段的类型或者不同步该字段.",
                                            this.resultSetMetaData.getLeft().get(columnIndex),
                                            this.resultSetMetaData.getMiddle().get(columnIndex),
                                            this.resultSetMetaData.getRight().get(columnIndex)));
                        }
                        return preparedStatement;
                    } catch (DataXException e) {
                        // When a type conversion fails or overflows, report exactly which
                        // column was involved; any other DataXException is rethrown untouched.
                        if (e.getErrorCode() == CommonErrorCode.CONVERT_NOT_SUPPORT
                                || e.getErrorCode() == CommonErrorCode.CONVERT_OVER_FLOW) {
                            throw DataXException.asDataXException(
                                    e.getErrorCode(),
                                    String.format(
                                            "类型转化错误. 字段名:[%s], 字段类型:[%d], 字段Java类型:[%s]. 请修改表中该字段的类型或者不同步该字段.",
                                            this.resultSetMetaData.getLeft().get(columnIndex),
                                            this.resultSetMetaData.getMiddle().get(columnIndex),
                                            this.resultSetMetaData.getRight().get(columnIndex)));
                        } else {
                            throw e;
                        }
                    }
                }

                /**
                 * Converts the column to a {@code java.util.Date}, translating a DataX
                 * conversion failure into a {@link SQLException} that keeps the original
                 * exception as its cause.
                 *
                 * @param column the value to convert
                 * @return the converted date, which may be {@code null}
                 * @throws SQLException if DataX cannot convert the value to a date
                 */
                private java.util.Date convertToUtilDate(Column column) throws SQLException {
                    try {
                        return column.asDate();
                    } catch (DataXException e) {
                        throw new SQLException(
                                String.format("Date 类型转换错误:[%s]", column), e);
                    }
                }

                /**
                 * Recursively turns fastjson {@code JSONArray} values into plain
                 * {@code Object[]} arrays; any other value (including {@code null}) is
                 * returned unchanged.
                 */
                private Object toJavaArray(Object val) {
                    if (null == val) {
                        return null;
                    } else if (val instanceof JSONArray) {
                        Object[] valArray = ((JSONArray) val).toArray();
                        for (int i = 0; i < valArray.length; i++) {
                            valArray[i] = this.toJavaArray(valArray[i]);
                        }
                        return valArray;
                    } else {
                        return val;
                    }
                }

                /**
                 * Binds values for the SQL types the generic switch does not cover.
                 *
                 * <ul>
                 *   <li>{@code Types.OTHER}: written as a string, except columns whose type
                 *       name starts with {@code Tuple}, which are rejected.</li>
                 *   <li>{@code Types.ARRAY}: the column's string form is parsed as a JSON
                 *       array and written through {@code Connection.createArrayOf}.</li>
                 * </ul>
                 *
                 * @return {@code true} if the value was bound, {@code false} if the SQL type is
                 *         not handled here
                 * @throws SQLException if the driver rejects the value
                 */
                boolean fillPreparedStatementColumnType4CustomType(
                        PreparedStatement ps, int columnIndex,
                        int columnSqltype, Column column) throws SQLException {
                    switch (columnSqltype) {
                    case Types.OTHER:
                        if (this.resultSetMetaData.getRight().get(columnIndex)
                                .startsWith("Tuple")) {
                            throw DataXException.asDataXException(
                                    ClickhouseWriterErrorCode.TUPLE_NOT_SUPPORTED_ERROR,
                                    ClickhouseWriterErrorCode.TUPLE_NOT_SUPPORTED_ERROR
                                            .getDescription());
                        } else {
                            ps.setString(columnIndex + 1, column.asString());
                        }
                        return true;

                    case Types.ARRAY:
                        Connection conn = ps.getConnection();
                        List<Object> values = JSON.parseArray(column.asString(), Object.class);
                        // Nested JSON arrays must become Object[] before reaching the driver.
                        for (int i = 0; i < values.size(); i++) {
                            values.set(i, this.toJavaArray(values.get(i)));
                        }
                        // NOTE(review): the element type name is always "String" regardless
                        // of the declared array element type - confirm the driver ignores it.
                        Array array = conn.createArrayOf("String", values.toArray());
                        ps.setArray(columnIndex + 1, array);
                        return true;

                    default:
                        break;
                    }
                    return false;
                }
            };
            this.commonRdbmsWriterSlave.init(this.writerSliceConfig);
        }

        @Override
        public void prepare() {
            this.commonRdbmsWriterSlave.prepare(this.writerSliceConfig);
        }

        /**
         * Hands the record stream to the delegate, together with this task's configuration and
         * plugin collector.
         *
         * @param recordReceiver source of records to write
         */
        @Override
        public void startWrite(RecordReceiver recordReceiver) {
            this.commonRdbmsWriterSlave.startWrite(recordReceiver, this.writerSliceConfig,
                    super.getTaskPluginCollector());
        }

        @Override
        public void post() {
            this.commonRdbmsWriterSlave.post(this.writerSliceConfig);
        }

        @Override
        public void destroy() {
            this.commonRdbmsWriterSlave.destroy(this.writerSliceConfig);
        }
    }
}
\ No newline at end of file
clickhousewriter/src/main/java/com/alibaba/datax/plugin/writer/clickhousewriter/ClickhouseWriterErrorCode.java
0 → 100644
View file @
d9f2f4aa
package
com
.
alibaba
.
datax
.
plugin
.
writer
.
clickhousewriter
;
import
com.alibaba.datax.common.spi.ErrorCode
;
/**
 * Error codes raised by the ClickHouse writer plugin. Each constant pairs a stable code string
 * with a human-readable description.
 */
public enum ClickhouseWriterErrorCode implements ErrorCode {
    /** Raised when a column whose type name starts with {@code Tuple} is written. */
    TUPLE_NOT_SUPPORTED_ERROR("ClickhouseWriter-00", "不支持TUPLE类型导入."),
    ;

    private final String code;
    private final String description;

    /**
     * @param errorCode stable identifier of the error
     * @param errorDescription human-readable explanation of the error
     */
    ClickhouseWriterErrorCode(String errorCode, String errorDescription) {
        code = errorCode;
        description = errorDescription;
    }

    /** @return the stable identifier of this error */
    @Override
    public String getCode() {
        return code;
    }

    /** @return the human-readable explanation of this error */
    @Override
    public String getDescription() {
        return description;
    }

    /** @return the code and description rendered as {@code Code:[...], Description:[...].} */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append("Code:[").append(code).append("], ");
        text.append("Description:[").append(description).append("].");
        return text.toString();
    }
}
clickhousewriter/src/main/resources/plugin.json
0 → 100755
View file @
d9f2f4aa
{
"name"
:
"clickhousewriter"
,
"class"
:
"com.alibaba.datax.plugin.writer.clickhousewriter.ClickhouseWriter"
,
"description"
:
"useScene: prod. mechanism: Jdbc connection using the database, execute insert sql."
,
"developer"
:
"jiye.tjy"
}
\ No newline at end of file
clickhousewriter/src/main/resources/plugin_job_template.json
0 → 100644
View file @
d9f2f4aa
{
"name"
:
"clickhousewriter"
,
"parameter"
:
{
"username"
:
"username"
,
"password"
:
"password"
,
"column"
:
[
"col1"
,
"col2"
,
"col3"
],
"connection"
:
[
{
"jdbcUrl"
:
"jdbc:clickhouse://<host>:<port>[/<database>]"
,
"table"
:
[
"table1"
,
"table2"
]
}
],
"preSql"
:
[],
"postSql"
:
[],
"batchSize"
:
65536
,
"batchByteSize"
:
134217728
,
"dryRun"
:
false
,
"writeMode"
:
"insert"
}
}
\ No newline at end of file
package.xml
View file @
d9f2f4aa
...
...
@@ -357,5 +357,12 @@
</includes>
<outputDirectory>
datax
</outputDirectory>
</fileSet>
<fileSet>
<directory>
clickhousewriter/target/datax/
</directory>
<includes>
<include>
**/*.*
</include>
</includes>
<outputDirectory>
datax
</outputDirectory>
</fileSet>
</fileSets>
</assembly>
plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/DataBaseType.java
View file @
d9f2f4aa
...
...
@@ -18,7 +18,8 @@ public enum DataBaseType {
PostgreSQL
(
"postgresql"
,
"org.postgresql.Driver"
),
RDBMS
(
"rdbms"
,
"com.alibaba.datax.plugin.rdbms.util.DataBaseType"
),
DB2
(
"db2"
,
"com.ibm.db2.jcc.DB2Driver"
),
ADS
(
"ads"
,
"com.mysql.jdbc.Driver"
);
ADS
(
"ads"
,
"com.mysql.jdbc.Driver"
),
ClickHouse
(
"clickhouse"
,
"ru.yandex.clickhouse.ClickHouseDriver"
);
private
String
typeName
;
...
...
@@ -54,6 +55,8 @@ public enum DataBaseType {
break
;
case
PostgreSQL:
break
;
case
ClickHouse:
break
;
case
RDBMS:
break
;
default
:
...
...
@@ -91,6 +94,8 @@ public enum DataBaseType {
break
;
case
PostgreSQL:
break
;
case
ClickHouse:
break
;
case
RDBMS:
break
;
default
:
...
...
pom.xml
View file @
d9f2f4aa
...
...
@@ -93,6 +93,7 @@
<module>
adbpgwriter
</module>
<module>
gdbwriter
</module>
<module>
cassandrawriter
</module>
<module>
clickhousewriter
</module>
<!-- common support module -->
<module>
plugin-rdbms-util
</module>
<module>
plugin-unstructured-storage-util
</module>
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment