Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
D
DataX
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
risk-feature
DataX
Commits
8059bba6
Commit
8059bba6
authored
Jul 25, 2019
by
韵成
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
add adbpgwriter plugin
parent
3c21cc28
Changes
15
Hide whitespace changes
Inline
Side-by-side
Showing
15 changed files
with
896 additions
and
1 deletion
+896
-1
duplicate
adbpgwriter/duplicate
+0
-0
pom.xml
adbpgwriter/pom.xml
+113
-0
package.xml
adbpgwriter/src/main/assembly/package.xml
+35
-0
adbpgwriter.md
adbpgwriter/src/main/doc/adbpgwriter.md
+216
-0
AdbpgWriter.java
....alibaba.datax.plugin.writer.adbpgwriter/AdbpgWriter.java
+117
-0
package-info.java
...alibaba.datax.plugin.writer.adbpgwriter/package-info.java
+8
-0
Adb4pgClientProxy.java
...tax/plugin/writer/adbpgwriter/copy/Adb4pgClientProxy.java
+182
-0
AdbProxy.java
...libaba/datax/plugin/writer/adbpgwriter/copy/AdbProxy.java
+13
-0
Adb4pgUtil.java
...baba/datax/plugin/writer/adbpgwriter/util/Adb4pgUtil.java
+146
-0
Constant.java
...libaba/datax/plugin/writer/adbpgwriter/util/Constant.java
+12
-0
Key.java
...com/alibaba/datax/plugin/writer/adbpgwriter/util/Key.java
+26
-0
plugin.json
adbpgwriter/src/main/resources/plugin.json
+6
-0
plugin_job_template.json
adbpgwriter/src/main/resources/plugin_job_template.json
+13
-0
package.xml
package.xml
+7
-0
pom.xml
pom.xml
+2
-1
No files found.
adbpgwriter/duplicate
0 → 100644
View file @
8059bba6
adbpgwriter/pom.xml
0 → 100644
View file @
8059bba6
<?xml version="1.0" encoding="UTF-8"?>
<project
xmlns=
"http://maven.apache.org/POM/4.0.0"
xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation=
"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
>
<parent>
<artifactId>
datax-all
</artifactId>
<groupId>
com.alibaba.datax
</groupId>
<version>
0.0.1-SNAPSHOT
</version>
</parent>
<modelVersion>
4.0.0
</modelVersion>
<artifactId>
adbpgwriter
</artifactId>
<name>
adbpgwriter
</name>
<packaging>
jar
</packaging>
<dependencies>
<dependency>
<groupId>
com.alibaba.datax
</groupId>
<artifactId>
datax-common
</artifactId>
<version>
${datax-project-version}
</version>
<exclusions>
<exclusion>
<artifactId>
slf4j-log4j12
</artifactId>
<groupId>
org.slf4j
</groupId>
</exclusion>
<exclusion>
<groupId>
mysql
</groupId>
<artifactId>
mysql-connector-java
</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>
com.alibaba.datax
</groupId>
<artifactId>
datax-core
</artifactId>
<version>
${datax-project-version}
</version>
</dependency>
<dependency>
<groupId>
com.alibaba.datax
</groupId>
<artifactId>
plugin-rdbms-util
</artifactId>
<version>
${datax-project-version}
</version>
<exclusions>
<exclusion>
<groupId>
com.alibaba
</groupId>
<artifactId>
druid
</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>
com.alibaba
</groupId>
<artifactId>
druid
</artifactId>
<version>
1.1.17
</version>
</dependency>
<dependency>
<groupId>
org.slf4j
</groupId>
<artifactId>
slf4j-api
</artifactId>
</dependency>
<dependency>
<groupId>
org.apache.commons
</groupId>
<artifactId>
commons-exec
</artifactId>
<version>
1.3
</version>
</dependency>
<dependency>
<groupId>
ch.qos.logback
</groupId>
<artifactId>
logback-classic
</artifactId>
</dependency>
<dependency>
<groupId>
commons-configuration
</groupId>
<artifactId>
commons-configuration
</artifactId>
<version>
1.10
</version>
</dependency>
<dependency>
<groupId>
com.alibaba.cloud.analyticdb
</groupId>
<artifactId>
adb4pgclient
</artifactId>
<version>
1.0.0-SNAPSHOT
</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>
maven-compiler-plugin
</artifactId>
<configuration>
<source>
1.8
</source>
<target>
1.8
</target>
<encoding>
${project-sourceEncoding}
</encoding>
</configuration>
</plugin>
<!-- assembly plugin -->
<plugin>
<artifactId>
maven-assembly-plugin
</artifactId>
<configuration>
<descriptors>
<descriptor>
src/main/assembly/package.xml
</descriptor>
</descriptors>
<finalName>
datax
</finalName>
</configuration>
<executions>
<execution>
<id>
dwzip
</id>
<phase>
package
</phase>
<goals>
<goal>
single
</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
adbpgwriter/src/main/assembly/package.xml
0 → 100644
View file @
8059bba6
<assembly
xmlns=
"http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation=
"http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd"
>
<id></id>
<formats>
<format>
dir
</format>
</formats>
<includeBaseDirectory>
false
</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>
src/main/resources
</directory>
<includes>
<include>
plugin.json
</include>
<include>
plugin_job_template.json
</include>
</includes>
<outputDirectory>
plugin/writer/adbpgwriter
</outputDirectory>
</fileSet>
<fileSet>
<directory>
target/
</directory>
<includes>
<include>
adbpgwriter-0.0.1-SNAPSHOT.jar
</include>
</includes>
<outputDirectory>
plugin/writer/adbpgwriter
</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>
false
</useProjectArtifact>
<outputDirectory>
plugin/writer/adbpgwriter/libs
</outputDirectory>
<scope>
runtime
</scope>
</dependencySet>
</dependencySets>
</assembly>
adbpgwriter/src/main/doc/adbpgwriter.md
0 → 100644
View file @
8059bba6
# DataX ADB PG Writer
---
## 1 快速介绍
AdbpgWriter 插件实现了写入数据到 ABD PG版数据库的功能。在底层实现上,AdbpgWriter 插件会先缓存需要写入的数据,当缓存的
数据量达到 commitSize 时,插件会通过 JDBC 连接远程 ADB PG版 数据库,并执行 COPY 命令将数据写入 ADB PG 数据库。
AdbpgWriter 可以作为数据迁移工具为用户提供服务。
## 2 实现原理
AdbpgWriter 通过 DataX 框架获取 Reader 生成的协议数据,首先会将数据缓存,当缓存的数据量达到commitSize时,插件根据你配置生成相应的COPY语句,执行
COPY命令将数据写入ADB PG数据库中。
## 3 功能说明
### 3.1 配置样例
*
这里使用一份从内存产生到 AdbpgWriter导入的数据
```
json
{
"job"
:
{
"setting"
:
{
"speed"
:
{
"channel"
:
32
}
},
"content"
:
[
{
"reader"
:
{
"name"
:
"streamreader"
,
"parameter"
:
{
"column"
:
[
{
"value"
:
"DataX"
,
"type"
:
"string"
},
{
"value"
:
19880808
,
"type"
:
"long"
},
{
"value"
:
"1988-08-08 08:08:08"
,
"type"
:
"date"
},
{
"value"
:
true
,
"type"
:
"bool"
},
{
"value"
:
"test"
,
"type"
:
"bytes"
}
]
},
"sliceRecordCount"
:
1000
},
"writer"
:
{
"name"
:
"adbpgwriter"
,
"parameter"
:
{
"username"
:
"username"
,
"password"
:
"password"
,
"host"
:
"host"
,
"port"
:
"1234"
,
"database"
:
"database"
,
"schema"
:
"schema"
,
"table"
:
"table"
,
"column"
:
[
"*"
]
}
}
}
]
}
}
```
### 3.2 参数说明
*
**name**
*
描述:插件名称
<br
/>
*
必选:是
<br
/>
*
默认值:无
<br
/>
*
**username**
*
描述:目的数据库的用户名
<br
/>
*
必选:是
<br
/>
*
默认值:无
<br
/>
*
**password**
*
描述:目的数据库的密码
<br
/>
*
必选:是
<br
/>
*
默认值:无
<br
/>
*
**host**
*
描述:目的数据库主机名
<br
/>
*
必选:是
<br
/>
*
默认值:无
<br
/>
*
**port**
*
描述:目的数据库的端口
<br
/>
*
必选:是
<br
/>
*
默认值:无
<br
/>
*
**database**
*
描述:需要写入的表所属的数据库名称
<br
/>
*
必选:是
<br
/>
*
默认值:无
<br
/>
*
**schema**
*
描述:需要写入的表所属的schema名称
<br
/>
*
必选:是
<br
/>
*
默认值:无
<br
/>
*
**table**
*
描述:需要写入的表名称
<br
/>
*
必选:是
<br
/>
*
默认值:无
<br
/>
*
**column**
*
描述:目的表需要写入数据的字段,字段之间用英文逗号分隔。例如: "column":
[
"id","name","age"
]
。如果要依次写入全部列,使用
*表示, 例如: "column": ["*
"]
注意:1、我们强烈不推荐你这样配置,因为当你目的表字段个数、类型等有改动时,你的任务可能运行不正确或者失败
2、此处 column 不能配置任何常量值
*
必选:是
<br
/>
*
默认值:否
<br
/>
### 3.3 类型转换
目前 AdbpgWriter 支持大部分 ADB PG 数据库的类型,但也存在部分没有支持的情况,请注意检查你的类型。
下面列出 AdbpgWriter 针对 ADB PG 类型转换列表:
| DataX 内部类型| ADB PG 数据类型 |
| -------- | ----- |
| Long |bigint, bigserial, integer, smallint, serial |
| Double |double precision, float, numeric, real |
| String |varchar, char, text|
| Date |date, time, timestamp |
| Boolean |bool|
## 4 性能报告
### 4.1 环境准备
#### 4.1.1 数据特征
建表语句:
```
sql
create
table
schematest
.
test_datax
(
t1
int
,
t2
bigint
,
t3
bigserial
,
t4
float
,
t5
timestamp
,
t6
varchar
)
distributed
by
(
t1
);
```
#### 4.1.2 机器参数
*
执行DataX的机器参数为:
1.
cpu: 24核
2.
mem: 96GB
*
ADB PG数据库机器参数为:
1.
平均core数量:4
2.
primary segment 数量: 4
3.
计算组数量:2
### 4.2 测试报告
#### 4.2.1 单表测试报告
| 通道数| commitSize MB | DataX速度(Rec/s)| DataX流量(M/s)
|--------|--------| --------|--------|
|1| 10 | 54098 | 15.54 |
|1| 20 | 55000 | 15.80 |
|4| 10 | 183333 | 52.66 |
|4| 20 | 173684 | 49.89 |
|8| 10 | 330000 | 94.79 |
|8| 20 | 300000 | 86.17 |
|16| 10 | 412500 | 118.48 |
|16| 20 | 366666 | 105.32 |
|32| 10 | 366666 | 105.32 |
#### 4.2.2 性能测试小结
1.
`channel数对性能影响很大`
2.
`通常不建议写入数据库时,通道个数 > 32`
\ No newline at end of file
adbpgwriter/src/main/java/com.alibaba.datax.plugin.writer.adbpgwriter/AdbpgWriter.java
0 → 100644
View file @
8059bba6
package
com
.
alibaba
.
datax
.
plugin
.
writer
.
adbpgwriter
;
import
com.alibaba.datax.common.plugin.RecordReceiver
;
import
com.alibaba.datax.common.plugin.RecordSender
;
import
com.alibaba.datax.common.spi.Writer
;
import
com.alibaba.datax.common.util.Configuration
;
import
java.util.ArrayList
;
import
java.util.List
;
import
com.alibaba.datax.plugin.rdbms.util.DataBaseType
;
import
com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter
;
import
com.alibaba.datax.plugin.rdbms.writer.Key
;
import
com.alibaba.datax.plugin.rdbms.writer.util.OriginalConfPretreatmentUtil
;
import
com.alibaba.datax.plugin.writer.adbpgwriter.copy.Adb4pgClientProxy
;
import
com.alibaba.datax.plugin.writer.adbpgwriter.util.Adb4pgUtil
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
static
com
.
alibaba
.
datax
.
plugin
.
rdbms
.
util
.
DBUtilErrorCode
.*;
import
static
com
.
alibaba
.
datax
.
plugin
.
rdbms
.
util
.
DataBaseType
.
PostgreSQL
;
/**
* Created by yuncheng on 07/13/2019.
*/
public
class
AdbpgWriter
extends
Writer
{
private
static
final
DataBaseType
DATABASE_TYPE
=
DataBaseType
.
PostgreSQL
;
public
static
class
Job
extends
Writer
.
Job
{
private
Configuration
originalConfig
;
private
CommonRdbmsWriter
.
Job
commonRdbmsWriterMaster
;
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
Writer
.
Job
.
class
);
@Override
public
void
init
()
{
this
.
originalConfig
=
super
.
getPluginJobConf
();
LOG
.
info
(
"in Job.init(), config is:[\n{}\n]"
,
originalConfig
.
toJSON
());
this
.
commonRdbmsWriterMaster
=
new
CommonRdbmsWriter
.
Job
(
DATABASE_TYPE
);
//convert to DatabaseConfig, use DatabaseConfig to check user configuration
Adb4pgUtil
.
checkConfig
(
originalConfig
);
}
@Override
public
void
prepare
()
{
Adb4pgUtil
.
prepare
(
originalConfig
);
}
@Override
public
List
<
Configuration
>
split
(
int
adviceNumber
)
{
List
<
Configuration
>
splitResult
=
new
ArrayList
<
Configuration
>();
for
(
int
i
=
0
;
i
<
adviceNumber
;
i
++)
{
splitResult
.
add
(
this
.
originalConfig
.
clone
());
}
return
splitResult
;
}
@Override
public
void
post
()
{
Adb4pgUtil
.
post
(
originalConfig
);
}
@Override
public
void
destroy
()
{
}
}
public
static
class
Task
extends
Writer
.
Task
{
private
Configuration
writerSliceConfig
;
private
CommonRdbmsWriter
.
Task
commonRdbmsWriterSlave
;
private
Adb4pgClientProxy
adb4pgClientProxy
;
//Adb4pgClient client;
@Override
public
void
init
()
{
this
.
writerSliceConfig
=
super
.
getPluginJobConf
();
this
.
adb4pgClientProxy
=
new
Adb4pgClientProxy
(
writerSliceConfig
,
super
.
getTaskPluginCollector
());
this
.
commonRdbmsWriterSlave
=
new
CommonRdbmsWriter
.
Task
(
DATABASE_TYPE
){
@Override
public
String
calcValueHolder
(
String
columnType
){
if
(
"serial"
.
equalsIgnoreCase
(
columnType
)){
return
"?::int"
;
}
else
if
(
"bit"
.
equalsIgnoreCase
(
columnType
)){
return
"?::bit varying"
;
}
return
"?::"
+
columnType
;
}
};
}
@Override
public
void
prepare
()
{
}
@Override
public
void
startWrite
(
RecordReceiver
recordReceiver
)
{
this
.
adb4pgClientProxy
.
startWriteWithConnection
(
recordReceiver
,
Adb4pgUtil
.
getAdbpgConnect
(
writerSliceConfig
));
}
@Override
public
void
post
()
{
}
@Override
public
void
destroy
()
{
}
}
}
adbpgwriter/src/main/java/com.alibaba.datax.plugin.writer.adbpgwriter/package-info.java
0 → 100644
View file @
8059bba6
/**
* Greenplum Writer.
*
* @since 0.0.1
*/
package
com
.
alibaba
.
datax
.
plugin
.
writer
.
adbpgwriter
;
adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/copy/Adb4pgClientProxy.java
0 → 100644
View file @
8059bba6
package
com
.
alibaba
.
datax
.
plugin
.
writer
.
adbpgwriter
.
copy
;
import
com.alibaba.cloud.analyticdb.adb4pgclient.*
;
import
com.alibaba.datax.common.element.Column
;
import
com.alibaba.datax.common.element.Record
;
import
com.alibaba.datax.common.element.StringColumn
;
import
com.alibaba.datax.common.exception.DataXException
;
import
com.alibaba.datax.common.plugin.RecordReceiver
;
import
com.alibaba.datax.common.plugin.TaskPluginCollector
;
import
com.alibaba.datax.common.util.Configuration
;
import
com.alibaba.datax.core.transport.record.DefaultRecord
;
import
com.alibaba.datax.plugin.rdbms.util.DBUtil
;
import
com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode
;
import
com.alibaba.datax.plugin.writer.adbpgwriter.util.Adb4pgUtil
;
import
com.alibaba.datax.plugin.writer.adbpgwriter.util.Constant
;
import
com.alibaba.datax.plugin.writer.adbpgwriter.util.Key
;
import
org.apache.commons.lang3.tuple.Pair
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
java.sql.Connection
;
import
java.sql.Types
;
import
java.util.ArrayList
;
import
java.util.List
;
/**
* Created by yuncheng on 07/15/2019.
*/
public
class
Adb4pgClientProxy
implements
AdbProxy
{
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
Adb4pgClientProxy
.
class
);
private
Adb4pgClient
adb4pgClient
;
private
String
table
;
private
String
schema
;
List
<
String
>
columns
;
private
TableInfo
tableInfo
;
private
TaskPluginCollector
taskPluginCollector
;
private
boolean
useRawData
[];
public
Adb4pgClientProxy
(
Configuration
configuration
,
TaskPluginCollector
taskPluginCollector
)
{
this
.
taskPluginCollector
=
taskPluginCollector
;
DatabaseConfig
databaseConfig
=
Adb4pgUtil
.
convertConfiguration
(
configuration
);
// If the value of column is empty, set null
boolean
emptyAsNull
=
configuration
.
getBool
(
Key
.
EMPTY_AS_NULL
,
false
);
databaseConfig
.
setEmptyAsNull
(
emptyAsNull
);
// 使用insert ignore into方式进行插入
boolean
ignoreInsert
=
configuration
.
getBool
(
Key
.
IGNORE_INSERT
,
false
);
databaseConfig
.
setInsertIgnore
(
ignoreInsert
);
// commit时,写入ADB出现异常时重试的3次
int
retryTimes
=
configuration
.
getInt
(
Key
.
RETRY_CONNECTION_TIME
,
Constant
.
DEFAULT_RETRY_TIMES
);
databaseConfig
.
setRetryTimes
(
retryTimes
);
// 重试间隔的时间为1s,单位是ms
int
retryIntervalTime
=
configuration
.
getInt
(
Key
.
RETRY_INTERVAL_TIME
,
1000
);
databaseConfig
.
setRetryIntervalTime
(
retryIntervalTime
);
// 设置自动提交的SQL长度(单位Byte),默认为32KB,一般不建议设置
int
commitSize
=
configuration
.
getInt
(
"commitSize"
,
10
*
1024
*
1024
);
databaseConfig
.
setCommitSize
(
commitSize
);
// 设置写入adb时的并发线程数,默认4,针对配置的所有表
int
parallelNumber
=
configuration
.
getInt
(
"parallelNumber"
,
4
);
databaseConfig
.
setParallelNumber
(
parallelNumber
);
// 设置client中使用的logger对象,此处使用slf4j.Logger
databaseConfig
.
setLogger
(
Adb4pgClientProxy
.
LOG
);
// sdk 默认值为true
boolean
shareDataSource
=
configuration
.
getBool
(
"shareDataSource"
,
true
);
databaseConfig
.
setShareDataSource
(
shareDataSource
);
//List<String> columns = configuration.getList(Key.COLUMN, String.class);
this
.
table
=
configuration
.
getString
(
com
.
alibaba
.
datax
.
plugin
.
rdbms
.
writer
.
Key
.
TABLE
);
this
.
schema
=
configuration
.
getString
(
com
.
alibaba
.
datax
.
plugin
.
writer
.
adbpgwriter
.
util
.
Key
.
SCHEMA
);
this
.
adb4pgClient
=
new
Adb4pgClient
(
databaseConfig
);
this
.
columns
=
databaseConfig
.
getColumns
(
table
,
schema
);
this
.
tableInfo
=
adb4pgClient
.
getTableInfo
(
table
,
schema
);
this
.
useRawData
=
new
boolean
[
this
.
columns
.
size
()];
List
<
ColumnInfo
>
columnInfos
=
tableInfo
.
getColumns
();
for
(
int
i
=
0
;
i
<
this
.
columns
.
size
();
i
++)
{
String
oriEachColumn
=
columns
.
get
(
i
);
String
eachColumn
=
oriEachColumn
;
// 防御性保留字
if
(
eachColumn
.
startsWith
(
Constant
.
COLUMN_QUOTE_CHARACTER
)
&&
eachColumn
.
endsWith
(
Constant
.
COLUMN_QUOTE_CHARACTER
))
{
eachColumn
=
eachColumn
.
substring
(
1
,
eachColumn
.
length
()
-
1
);
}
for
(
ColumnInfo
eachAdsColumn
:
columnInfos
)
{
if
(
eachColumn
.
equals
(
eachAdsColumn
.
getName
()))
{
int
columnSqltype
=
eachAdsColumn
.
getDataType
().
sqlType
;
switch
(
columnSqltype
)
{
case
Types
.
DATE
:
case
Types
.
TIME
:
case
Types
.
TIMESTAMP
:
this
.
useRawData
[
i
]
=
false
;
break
;
default
:
this
.
useRawData
[
i
]
=
true
;
break
;
}
}
}
}
}
@Override
public
void
startWriteWithConnection
(
RecordReceiver
recordReceiver
,
Connection
connection
)
{
try
{
Record
record
;
while
((
record
=
recordReceiver
.
getFromReader
())
!=
null
)
{
Row
row
=
new
Row
();
List
<
Object
>
values
=
new
ArrayList
<
Object
>();
this
.
prepareColumnTypeValue
(
record
,
values
);
row
.
setColumnValues
(
values
);
try
{
this
.
adb4pgClient
.
addRow
(
row
,
this
.
table
,
this
.
schema
);
}
catch
(
Adb4pgClientException
e
)
{
if
(
101
==
e
.
getCode
())
{
for
(
String
each
:
e
.
getErrData
())
{
Record
dirtyData
=
new
DefaultRecord
();
dirtyData
.
addColumn
(
new
StringColumn
(
each
));
this
.
taskPluginCollector
.
collectDirtyRecord
(
dirtyData
,
e
.
getMessage
());
}
}
else
{
throw
e
;
}
}
}
try
{
this
.
adb4pgClient
.
commit
();
}
catch
(
Adb4pgClientException
e
)
{
if
(
101
==
e
.
getCode
())
{
for
(
String
each
:
e
.
getErrData
())
{
Record
dirtyData
=
new
DefaultRecord
();
dirtyData
.
addColumn
(
new
StringColumn
(
each
));
this
.
taskPluginCollector
.
collectDirtyRecord
(
dirtyData
,
e
.
getMessage
());
}
}
else
{
throw
e
;
}
}
}
catch
(
Exception
e
)
{
throw
DataXException
.
asDataXException
(
DBUtilErrorCode
.
WRITE_DATA_ERROR
,
e
);
}
finally
{
DBUtil
.
closeDBResources
(
null
,
null
,
connection
);
}
return
;
}
private
void
prepareColumnTypeValue
(
Record
record
,
List
<
Object
>
values
)
{
for
(
int
i
=
0
;
i
<
this
.
columns
.
size
();
i
++)
{
Column
column
=
record
.
getColumn
(
i
);
if
(
this
.
useRawData
[
i
])
{
values
.
add
(
column
.
getRawData
());
}
else
{
values
.
add
(
column
.
asString
());
}
}
}
@Override
public
void
closeResource
()
{
try
{
LOG
.
info
(
"stop the adb4pgClient"
);
this
.
adb4pgClient
.
stop
();
}
catch
(
Exception
e
)
{
LOG
.
warn
(
"stop adbClient meet a exception, ignore it: {}"
,
e
.
getMessage
(),
e
);
}
}
}
adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/copy/AdbProxy.java
0 → 100644
View file @
8059bba6
package
com
.
alibaba
.
datax
.
plugin
.
writer
.
adbpgwriter
.
copy
;
import
com.alibaba.datax.common.plugin.RecordReceiver
;
import
java.sql.Connection
;
/**
* Created by yuncheng on 07/15/2019.
*/
public
interface
AdbProxy
{
public
abstract
void
startWriteWithConnection
(
RecordReceiver
recordReceiver
,
Connection
connection
);
public
void
closeResource
();
}
adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/util/Adb4pgUtil.java
0 → 100644
View file @
8059bba6
package
com
.
alibaba
.
datax
.
plugin
.
writer
.
adbpgwriter
.
util
;
import
com.alibaba.cloud.analyticdb.adb4pgclient.Adb4pgClient
;
import
com.alibaba.cloud.analyticdb.adb4pgclient.Adb4pgClientException
;
import
com.alibaba.cloud.analyticdb.adb4pgclient.DatabaseConfig
;
import
com.alibaba.datax.common.element.Column
;
import
com.alibaba.datax.common.exception.DataXException
;
import
com.alibaba.datax.common.spi.ErrorCode
;
import
com.alibaba.datax.common.util.Configuration
;
import
com.alibaba.datax.plugin.rdbms.util.DBUtil
;
import
com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode
;
import
com.alibaba.datax.plugin.rdbms.util.DataBaseType
;
import
com.alibaba.datax.plugin.rdbms.writer.Constant
;
import
com.alibaba.datax.plugin.rdbms.writer.Key
;
import
com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil
;
import
org.apache.commons.lang3.StringUtils
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
java.sql.Connection
;
import
java.util.*
;
import
static
com
.
alibaba
.
datax
.
plugin
.
rdbms
.
util
.
DBUtilErrorCode
.
COLUMN_SPLIT_ERROR
;
/**
* Created by yuncheng on 07/13/2019.
*/
public
class
Adb4pgUtil
{
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
Adb4pgUtil
.
class
);
private
static
final
DataBaseType
DATABASE_TYPE
=
DataBaseType
.
PostgreSQL
;
public
static
void
checkConfig
(
Configuration
originalConfig
)
{
try
{
DatabaseConfig
databaseConfig
=
convertConfiguration
(
originalConfig
);
Adb4pgClient
testConfigClient
=
new
Adb4pgClient
(
databaseConfig
);
}
catch
(
Exception
e
)
{
throw
new
Adb4pgClientException
(
Adb4pgClientException
.
CONFIG_ERROR
,
"Check config exception: "
+
e
.
getMessage
(),
null
);
}
}
public
static
DatabaseConfig
convertConfiguration
(
Configuration
originalConfig
)
{
originalConfig
.
getNecessaryValue
(
Key
.
USERNAME
,
COLUMN_SPLIT_ERROR
);
originalConfig
.
getNecessaryValue
(
Key
.
PASSWORD
,
COLUMN_SPLIT_ERROR
);
String
userName
=
originalConfig
.
getString
(
Key
.
USERNAME
);
String
passWord
=
originalConfig
.
getString
(
Key
.
PASSWORD
);
String
tableName
=
originalConfig
.
getString
(
Key
.
TABLE
);
String
schemaName
=
originalConfig
.
getString
(
com
.
alibaba
.
datax
.
plugin
.
writer
.
adbpgwriter
.
util
.
Key
.
SCHEMA
);
String
host
=
originalConfig
.
getString
(
com
.
alibaba
.
datax
.
plugin
.
writer
.
adbpgwriter
.
util
.
Key
.
HOST
);
String
port
=
originalConfig
.
getString
(
com
.
alibaba
.
datax
.
plugin
.
writer
.
adbpgwriter
.
util
.
Key
.
PORT
);
String
databseName
=
originalConfig
.
getString
(
com
.
alibaba
.
datax
.
plugin
.
writer
.
adbpgwriter
.
util
.
Key
.
DATABASE
);
List
<
String
>
columns
=
originalConfig
.
getList
(
Key
.
COLUMN
,
String
.
class
);
DatabaseConfig
databaseConfig
=
new
DatabaseConfig
();
databaseConfig
.
setHost
(
host
);
databaseConfig
.
setPort
(
Integer
.
valueOf
(
port
));
databaseConfig
.
setDatabase
(
databseName
);
databaseConfig
.
setUser
(
userName
);
databaseConfig
.
setPassword
(
passWord
);
databaseConfig
.
setLogger
(
LOG
);
databaseConfig
.
setInsertIgnore
(
originalConfig
.
getBool
(
com
.
alibaba
.
datax
.
plugin
.
writer
.
adbpgwriter
.
util
.
Key
.
IS_INSERTINGORE
,
true
));
databaseConfig
.
addTable
(
Collections
.
singletonList
(
tableName
),
schemaName
);
databaseConfig
.
setColumns
(
columns
,
tableName
,
schemaName
);
return
databaseConfig
;
}
private
static
Map
<
String
,
List
<
String
>>
splitBySchemaName
(
List
<
String
>
tables
)
{
HashMap
<
String
,
List
<
String
>>
res
=
new
HashMap
<
String
,
List
<
String
>>(
16
);
for
(
String
schemaNameTableName:
tables
)
{
String
[]
s
=
schemaNameTableName
.
split
(
"\\."
);
if
(!
res
.
containsKey
(
s
[
0
]))
{
res
.
put
(
s
[
0
],
new
ArrayList
<
String
>());
}
res
.
get
(
s
[
0
]).
add
(
s
[
1
]);
}
return
res
;
}
public
static
Connection
getAdbpgConnect
(
Configuration
conf
)
{
String
userName
=
conf
.
getString
(
Key
.
USERNAME
);
String
passWord
=
conf
.
getString
(
Key
.
PASSWORD
);
return
DBUtil
.
getConnection
(
DataBaseType
.
PostgreSQL
,
generateJdbcUrl
(
conf
),
userName
,
passWord
);
}
private
static
String
generateJdbcUrl
(
Configuration
configuration
)
{
String
host
=
configuration
.
getString
(
com
.
alibaba
.
datax
.
plugin
.
writer
.
adbpgwriter
.
util
.
Key
.
HOST
);
String
port
=
configuration
.
getString
(
com
.
alibaba
.
datax
.
plugin
.
writer
.
adbpgwriter
.
util
.
Key
.
PORT
);
String
databseName
=
configuration
.
getString
(
com
.
alibaba
.
datax
.
plugin
.
writer
.
adbpgwriter
.
util
.
Key
.
DATABASE
);
String
jdbcUrl
=
"jdbc:postgresql://"
+
host
+
":"
+
port
+
"/"
+
databseName
;
return
jdbcUrl
;
}
public
static
void
prepare
(
Configuration
originalConfig
)
{
List
<
String
>
preSqls
=
originalConfig
.
getList
(
Key
.
PRE_SQL
,
String
.
class
);
String
tableName
=
originalConfig
.
getString
(
Key
.
TABLE
);
List
<
String
>
renderedPreSqls
=
WriterUtil
.
renderPreOrPostSqls
(
preSqls
,
tableName
);
if
(
renderedPreSqls
.
size
()
==
0
)
{
return
;
}
originalConfig
.
remove
(
Key
.
PRE_SQL
);
Connection
conn
=
getAdbpgConnect
(
originalConfig
);
WriterUtil
.
executeSqls
(
conn
,
renderedPreSqls
,
generateJdbcUrl
(
originalConfig
),
DATABASE_TYPE
);
DBUtil
.
closeDBResources
(
null
,
null
,
conn
);
}
public
static
void
post
(
Configuration
configuration
)
{
List
<
String
>
postSqls
=
configuration
.
getList
(
Key
.
POST_SQL
,
String
.
class
);
String
tableName
=
configuration
.
getString
(
Key
.
TABLE
);
List
<
String
>
renderedPostSqls
=
WriterUtil
.
renderPreOrPostSqls
(
postSqls
,
tableName
);
if
(
renderedPostSqls
.
size
()
==
0
)
{
return
;
}
configuration
.
remove
(
Key
.
POST_SQL
);
Connection
conn
=
getAdbpgConnect
(
configuration
);
WriterUtil
.
executeSqls
(
conn
,
renderedPostSqls
,
generateJdbcUrl
(
configuration
),
DATABASE_TYPE
);
DBUtil
.
closeDBResources
(
null
,
null
,
conn
);
}
}
adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/util/Constant.java
0 → 100644
View file @
8059bba6
package
com
.
alibaba
.
datax
.
plugin
.
writer
.
adbpgwriter
.
util
;
/**
* Created by yuncheng on 07/13/2019.
*/
public
class
Constant
{
public
static
final
int
DEFAULT_RETRY_TIMES
=
3
;
public
static
final
String
COLUMN_QUOTE_CHARACTER
=
"\""
;
}
adbpgwriter/src/main/java/com/alibaba/datax/plugin/writer/adbpgwriter/util/Key.java
0 → 100644
View file @
8059bba6
package
com
.
alibaba
.
datax
.
plugin
.
writer
.
adbpgwriter
.
util
;
/**
* Created by yuncheng on 07/13/2019.
*/
public
class
Key
{
public
final
static
String
COLUMN
=
"column"
;
public
final
static
String
IS_INSERTINGORE
=
"insertIgnore"
;
public
final
static
String
HOST
=
"host"
;
public
final
static
String
PORT
=
"port"
;
public
final
static
String
DATABASE
=
"database"
;
public
final
static
String
SCHEMA
=
"schema"
;
public
final
static
String
EMPTY_AS_NULL
=
"emptyAsNull"
;
public
final
static
String
IGNORE_INSERT
=
"ignoreInsert"
;
public
final
static
String
RETRY_CONNECTION_TIME
=
"retryTimes"
;
public
final
static
String
RETRY_INTERVAL_TIME
=
"retryIntervalTime"
;
public
final
static
String
COMMIT_SIZE
=
"commitSize"
;
public
final
static
String
PARALLEL_NUMBER
=
"parallelNumber"
;
public
final
static
String
SHARED_DATASOURCE
=
"shareDataSource"
;
}
adbpgwriter/src/main/resources/plugin.json
0 → 100644
View file @
8059bba6
{
"name"
:
"adbpgwriter"
,
"class"
:
"com.alibaba.datax.plugin.writer.adbpgwriter.AdbpgWriter"
,
"description"
:
""
,
"developer"
:
"alibaba"
}
\ No newline at end of file
adbpgwriter/src/main/resources/plugin_job_template.json
0 → 100644
View file @
8059bba6
{
"name"
:
"adbpgwriter"
,
"parameter"
:
{
"username"
:
""
,
"password"
:
""
,
"host"
:
""
,
"port"
:
""
,
"database"
:
""
,
"schema"
:
""
,
"table"
:
""
,
"column"
:
[
"*"
]
}
}
\ No newline at end of file
package.xml
View file @
8059bba6
...
@@ -336,5 +336,12 @@
...
@@ -336,5 +336,12 @@
</includes>
</includes>
<outputDirectory>
datax
</outputDirectory>
<outputDirectory>
datax
</outputDirectory>
</fileSet>
</fileSet>
<fileSet>
<directory>
adbpgwriter/target/datax/
</directory>
<includes>
<include>
**/*.*
</include>
</includes>
<outputDirectory>
datax
</outputDirectory>
</fileSet>
</fileSets>
</fileSets>
</assembly>
</assembly>
pom.xml
View file @
8059bba6
...
@@ -87,7 +87,8 @@
...
@@ -87,7 +87,8 @@
<module>
hbase11xsqlreader
</module>
<module>
hbase11xsqlreader
</module>
<module>
elasticsearchwriter
</module>
<module>
elasticsearchwriter
</module>
<module>
tsdbwriter
</module>
<module>
tsdbwriter
</module>
<module>
adbpgwriter
</module>
<!-- common support module -->
<!-- common support module -->
<module>
plugin-rdbms-util
</module>
<module>
plugin-rdbms-util
</module>
<module>
plugin-unstructured-storage-util
</module>
<module>
plugin-unstructured-storage-util
</module>
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment