Commit 1dbd39e0 authored Sep 02, 2019 by Liu Jianping

add gdbwriter plugin

parent 05a000db

Showing 24 changed files with 2100 additions and 0 deletions (+2100, -0)
gdbwriter/doc/gdbwriter.md  +370 -0
gdbwriter/pom.xml  +103 -0
gdbwriter/src/main/assembly/package.xml  +35 -0
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/GdbWriter.java  +251 -0
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/GdbWriterErrorCode.java  +33 -0
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/Key.java  +141 -0
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/client/GdbGraphManager.java  +39 -0
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/client/GdbWriterConfig.java  +41 -0
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/DefaultGdbMapper.java  +190 -0
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/GdbMapper.java  +17 -0
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/MappingRule.java  +41 -0
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/MappingRuleFactory.java  +181 -0
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/ValueType.java  +71 -0
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/AbstractGdbGraph.java  +151 -0
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbEdge.java  +20 -0
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbElement.java  +20 -0
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbGraph.java  +20 -0
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbVertex.java  +17 -0
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/ScriptGdbGraph.java  +196 -0
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/util/ConfigHelper.java  +59 -0
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/util/GdbDuplicateIdException.java  +23 -0
gdbwriter/src/main/resources/plugin.json  +6 -0
gdbwriter/src/main/resources/plugin_job_template.json  +74 -0
pom.xml  +1 -0
gdbwriter/doc/gdbwriter.md  0 → 100644

# DataX GDBWriter

## 1 Quick Introduction

The GDBWriter plugin writes data into a GDB instance. GDBWriter connects to the remote GDB instance through a `Gremlin Client`, takes the records produced by the Reader, generates write DSL statements, and writes the data into GDB.

## 2 Implementation

GDBWriter receives the protocol records generated by the Reader through the DataX framework and writes them to the GDB instance using statements of the form
`g.addV/E(GDB___label).property(id, GDB___id).property(GDB___PK1, GDB___PV1)...`.

The `Gremlin Client` can be configured to work in session mode, in which the client controls the transaction and a batch of records is written within a single transaction.
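
For a concrete picture, the parameterized scripts generated for one record look like the sketch below (derived from the DSL templates in `ScriptGdbGraph` later in this commit); `GDB___label`, `GDB___id` and the `GDB___PKn`/`GDB___PVn` pairs are bound as request parameters rather than spliced into the script text:

```
// vertex with two properties
g.addV(GDB___label).property(id, GDB___id).property(GDB___PK0, GDB___PV0).property(GDB___PK1, GDB___PV1)

// edge with one property
g.addE(GDB___label).property(id, GDB___id).from(V(GDB___from)).to(V(GDB___to)).property(GDB___PK0, GDB___PV0)
```

In session mode, each batch is additionally wrapped between `g.tx().open()` and `g.tx().commit()`, with `g.tx().rollback()` on failure.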
## 3 Features

Vertices and edges are configured differently in GDB, so the import configuration has to distinguish between the two.

### 3.1 Vertex configuration example

* Below is a configuration that generates vertex data in memory and imports it into a GDB instance:
```json
{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "column": [
                            {
                                "random": "1,100",
                                "type": "double"
                            },
                            {
                                "random": "1000,1200",
                                "type": "long"
                            },
                            {
                                "random": "60,64",
                                "type": "string"
                            }
                        ],
                        "sliceRecordCount": 1000
                    }
                },
                "writer": {
                    "name": "gdbwriter",
                    "parameter": {
                        "host": "gdb-endpoint",
                        "port": 8182,
                        "username": "root",
                        "password": "***",
                        "writeMode": "INSERT",
                        "labelType": "VERTEX",
                        "label": "${1}",
                        "idTransRule": "none",
                        "session": true,
                        "maxRecordsInBatch": 64,
                        "column": [
                            {
                                "name": "id",
                                "value": "${0}",
                                "type": "string",
                                "columnType": "primaryKey"
                            },
                            {
                                "name": "vertex_propKey",
                                "value": "${2}",
                                "type": "string",
                                "columnType": "vertexProperty"
                            }
                        ]
                    }
                }
            }
        ]
    }
}
```
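
Saved to a job file (the name `gdb2vertex.json` below is only an example), the job is launched with DataX's standard entry script; the exact path depends on where DataX is installed:

```
python {DATAX_HOME}/bin/datax.py gdb2vertex.json
```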
### 3.2 Edge configuration example

* Below is a configuration that generates edge data in memory and imports it into a GDB instance:

> **Note**
> The configuration below imports edges and requires the endpoint vertices to be written to the GDB instance in advance: vertices with ids `person-{{i}}` and `book-{{i}}` must exist, with i ranging over 0~100.
```json
{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "column": [
                            {
                                "random": "100,200",
                                "type": "double"
                            },
                            {
                                "random": "1,100",
                                "type": "long"
                            },
                            {
                                "random": "1,100",
                                "type": "long"
                            },
                            {
                                "random": "2000,2200",
                                "type": "long"
                            },
                            {
                                "random": "60,64",
                                "type": "string"
                            }
                        ],
                        "sliceRecordCount": 1000
                    }
                },
                "writer": {
                    "name": "gdbwriter",
                    "parameter": {
                        "host": "gdb-endpoint",
                        "port": 8182,
                        "username": "root",
                        "password": "***",
                        "writeMode": "INSERT",
                        "labelType": "EDGE",
                        "label": "${3}",
                        "idTransRule": "none",
                        "srcIdTransRule": "labelPrefix",
                        "dstIdTransRule": "labelPrefix",
                        "srcLabel": "person-",
                        "dstLabel": "book-",
                        "session": false,
                        "column": [
                            {
                                "name": "id",
                                "value": "${0}",
                                "type": "string",
                                "columnType": "primaryKey"
                            },
                            {
                                "name": "id",
                                "value": "${1}",
                                "type": "string",
                                "columnType": "srcPrimaryKey"
                            },
                            {
                                "name": "id",
                                "value": "${2}",
                                "type": "string",
                                "columnType": "dstPrimaryKey"
                            },
                            {
                                "name": "edge_propKey",
                                "value": "${4}",
                                "type": "string",
                                "columnType": "edgeProperty"
                            }
                        ]
                    }
                }
            }
        ]
    }
}
```
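
One way to satisfy the precondition in the note above is to submit plain Gremlin statements before running the job (a sketch only; GDB accepts a caller-supplied id via `property(id, ...)`, as the writer's own DSL templates do):

```
// repeat for i = 0..100
g.addV('person').property(id, 'person-0')
g.addV('book').property(id, 'book-0')
```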
### 3.3 Parameter description

* **host**
    * Description: connection endpoint of the GDB instance, i.e. the "internal network address" shown in the Alibaba Cloud console under "Graph Database GDB" -> "Instance Management" -> "Basic Information";
    * Required: yes
    * Default: none
* **port**
    * Description: connection port of the GDB instance
    * Required: yes
    * Default: 8182
* **username**
    * Description: account name of the GDB instance
    * Required: yes
    * Default: none
* **password**
    * Description: password of that account
    * Required: yes
    * Default: none
* **label**
    * Description: type name, i.e. the vertex/edge label; label can be read from a source column, e.g. ${0} takes the first source column as the label. Source column indexes start at 0;
    * Required: yes
    * Default: none
* **labelType**
    * Description: type of the label;
        * "VERTEX" means vertex
        * "EDGE" means edge
    * Required: yes
    * Default: none
* **srcLabel**
    * Description: when the label is an edge, the label of its source vertex; srcLabel can be read from a source column, e.g. ${0} takes the first source column as the label. Source column indexes start at 0;
    * Required: may be omitted when labelType is EDGE and srcIdTransRule is "none"; otherwise required;
    * Default: none
* **dstLabel**
    * Description: when the label is an edge, the label of its destination vertex; dstLabel can be read from a source column, e.g. ${0} takes the first source column as the label. Source column indexes start at 0;
    * Required: may be omitted when labelType is EDGE and dstIdTransRule is "none"; otherwise required;
    * Default: none
* **writeMode**
    * Description: how a duplicate id is handled during import;
        * "INSERT" reports an error and increments the error record count;
        * "MERGE" updates the property values and is not counted as an error;
        * "SKIP" skips the record and is not counted as an error
    * Required: yes
    * Default: INSERT
* **idTransRule**
    * Description: transformation rule for the primary-key id;
        * "labelPrefix" turns the mapped value into {label}{source field}
        * "none" leaves the mapped value unchanged
    * Required: yes
    * Default: "none"
* **srcIdTransRule**
    * Description: when the label is an edge, the transformation rule for the source vertex's primary-key id;
        * "labelPrefix" turns the mapped value into {label}{source field}
        * "none" leaves the mapped value unchanged; in this case srcLabel may be omitted
    * Required: required when the label is an edge
    * Default: "none"
* **dstIdTransRule**
    * Description: when the label is an edge, the transformation rule for the destination vertex's primary-key id;
        * "labelPrefix" turns the mapped value into {label}{source field}
        * "none" leaves the mapped value unchanged; in this case dstLabel may be omitted
    * Required: required when the label is an edge
    * Default: "none"
* **session**
    * Description: whether to write data in the `Gremlin Client` session mode
    * Required: no
    * Default: false
* **maxRecordsInBatch**
    * Description: in `Gremlin Client` session mode, the number of records handled in one transaction
    * Required: no
    * Default: 16
* **column**
    * Description: field mapping configuration for vertices/edges
    * Required: yes
    * Default: none
* **column -> name**
    * Description: field name of the vertex/edge mapping
    * Required: yes
    * Default: none
* **column -> value**
    * Description: field value of the vertex/edge mapping (see the worked example after this list);
        * ${N} maps the source value directly; N is the source column index, starting at 0, so ${0} maps the first source column;
        * test-${0} concatenates a fixed string with the source value; fixed text may be added before or after ${0};
        * ${0}-${1} concatenates multiple fields, and fixed strings may be inserted at any position, e.g. test-${0}-test1-${1}-test2
    * Required: yes
    * Default: none
* **column -> type**
    * Description: type of the mapped field value;
        * the primary-key id only supports the string type; the GDBWriter plugin casts it forcibly, so the source id must be convertible to string;
        * ordinary properties support: int, long, float, double, boolean, string
    * Required: yes
    * Default: none
* **column -> columnType**
    * Description: role of the mapped field in the GDB vertex/edge data; the following enum values are supported:
        * common values:
            * primaryKey: the field is the primary-key id
        * vertex values:
            * vertexProperty: when labelType is VERTEX, the field is an ordinary vertex property
            * vertexJsonProperty: when labelType is VERTEX, the field is a vertex json property; see the **json properties example** in the remarks for the value structure; at most one json property is allowed in a vertex configuration;
        * edge values:
            * srcPrimaryKey: when labelType is EDGE, the field is the source vertex's primary-key id
            * dstPrimaryKey: when labelType is EDGE, the field is the destination vertex's primary-key id
            * edgeProperty: when labelType is EDGE, the field is an ordinary edge property
            * edgeJsonProperty: when labelType is EDGE, the field is an edge json property; see the **json properties example** in the remarks; at most one json property is allowed in an edge configuration;
    * Required: yes
    * Default: none
* Remarks: **json properties example**
> ```json
> {"properties":[
>  {"k":"name","t":"string","v":"tom"},
>  {"k":"age","t":"int","v":"20"},
>  {"k":"sex","t":"string","v":"male"}
> ]}
> ```
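
For illustration (a hypothetical source record whose column 0 is `p-100` and column 1 is `Tom`), the `column -> value` templates above expand as follows:

```
${0}                       -> p-100
test-${0}                  -> test-p-100
test-${0}-test1-${1}-test2 -> test-p-100-test1-Tom-test2
```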
## 4 Performance Report

### 4.1 Environment

GDB instance specification
- 16core 128GB, 1TB SSD

DataX load-test machine
- cpu: 4 * Intel(R) Xeon(R) Platinum 8163 CPU @ 2.50GHz
- mem: 16GB
- net: dual gigabit NICs
- os: CentOS 7, 3.10.0-957.5.1.el7.x86_64
- jvm: -Xms4g -Xmx4g

### 4.2 Data characteristics

```
{
    id: random double(1~10000)
    from: random long(1~40000000)
    to: random long(1~40000000)
    label: random long(20000000 ~ 20005000)
    propertyKey: random string(len: 120~128)
    propertyName: random string(len: 120~128)
}
```

- Each vertex/edge has one property; both the property key and value are random strings of 120~128 bytes
- label is a random integer in 20000000 ~ 20005000 converted to a string
- id is a floating-point number converted to a string, to avoid duplicates
- Each edge references its source and destination vertices; for the edge test the vertex data of the twitter dataset (42 million vertices) was imported beforehand

### 4.3 Task configuration

Vertex and edge jobs are configured separately; both resemble the example configurations above. The key differences are:

- Increase the number of concurrent channels
> "channel": 32
- Use session mode
> "session": true
- Increase the number of records per transaction batch
> "maxRecordsInBatch": 128

### 4.4 Test results

Vertex import performance:
- average task throughput: 4.07MB/s
- total task time: 412s
- record write rate: 15609rec/s
- total records read: 6400000

Edge import performance:
- average task throughput: 2.76MB/s
- total task time: 1602s
- record write rate: 10000rec/s
- total records read: 16000000

## 5 Constraints

- The source and destination vertices of an edge must already exist in GDB before the edge record is imported
- The GDBWriter plugin uses the same GDB instance port as user query DSL, so an import may affect query performance

## FAQ

None
gdbwriter/pom.xml
0 → 100644
View file @
1dbd39e0
<?xml version="1.0" encoding="UTF-8"?>
<project
xmlns=
"http://maven.apache.org/POM/4.0.0"
xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation=
"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
>
<modelVersion>
4.0.0
</modelVersion>
<parent>
<artifactId>
datax-all
</artifactId>
<groupId>
com.alibaba.datax
</groupId>
<version>
0.0.1-SNAPSHOT
</version>
</parent>
<artifactId>
gdbwriter
</artifactId>
<name>
gdbwriter
</name>
<packaging>
jar
</packaging>
<properties>
<maven.compiler.target>
1.8
</maven.compiler.target>
<maven.compiler.source>
1.8
</maven.compiler.source>
<gremlin.version>
3.4.1
</gremlin.version>
</properties>
<dependencies>
<dependency>
<groupId>
com.alibaba.datax
</groupId>
<artifactId>
datax-common
</artifactId>
<version>
${datax-project-version}
</version>
<exclusions>
<exclusion>
<artifactId>
slf4j-log4j12
</artifactId>
<groupId>
org.slf4j
</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>
com.alibaba.datax
</groupId>
<artifactId>
datax-core
</artifactId>
<version>
${datax-project-version}
</version>
<scope>
test
</scope>
<exclusions>
<exclusion>
<artifactId>
slf4j-log4j12
</artifactId>
<groupId>
org.slf4j
</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>
org.slf4j
</groupId>
<artifactId>
slf4j-api
</artifactId>
</dependency>
<dependency>
<groupId>
ch.qos.logback
</groupId>
<artifactId>
logback-classic
</artifactId>
</dependency>
<dependency>
<groupId>
org.apache.tinkerpop
</groupId>
<artifactId>
gremlin-driver
</artifactId>
<version>
${gremlin.version}
</version>
</dependency>
<dependency>
<groupId>
org.projectlombok
</groupId>
<artifactId>
lombok
</artifactId>
<version>
1.18.8
</version>
</dependency>
<dependency>
<groupId>
com.github.ben-manes.caffeine
</groupId>
<artifactId>
caffeine
</artifactId>
<version>
2.4.0
</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>
maven-compiler-plugin
</artifactId>
<configuration>
<source>
1.8
</source>
<target>
1.8
</target>
<encoding>
${project-sourceEncoding}
</encoding>
</configuration>
</plugin>
<!-- assembly plugin -->
<plugin>
<artifactId>
maven-assembly-plugin
</artifactId>
<configuration>
<descriptors>
<descriptor>
src/main/assembly/package.xml
</descriptor>
</descriptors>
<finalName>
datax
</finalName>
</configuration>
<executions>
<execution>
<id>
dwzip
</id>
<phase>
package
</phase>
<goals>
<goal>
single
</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
gdbwriter/src/main/assembly/package.xml  0 → 100644

<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
    <id></id>
    <formats>
        <format>dir</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>
    <fileSets>
        <fileSet>
            <directory>src/main/resources</directory>
            <includes>
                <include>plugin.json</include>
                <include>plugin_job_template.json</include>
            </includes>
            <outputDirectory>plugin/writer/gdbwriter</outputDirectory>
        </fileSet>
        <fileSet>
            <directory>target/</directory>
            <includes>
                <include>gdbwriter-0.0.1-SNAPSHOT.jar</include>
            </includes>
            <outputDirectory>plugin/writer/gdbwriter</outputDirectory>
        </fileSet>
    </fileSets>
    <dependencySets>
        <dependencySet>
            <useProjectArtifact>false</useProjectArtifact>
            <outputDirectory>plugin/writer/gdbwriter/libs</outputDirectory>
            <scope>runtime</scope>
        </dependencySet>
    </dependencySets>
</assembly>
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/GdbWriter.java  0 → 100644

package com.alibaba.datax.plugin.writer.gdbwriter;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.Function;

import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.gdbwriter.client.GdbGraphManager;
import com.alibaba.datax.plugin.writer.gdbwriter.client.GdbWriterConfig;
import com.alibaba.datax.plugin.writer.gdbwriter.mapping.DefaultGdbMapper;
import com.alibaba.datax.plugin.writer.gdbwriter.mapping.MappingRule;
import com.alibaba.datax.plugin.writer.gdbwriter.mapping.MappingRuleFactory;
import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbElement;
import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbGraph;

import groovy.lang.Tuple2;
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GdbWriter extends Writer {
    private static final Logger log = LoggerFactory.getLogger(GdbWriter.class);

    private static Function<Record, GdbElement> mapper = null;
    private static GdbGraph globalGraph = null;
    private static boolean session = false;

    /**
     * Job methods run exactly once; Task methods are run by the framework
     * in multiple parallel Task threads.
     * <p/>
     * The overall Writer flow is:
     * <pre>
     * Job: init --> prepare --> split
     *
     * Task: init --> prepare --> startWrite --> post --> destroy
     * Task: init --> prepare --> startWrite --> post --> destroy
     *
     * Job: post --> destroy
     * </pre>
     */
    public static class Job extends Writer.Job {
        private static final Logger LOG = LoggerFactory.getLogger(Job.class);

        private Configuration jobConfig = null;

        @Override
        public void init() {
            LOG.info("GDB datax plugin writer job init begin ...");
            this.jobConfig = getPluginJobConf();
            GdbWriterConfig.of(this.jobConfig);
            LOG.info("GDB datax plugin writer job init end.");

            /**
             * Note: this method runs only once.
             * Best practice: validate the user configuration here (missing required
             * items? invalid values? irrelevant keys?) and report clear errors or
             * warnings. Validation is usually delegated to a static helper class
             * to keep this class readable.
             */
        }

        @Override
        public void prepare() {
            /**
             * Note: this method runs only once.
             * Best practice: do any Job-level work needed before the data sync here;
             * remove this method if nothing is needed.
             */
            super.prepare();

            MappingRule rule = MappingRuleFactory.getInstance().createV2(jobConfig);

            mapper = new DefaultGdbMapper().getMapper(rule);
            session = jobConfig.getBool(Key.SESSION_STATE, false);

            /**
             * client connect check before task
             */
            try {
                globalGraph = GdbGraphManager.instance().getGraph(jobConfig, false);
            } catch (RuntimeException e) {
                throw DataXException.asDataXException(GdbWriterErrorCode.FAIL_CLIENT_CONNECT, e.getMessage());
            }
        }

        @Override
        public List<Configuration> split(int mandatoryNumber) {
            /**
             * Note: this method runs only once.
             * Best practice: use a static helper class to split the Job configuration
             * into multiple Task configurations. mandatoryNumber is the number of
             * splits that must be produced.
             */
            LOG.info("split begin...");
            List<Configuration> configurationList = new ArrayList<Configuration>();
            for (int i = 0; i < mandatoryNumber; i++) {
                configurationList.add(this.jobConfig.clone());
            }
            LOG.info("split end...");
            return configurationList;
        }

        @Override
        public void post() {
            /**
             * Note: this method runs only once.
             * Best practice: do any Job-level post-processing after the data sync here.
             */
            globalGraph.close();
        }

        @Override
        public void destroy() {
            /**
             * Note: this method runs only once.
             * Best practice: release Job resources here, together with post().
             */
        }
    }

    @Slf4j
    public static class Task extends Writer.Task {
        private Configuration taskConfig;

        private int failed = 0;
        private int batchRecords;
        private ExecutorService submitService = null;
        private GdbGraph graph;

        @Override
        public void init() {
            /**
             * Note: this method runs once per Task.
             * Best practice: read taskConfig here and initialize the resources
             * that startWrite() needs.
             */
            this.taskConfig = super.getPluginJobConf();
            batchRecords = taskConfig.getInt(Key.MAX_RECORDS_IN_BATCH, GdbWriterConfig.DEFAULT_RECORD_NUM_IN_BATCH);
            submitService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingDeque<>(), new DefaultThreadFactory("submit-dsl"));

            if (!session) {
                graph = globalGraph;
            } else {
                /**
                 * Create session clients in staggered batches, to work around the
                 * groovy compilation cost on the server side.
                 */
                try {
                    Thread.sleep((getTaskId() / 10) * 10000);
                } catch (Exception e) {
                    // ...
                }
                graph = GdbGraphManager.instance().getGraph(taskConfig, session);
            }
        }

        @Override
        public void prepare() {
            /**
             * Note: this method runs once per Task.
             * Best practice: do any Task-level work needed before the data sync here;
             * remove this method if nothing is needed.
             */
            super.prepare();
        }

        @Override
        public void startWrite(RecordReceiver recordReceiver) {
            /**
             * Note: this method runs once per Task.
             * Best practice: keep the write logic wrapped so it stays small and clear.
             */
            Record r;
            Future<Boolean> future = null;
            List<Tuple2<Record, GdbElement>> records = new ArrayList<>(batchRecords);

            while ((r = recordReceiver.getFromReader()) != null) {
                records.add(new Tuple2<>(r, mapper.apply(r)));

                if (records.size() >= batchRecords) {
                    wait4Submit(future);

                    final List<Tuple2<Record, GdbElement>> batch = records;
                    future = submitService.submit(() -> batchCommitRecords(batch));
                    records = new ArrayList<>(batchRecords);
                }
            }

            wait4Submit(future);
            if (!records.isEmpty()) {
                final List<Tuple2<Record, GdbElement>> batch = records;
                future = submitService.submit(() -> batchCommitRecords(batch));
                wait4Submit(future);
            }
        }

        private void wait4Submit(Future<Boolean> future) {
            if (future == null) {
                return;
            }

            try {
                future.get();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        private boolean batchCommitRecords(final List<Tuple2<Record, GdbElement>> records) {
            TaskPluginCollector collector = getTaskPluginCollector();
            try {
                List<Tuple2<Record, Exception>> errors = graph.add(records);
                errors.forEach(t -> collector.collectDirtyRecord(t.getFirst(), t.getSecond()));
                failed += errors.size();
            } catch (Exception e) {
                records.forEach(t -> collector.collectDirtyRecord(t.getFirst(), e));
                failed += records.size();
            }

            records.clear();
            return true;
        }

        @Override
        public void post() {
            /**
             * Note: this method runs once per Task.
             * Best practice: do any Task-level post-processing after the data sync here.
             */
            log.info("Task done, dirty record count - {}", failed);
        }

        @Override
        public void destroy() {
            /**
             * Note: this method runs once per Task.
             * Best practice: release Task resources here, together with post().
             */
            if (session) {
                graph.close();
            }
            submitService.shutdown();
        }
    }
}
\ No newline at end of file
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/GdbWriterErrorCode.java  0 → 100644

package com.alibaba.datax.plugin.writer.gdbwriter;

import com.alibaba.datax.common.spi.ErrorCode;

public enum GdbWriterErrorCode implements ErrorCode {
    BAD_CONFIG_VALUE("GdbWriter-00", "The configured value is invalid."),
    CONFIG_ITEM_MISS("GdbWriter-01", "A configuration item is missing."),
    FAIL_CLIENT_CONNECT("GdbWriter-02", "Failed to connect to GDB."),;

    private final String code;
    private final String description;

    private GdbWriterErrorCode(String code, String description) {
        this.code = code;
        this.description = description;
    }

    @Override
    public String getCode() {
        return this.code;
    }

    @Override
    public String getDescription() {
        return this.description;
    }

    @Override
    public String toString() {
        return String.format("Code:[%s], Description:[%s]. ", this.code, this.description);
    }
}
\ No newline at end of file
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/Key.java  0 → 100644

package com.alibaba.datax.plugin.writer.gdbwriter;

public final class Key {
    /**
     * Configuration items that the plugin user must provide.
     */
    public final static String HOST = "host";
    public final static String PORT = "port";
    public final static String USERNAME = "username";
    public static final String PASSWORD = "password";

    /**
     * import type and mode
     */
    public static final String IMPORT_TYPE = "labelType";
    public static final String UPDATE_MODE = "writeMode";

    /**
     * label prefix issue
     */
    public static final String ID_TRANS_RULE = "idTransRule";
    public static final String SRC_ID_TRANS_RULE = "srcIdTransRule";
    public static final String DST_ID_TRANS_RULE = "dstIdTransRule";

    public static final String LABEL = "label";
    public static final String SRC_LABEL = "srcLabel";
    public static final String DST_LABEL = "dstLabel";

    public static final String MAPPING = "mapping";

    /**
     * column define in Gdb
     */
    public static final String COLUMN = "column";
    public static final String COLUMN_NAME = "name";
    public static final String COLUMN_VALUE = "value";
    public static final String COLUMN_TYPE = "type";
    public static final String COLUMN_NODE_TYPE = "columnType";

    /**
     * Gdb Vertex/Edge elements
     */
    public static final String ID = "id";
    public static final String FROM = "from";
    public static final String TO = "to";
    public static final String PROPERTIES = "properties";
    public static final String PROP_KEY = "name";
    public static final String PROP_VALUE = "value";
    public static final String PROP_TYPE = "type";

    public static final String PROPERTIES_JSON_STR = "propertiesJsonStr";
    public static final String MAX_PROPERTIES_BATCH_NUM = "maxPropertiesBatchNumber";

    /**
     * session less client configure for connect pool
     */
    public static final String MAX_IN_PROCESS_PER_CONNECTION = "maxInProcessPerConnection";
    public static final String MAX_CONNECTION_POOL_SIZE = "maxConnectionPoolSize";
    public static final String MAX_SIMULTANEOUS_USAGE_PER_CONNECTION = "maxSimultaneousUsagePerConnection";

    public static final String MAX_RECORDS_IN_BATCH = "maxRecordsInBatch";
    public static final String SESSION_STATE = "session";

    public static enum ImportType {
        /**
         * Import vertices
         */
        VERTEX,
        /**
         * Import edges
         */
        EDGE;
    }

    public static enum UpdateMode {
        /**
         * Insert new records, fail if exists
         */
        INSERT,
        /**
         * Skip this record if exists
         */
        SKIP,
        /**
         * Update property of this record if exists
         */
        MERGE;
    }

    public static enum ColumnType {
        /**
         * vertex or edge id
         */
        primaryKey,
        /**
         * vertex property
         */
        vertexProperty,
        /**
         * start vertex id of edge
         */
        srcPrimaryKey,
        /**
         * end vertex id of edge
         */
        dstPrimaryKey,
        /**
         * edge property
         */
        edgeProperty,
        /**
         * vertex json style property
         */
        vertexJsonProperty,
        /**
         * edge json style property
         */
        edgeJsonProperty
    }

    public static enum IdTransRule {
        /**
         * vertex or edge id with 'label' prefix
         */
        labelPrefix,
        /**
         * vertex or edge id raw
         */
        none
    }
}
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/client/GdbGraphManager.java  0 → 100644

/**
 *
 */
package com.alibaba.datax.plugin.writer.gdbwriter.client;

import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbGraph;
import com.alibaba.datax.plugin.writer.gdbwriter.model.ScriptGdbGraph;

import java.util.ArrayList;
import java.util.List;

/**
 * @author jerrywang
 *
 */
public class GdbGraphManager implements AutoCloseable {
    private static final GdbGraphManager instance = new GdbGraphManager();

    private List<GdbGraph> graphs = new ArrayList<>();

    public static GdbGraphManager instance() {
        return instance;
    }

    public GdbGraph getGraph(Configuration config, boolean session) {
        GdbGraph graph = new ScriptGdbGraph(config, session);
        graphs.add(graph);
        return graph;
    }

    @Override
    public void close() {
        for (GdbGraph graph : graphs) {
            graph.close();
        }
        graphs.clear();
    }
}
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/client/GdbWriterConfig.java  0 → 100644

/**
 *
 */
package com.alibaba.datax.plugin.writer.gdbwriter.client;

import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.gdbwriter.Key;

import static com.alibaba.datax.plugin.writer.gdbwriter.util.ConfigHelper.*;

/**
 * @author jerrywang
 *
 */
public class GdbWriterConfig {
    public static final int DEFAULT_MAX_IN_PROCESS_PER_CONNECTION = 4;
    public static final int DEFAULT_MAX_CONNECTION_POOL_SIZE = 8;
    public static final int DEFAULT_MAX_SIMULTANEOUS_USAGE_PER_CONNECTION = 8;
    public static final int DEFAULT_BATCH_PROPERTY_NUM = 30;
    public static final int DEFAULT_RECORD_NUM_IN_BATCH = 16;

    private Configuration config;

    private GdbWriterConfig(Configuration config) {
        this.config = config;

        validate();
    }

    private void validate() {
        assertHasContent(config, Key.HOST);
        assertConfig(Key.PORT, () -> config.getInt(Key.PORT) > 0);

        assertHasContent(config, Key.USERNAME);
        assertHasContent(config, Key.PASSWORD);
    }

    public static GdbWriterConfig of(Configuration config) {
        return new GdbWriterConfig(config);
    }
}
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/DefaultGdbMapper.java  0 → 100644

/**
 *
 */
package com.alibaba.datax.plugin.writer.gdbwriter.mapping;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.alibaba.datax.common.element.Record;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.datax.plugin.writer.gdbwriter.Key;
import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbEdge;
import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbElement;
import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbVertex;
import lombok.extern.slf4j.Slf4j;

import static com.alibaba.datax.plugin.writer.gdbwriter.Key.ImportType.VERTEX;

/**
 * @author jerrywang
 *
 */
@Slf4j
public class DefaultGdbMapper implements GdbMapper {
    private static final Pattern STR_PATTERN = Pattern.compile("\\$\\{(\\d+)}");
    private static final Pattern NORMAL_PATTERN = Pattern.compile("^\\$\\{(\\d+)}$");

    @Override
    public Function<Record, GdbElement> getMapper(MappingRule rule) {
        return r -> {
            GdbElement e = (rule.getImportType() == VERTEX) ? new GdbVertex() : new GdbEdge();
            forElement(rule).accept(r, e);
            return e;
        };
    }

    private static BiConsumer<Record, GdbElement> forElement(MappingRule rule) {
        List<BiConsumer<Record, GdbElement>> properties = new ArrayList<>();
        for (MappingRule.PropertyMappingRule propRule : rule.getProperties()) {
            Function<Record, String> keyFunc = forStrColumn(propRule.getKey());

            if (propRule.getValueType() == ValueType.STRING) {
                final Function<Record, String> valueFunc = forStrColumn(propRule.getValue());
                properties.add((r, e) -> {
                    String k = keyFunc.apply(r);
                    String v = valueFunc.apply(r);
                    if (k != null && v != null) {
                        e.getProperties().put(k, v);
                    }
                });
            } else {
                final Function<Record, Object> valueFunc = forObjColumn(propRule.getValue(), propRule.getValueType());
                properties.add((r, e) -> {
                    String k = keyFunc.apply(r);
                    Object v = valueFunc.apply(r);
                    if (k != null && v != null) {
                        e.getProperties().put(k, v);
                    }
                });
            }
        }

        if (rule.getPropertiesJsonStr() != null) {
            Function<Record, String> jsonFunc = forStrColumn(rule.getPropertiesJsonStr());
            properties.add((r, e) -> {
                String propertiesStr = jsonFunc.apply(r);
                JSONObject root = (JSONObject) JSONObject.parse(propertiesStr);
                JSONArray propertiesList = root.getJSONArray("properties");

                for (Object object : propertiesList) {
                    JSONObject jsonObject = (JSONObject) object;
                    String key = jsonObject.getString("k");
                    String name = jsonObject.getString("v");
                    String type = jsonObject.getString("t");

                    if (key == null || name == null) {
                        continue;
                    }
                    addToProperties(e, key, name, type);
                }
            });
        }

        BiConsumer<Record, GdbElement> ret = (r, e) -> {
            String label = forStrColumn(rule.getLabel()).apply(r);
            String id = forStrColumn(rule.getId()).apply(r);

            if (rule.getImportType() == Key.ImportType.EDGE) {
                String to = forStrColumn(rule.getTo()).apply(r);
                String from = forStrColumn(rule.getFrom()).apply(r);
                if (to == null || from == null) {
                    log.error("invalid record to: {} , from: {}", to, from);
                    throw new IllegalArgumentException("to or from missed in edge");
                }
                ((GdbEdge) e).setTo(to);
                ((GdbEdge) e).setFrom(from);

                // generate UUID for edge
                if (id == null) {
                    id = UUID.randomUUID().toString();
                }
            }

            if (id == null || label == null) {
                log.error("invalid record id: {} , label: {}", id, label);
                throw new IllegalArgumentException("id or label missed");
            }

            e.setId(id);
            e.setLabel(label);

            properties.forEach(p -> p.accept(r, e));
        };

        return ret;
    }

    static Function<Record, Object> forObjColumn(String rule, ValueType type) {
        Matcher m = NORMAL_PATTERN.matcher(rule);
        if (m.matches()) {
            int index = Integer.valueOf(m.group(1));
            return r -> type.applyColumn(r.getColumn(index));
        } else {
            return r -> type.fromStrFunc(rule);
        }
    }

    static Function<Record, String> forStrColumn(String rule) {
        List<BiConsumer<StringBuilder, Record>> list = new ArrayList<>();
        Matcher m = STR_PATTERN.matcher(rule);
        int last = 0;
        while (m.find()) {
            String index = m.group(1);
            // as simple integer index.
            int i = Integer.parseInt(index);

            final int tmp = last;
            final int start = m.start();
            list.add((sb, record) -> {
                sb.append(rule.subSequence(tmp, start));
                if (record.getColumn(i) != null && record.getColumn(i).getByteSize() > 0) {
                    sb.append(record.getColumn(i).asString());
                }
            });

            last = m.end();
        }

        final int tmp = last;
        list.add((sb, record) -> {
            sb.append(rule.subSequence(tmp, rule.length()));
        });

        return r -> {
            StringBuilder sb = new StringBuilder();
            list.forEach(c -> c.accept(sb, r));

            String res = sb.toString();
            return res.isEmpty() ? null : res;
        };
    }

    static boolean addToProperties(GdbElement e, String key, String value, String type) {
        ValueType valueType = ValueType.fromShortName(type);

        if (valueType == ValueType.STRING) {
            e.getProperties().put(key, value);
        } else if (valueType == ValueType.INT) {
            e.getProperties().put(key, Integer.valueOf(value));
        } else if (valueType == ValueType.LONG) {
            e.getProperties().put(key, Long.valueOf(value));
        } else if (valueType == ValueType.DOUBLE) {
            e.getProperties().put(key, Double.valueOf(value));
        } else if (valueType == ValueType.FLOAT) {
            e.getProperties().put(key, Float.valueOf(value));
        } else if (valueType == ValueType.BOOLEAN) {
            e.getProperties().put(key, Boolean.valueOf(value));
        } else {
            log.error("invalid property key {}, value {}, type {}", key, value, type);
            return false;
        }

        return true;
    }
}
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/GdbMapper.java  0 → 100644

/**
 *
 */
package com.alibaba.datax.plugin.writer.gdbwriter.mapping;

import java.util.function.Function;

import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbElement;

/**
 * @author jerrywang
 *
 */
public interface GdbMapper {
    Function<Record, GdbElement> getMapper(MappingRule rule);
}
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/MappingRule.java  0 → 100644

/**
 *
 */
package com.alibaba.datax.plugin.writer.gdbwriter.mapping;

import java.util.ArrayList;
import java.util.List;

import com.alibaba.datax.plugin.writer.gdbwriter.Key.ImportType;
import lombok.Data;

/**
 * @author jerrywang
 *
 */
@Data
public class MappingRule {
    private String id = null;
    private String label = null;
    private ImportType importType = null;
    private String from = null;
    private String to = null;
    private List<PropertyMappingRule> properties = new ArrayList<>();
    private String propertiesJsonStr = null;

    @Data
    public static class PropertyMappingRule {
        private String key = null;
        private String value = null;
        private ValueType valueType = null;
    }
}
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/MappingRuleFactory.java  0 → 100644

/**
 *
 */
package com.alibaba.datax.plugin.writer.gdbwriter.mapping;

import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.gdbwriter.GdbWriterErrorCode;
import com.alibaba.datax.plugin.writer.gdbwriter.Key;
import com.alibaba.datax.plugin.writer.gdbwriter.Key.ImportType;
import com.alibaba.datax.plugin.writer.gdbwriter.Key.IdTransRule;
import com.alibaba.datax.plugin.writer.gdbwriter.Key.ColumnType;
import com.alibaba.datax.plugin.writer.gdbwriter.mapping.MappingRule.PropertyMappingRule;
import com.alibaba.datax.plugin.writer.gdbwriter.util.ConfigHelper;
import lombok.extern.slf4j.Slf4j;

import java.util.List;

/**
 * @author jerrywang
 *
 */
@Slf4j
public class MappingRuleFactory {
    private static final MappingRuleFactory instance = new MappingRuleFactory();

    public static final MappingRuleFactory getInstance() {
        return instance;
    }

    @Deprecated
    public MappingRule create(Configuration config, ImportType type) {
        MappingRule rule = new MappingRule();
        rule.setId(config.getString(Key.ID));
        rule.setLabel(config.getString(Key.LABEL));
        if (type == ImportType.EDGE) {
            rule.setFrom(config.getString(Key.FROM));
            rule.setTo(config.getString(Key.TO));
        }

        rule.setImportType(type);

        List<Configuration> configurations = config.getListConfiguration(Key.PROPERTIES);
        if (configurations != null) {
            for (Configuration prop : config.getListConfiguration(Key.PROPERTIES)) {
                PropertyMappingRule propRule = new PropertyMappingRule();
                propRule.setKey(prop.getString(Key.PROP_KEY));
                propRule.setValue(prop.getString(Key.PROP_VALUE));
                propRule.setValueType(ValueType.fromShortName(prop.getString(Key.PROP_TYPE).toLowerCase()));
                rule.getProperties().add(propRule);
            }
        }

        String propertiesJsonStr = config.getString(Key.PROPERTIES_JSON_STR, null);
        if (propertiesJsonStr != null) {
            rule.setPropertiesJsonStr(propertiesJsonStr);
        }

        return rule;
    }

    public MappingRule createV2(Configuration config) {
        try {
            ImportType type = ImportType.valueOf(config.getString(Key.IMPORT_TYPE));
            return createV2(config, type);
        } catch (NullPointerException e) {
            throw DataXException.asDataXException(GdbWriterErrorCode.CONFIG_ITEM_MISS, Key.IMPORT_TYPE);
        } catch (IllegalArgumentException e) {
            throw DataXException.asDataXException(GdbWriterErrorCode.BAD_CONFIG_VALUE, Key.IMPORT_TYPE);
        }
    }

    public MappingRule createV2(Configuration config, ImportType type) {
        MappingRule rule = new MappingRule();

        ConfigHelper.assertHasContent(config, Key.LABEL);
        rule.setLabel(config.getString(Key.LABEL));
        rule.setImportType(type);

        IdTransRule srcTransRule = IdTransRule.none;
        IdTransRule dstTransRule = IdTransRule.none;
        if (type == ImportType.EDGE) {
            ConfigHelper.assertHasContent(config, Key.SRC_ID_TRANS_RULE);
            ConfigHelper.assertHasContent(config, Key.DST_ID_TRANS_RULE);

            srcTransRule = IdTransRule.valueOf(config.getString(Key.SRC_ID_TRANS_RULE));
            dstTransRule = IdTransRule.valueOf(config.getString(Key.DST_ID_TRANS_RULE));

            if (srcTransRule == IdTransRule.labelPrefix) {
                ConfigHelper.assertHasContent(config, Key.SRC_LABEL);
            }

            if (dstTransRule == IdTransRule.labelPrefix) {
                ConfigHelper.assertHasContent(config, Key.DST_LABEL);
            }
        }
        ConfigHelper.assertHasContent(config, Key.ID_TRANS_RULE);
        IdTransRule transRule = IdTransRule.valueOf(config.getString(Key.ID_TRANS_RULE));

        List<Configuration> configurationList = config.getListConfiguration(Key.COLUMN);
        ConfigHelper.assertConfig(Key.COLUMN, () -> (configurationList != null && !configurationList.isEmpty()));
        for (Configuration column : configurationList) {
            ConfigHelper.assertHasContent(column, Key.COLUMN_NAME);
            ConfigHelper.assertHasContent(column, Key.COLUMN_VALUE);
            ConfigHelper.assertHasContent(column, Key.COLUMN_TYPE);
            ConfigHelper.assertHasContent(column, Key.COLUMN_NODE_TYPE);

            String columnValue = column.getString(Key.COLUMN_VALUE);
            ColumnType columnType = ColumnType.valueOf(column.getString(Key.COLUMN_NODE_TYPE));
            if (columnValue == null || columnValue.isEmpty()) {
                // only allow edge empty id
                ConfigHelper.assertConfig("empty column value",
                        () -> (type == ImportType.EDGE && columnType == ColumnType.primaryKey));
            }

            if (columnType == ColumnType.primaryKey) {
                ValueType propType = ValueType.fromShortName(column.getString(Key.COLUMN_TYPE));
                ConfigHelper.assertConfig("only string is allowed in primary key", () -> (propType == ValueType.STRING));

                if (transRule == IdTransRule.labelPrefix) {
                    rule.setId(config.getString(Key.LABEL) + columnValue);
                } else {
                    rule.setId(columnValue);
                }
            } else if (columnType == ColumnType.edgeJsonProperty || columnType == ColumnType.vertexJsonProperty) {
                // only support one json property in column
                ConfigHelper.assertConfig("multi JsonProperty", () -> (rule.getPropertiesJsonStr() == null));

                rule.setPropertiesJsonStr(columnValue);
            } else if (columnType == ColumnType.vertexProperty || columnType == ColumnType.edgeProperty) {
                PropertyMappingRule propertyMappingRule = new PropertyMappingRule();

                propertyMappingRule.setKey(column.getString(Key.COLUMN_NAME));
                propertyMappingRule.setValue(columnValue);
                ValueType propType = ValueType.fromShortName(column.getString(Key.COLUMN_TYPE));
                ConfigHelper.assertConfig("unsupported property type", () -> propType != null);

                propertyMappingRule.setValueType(propType);
                rule.getProperties().add(propertyMappingRule);
            } else if (columnType == ColumnType.srcPrimaryKey) {
                if (type != ImportType.EDGE) {
                    continue;
                }

                ValueType propType = ValueType.fromShortName(column.getString(Key.COLUMN_TYPE));
                ConfigHelper.assertConfig("only string is allowed in primary key", () -> (propType == ValueType.STRING));

                if (srcTransRule == IdTransRule.labelPrefix) {
                    rule.setFrom(config.getString(Key.SRC_LABEL) + columnValue);
                } else {
                    rule.setFrom(columnValue);
                }
            } else if (columnType == ColumnType.dstPrimaryKey) {
                if (type != ImportType.EDGE) {
                    continue;
                }

                ValueType propType = ValueType.fromShortName(column.getString(Key.COLUMN_TYPE));
                ConfigHelper.assertConfig("only string is allowed in primary key", () -> (propType == ValueType.STRING));

                if (dstTransRule == IdTransRule.labelPrefix) {
                    rule.setTo(config.getString(Key.DST_LABEL) + columnValue);
                } else {
                    rule.setTo(columnValue);
                }
            }
        }

        if (rule.getImportType() == ImportType.EDGE) {
            if (rule.getId() == null) {
                rule.setId("");
                log.info("edge id is missed, uuid be default");
            }
            ConfigHelper.assertConfig("to needed in edge", () -> (rule.getTo() != null));
            ConfigHelper.assertConfig("from needed in edge", () -> (rule.getFrom() != null));
        }
        ConfigHelper.assertConfig("id needed", () -> (rule.getId() != null));

        return rule;
    }
}
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/ValueType.java  0 → 100644

/**
 *
 */
package com.alibaba.datax.plugin.writer.gdbwriter.mapping;

import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

import com.alibaba.datax.common.element.Column;
import lombok.extern.slf4j.Slf4j;

/**
 * @author jerrywang
 *
 */
@Slf4j
public enum ValueType {
    INT(Integer.class, "int", Column::asLong, Integer::valueOf),
    LONG(Long.class, "long", Column::asLong, Long::valueOf),
    DOUBLE(Double.class, "double", Column::asDouble, Double::valueOf),
    FLOAT(Float.class, "float", Column::asDouble, Float::valueOf),
    BOOLEAN(Boolean.class, "boolean", Column::asBoolean, Boolean::valueOf),
    STRING(String.class, "string", Column::asString, String::valueOf);

    private Class<?> type = null;
    private String shortName = null;
    private Function<Column, Object> columnFunc = null;
    private Function<String, Object> fromStrFunc = null;

    private ValueType(Class<?> type, String name, Function<Column, Object> columnFunc, Function<String, Object> fromStrFunc) {
        this.type = type;
        this.shortName = name;
        this.columnFunc = columnFunc;
        this.fromStrFunc = fromStrFunc;

        ValueTypeHolder.shortName2type.put(name, this);
    }

    public static ValueType fromShortName(String name) {
        return ValueTypeHolder.shortName2type.get(name);
    }

    public Class<?> type() {
        return this.type;
    }

    public String shortName() {
        return this.shortName;
    }

    public Object applyColumn(Column column) {
        try {
            if (column == null) {
                return null;
            }

            return columnFunc.apply(column);
        } catch (Exception e) {
            log.error("applyColumn error {}, column {}", e.toString(), column);
            throw e;
        }
    }

    public Object fromStrFunc(String str) {
        return fromStrFunc.apply(str);
    }

    private static class ValueTypeHolder {
        private static Map<String, ValueType> shortName2type = new HashMap<>();
    }
}
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/AbstractGdbGraph.java  0 → 100644

/**
 *
 */
package com.alibaba.datax.plugin.writer.gdbwriter.model;

import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.gdbwriter.Key;
import com.alibaba.datax.plugin.writer.gdbwriter.client.GdbWriterConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.tinkerpop.gremlin.driver.Client;
import org.apache.tinkerpop.gremlin.driver.Cluster;
import org.apache.tinkerpop.gremlin.driver.RequestOptions;
import org.apache.tinkerpop.gremlin.driver.ResultSet;
import org.apache.tinkerpop.gremlin.driver.ser.Serializers;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * @author jerrywang
 *
 */
@Slf4j
public abstract class AbstractGdbGraph implements GdbGraph {
    private final static int DEFAULT_TIMEOUT = 30000;

    protected Client client = null;
    protected Key.UpdateMode updateMode = Key.UpdateMode.INSERT;
    protected int propertiesBatchNum = GdbWriterConfig.DEFAULT_BATCH_PROPERTY_NUM;
    protected boolean session = false;

    protected AbstractGdbGraph() {}

    protected AbstractGdbGraph(Configuration config, boolean session) {
        initClient(config, session);
    }

    protected void initClient(Configuration config, boolean session) {
        updateMode = Key.UpdateMode.valueOf(config.getString(Key.UPDATE_MODE, "INSERT"));
        log.info("init graphdb client");
        String host = config.getString(Key.HOST);
        int port = config.getInt(Key.PORT);
        String username = config.getString(Key.USERNAME);
        String password = config.getString(Key.PASSWORD);
        int maxDepthPerConnection = config.getInt(Key.MAX_IN_PROCESS_PER_CONNECTION,
                GdbWriterConfig.DEFAULT_MAX_IN_PROCESS_PER_CONNECTION);
        int maxConnectionPoolSize = config.getInt(Key.MAX_CONNECTION_POOL_SIZE,
                GdbWriterConfig.DEFAULT_MAX_CONNECTION_POOL_SIZE);
        int maxSimultaneousUsagePerConnection = config.getInt(Key.MAX_SIMULTANEOUS_USAGE_PER_CONNECTION,
                GdbWriterConfig.DEFAULT_MAX_SIMULTANEOUS_USAGE_PER_CONNECTION);

        this.session = session;
        if (this.session) {
            maxConnectionPoolSize = GdbWriterConfig.DEFAULT_MAX_CONNECTION_POOL_SIZE;
            maxDepthPerConnection = GdbWriterConfig.DEFAULT_MAX_IN_PROCESS_PER_CONNECTION;
            maxSimultaneousUsagePerConnection = GdbWriterConfig.DEFAULT_MAX_SIMULTANEOUS_USAGE_PER_CONNECTION;
        }

        try {
            Cluster cluster = Cluster.build(host).port(port).credentials(username, password)
                    .serializer(Serializers.GRAPHBINARY_V1D0)
                    .maxContentLength(1048576)
                    .maxInProcessPerConnection(maxDepthPerConnection)
                    .minInProcessPerConnection(0)
                    .maxConnectionPoolSize(maxConnectionPoolSize)
                    .minConnectionPoolSize(maxConnectionPoolSize)
                    .maxSimultaneousUsagePerConnection(maxSimultaneousUsagePerConnection)
                    .resultIterationBatchSize(64)
                    .create();
            client = session ? cluster.connect(UUID.randomUUID().toString()).init() : cluster.connect().init();
            warmClient(maxConnectionPoolSize * maxDepthPerConnection);
        } catch (RuntimeException e) {
            log.error("Failed to connect to GDB {}:{}, due to {}", host, port, e);
            throw e;
        }

        propertiesBatchNum = config.getInt(Key.MAX_PROPERTIES_BATCH_NUM, GdbWriterConfig.DEFAULT_BATCH_PROPERTY_NUM);
    }

    /**
     * @param dsl
     * @param parameters
     */
    protected void runInternal(String dsl, final Map<String, Object> parameters) throws Exception {
        RequestOptions.Builder options = RequestOptions.build().timeout(DEFAULT_TIMEOUT);
        if (parameters != null && !parameters.isEmpty()) {
            parameters.forEach(options::addParameter);
        }

        ResultSet results = client.submitAsync(dsl, options.create()).get(DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
        results.all().get(DEFAULT_TIMEOUT + 1000, TimeUnit.MILLISECONDS);
    }

    void beginTx() {
        if (!session) {
            return;
        }

        String dsl = "g.tx().open()";
        client.submit(dsl).all().join();
    }

    void doCommit() {
        if (!session) {
            return;
        }

        try {
            String dsl = "g.tx().commit()";
            client.submit(dsl).all().join();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    void doRollback() {
        if (!session) {
            return;
        }

        String dsl = "g.tx().rollback()";
        client.submit(dsl).all().join();
    }

    private void warmClient(int num) {
        try {
            beginTx();
            runInternal("g.V('test')", null);
            doCommit();
            log.info("warm graphdb client over");
        } catch (Exception e) {
            log.error("warmClient error");
            doRollback();
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close() {
        if (client != null) {
            log.info("close graphdb client");
            client.close();
        }
    }
}
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbEdge.java  0 → 100644

/**
 *
 */
package com.alibaba.datax.plugin.writer.gdbwriter.model;

import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;

/**
 * @author jerrywang
 *
 */
@Data
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class GdbEdge extends GdbElement {
    private String from = null;
    private String to = null;
}
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbElement.java  0 → 100644

/**
 *
 */
package com.alibaba.datax.plugin.writer.gdbwriter.model;

import java.util.HashMap;
import java.util.Map;

import lombok.Data;

/**
 * @author jerrywang
 *
 */
@Data
public class GdbElement {
    String id = null;
    String label = null;
    Map<String, Object> properties = new HashMap<>();
}
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbGraph.java  0 → 100644

/**
 *
 */
package com.alibaba.datax.plugin.writer.gdbwriter.model;

import com.alibaba.datax.common.element.Record;
import groovy.lang.Tuple2;

import java.util.List;

/**
 * @author jerrywang
 *
 */
public interface GdbGraph extends AutoCloseable {
    List<Tuple2<Record, Exception>> add(List<Tuple2<Record, GdbElement>> records);

    @Override
    void close();
}
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbVertex.java  0 → 100644

/**
 *
 */
package com.alibaba.datax.plugin.writer.gdbwriter.model;

import lombok.EqualsAndHashCode;
import lombok.ToString;

/**
 * @author jerrywang
 *
 */
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class GdbVertex extends GdbElement {
}
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/ScriptGdbGraph.java  0 → 100644

/**
 *
 */
package com.alibaba.datax.plugin.writer.gdbwriter.model;

import java.util.*;

import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.gdbwriter.Key;
import com.alibaba.datax.plugin.writer.gdbwriter.util.GdbDuplicateIdException;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import groovy.lang.Tuple2;
import lombok.extern.slf4j.Slf4j;

/**
 * @author jerrywang
 *
 */
@Slf4j
public class ScriptGdbGraph extends AbstractGdbGraph {
    private static final String VAR_PREFIX = "GDB___";
    private static final String VAR_ID = VAR_PREFIX + "id";
    private static final String VAR_LABEL = VAR_PREFIX + "label";
    private static final String VAR_FROM = VAR_PREFIX + "from";
    private static final String VAR_TO = VAR_PREFIX + "to";
    private static final String VAR_PROP_KEY = VAR_PREFIX + "PK";
    private static final String VAR_PROP_VALUE = VAR_PREFIX + "PV";

    private static final String ADD_V_START = "g.addV(" + VAR_LABEL + ").property(id, " + VAR_ID + ")";
    private static final String ADD_E_START = "g.addE(" + VAR_LABEL + ").property(id, " + VAR_ID + ").from(V("
            + VAR_FROM + ")).to(V(" + VAR_TO + "))";
    private static final String UPDATE_V_START = "g.V(" + VAR_ID + ")";
    private static final String UPDATE_E_START = "g.E(" + VAR_ID + ")";

    private Cache<Integer, String> propertyCache;
    private Random random;

    public ScriptGdbGraph() {
        propertyCache = Caffeine.newBuilder().maximumSize(1024).build();
        random = new Random();
    }

    public ScriptGdbGraph(Configuration config, boolean session) {
        super(config, session);

        propertyCache = Caffeine.newBuilder().maximumSize(1024).build();
        random = new Random();

        log.info("Init as ScriptGdbGraph.");
    }

    /**
     * Apply list of {@link GdbElement} to GDB, return the failed records
     * @param records list of element to apply
     * @return
     */
    @Override
    public List<Tuple2<Record, Exception>> add(List<Tuple2<Record, GdbElement>> records) {
        List<Tuple2<Record, Exception>> errors = new ArrayList<>();
        try {
            beginTx();
            for (Tuple2<Record, GdbElement> elementTuple2 : records) {
                try {
                    addInternal(elementTuple2.getSecond());
                } catch (Exception e) {
                    errors.add(new Tuple2<>(elementTuple2.getFirst(), e));
                }
            }
            doCommit();
        } catch (Exception ex) {
            doRollback();
            throw new RuntimeException(ex);
        }
        return errors;
    }

    private void addInternal(GdbElement element) {
        try {
            addInternal(element, false);
        } catch (GdbDuplicateIdException e) {
            if (updateMode == Key.UpdateMode.SKIP) {
                log.debug("Skip duplicate id {}", element.getId());
            } else if (updateMode == Key.UpdateMode.INSERT) {
                throw new RuntimeException(e);
            } else if (updateMode == Key.UpdateMode.MERGE) {
                if (element.getProperties().isEmpty()) {
                    return;
                }

                try {
                    addInternal(element, true);
                } catch (GdbDuplicateIdException e1) {
                    log.error("duplicate id {} while update...", element.getId());
                    throw new RuntimeException(e1);
                }
            }
        }
    }

    private void addInternal(GdbElement element, boolean update) throws GdbDuplicateIdException {
        Map<String, Object> params = element.getProperties();
        Map<String, Object> subParams = new HashMap<>(propertiesBatchNum);
        boolean firstAdd = !update;
        boolean isVertex = (element instanceof GdbVertex);

        for (Map.Entry<String, Object> entry : params.entrySet()) {
            subParams.put(entry.getKey(), entry.getValue());
            if (subParams.size() >= propertiesBatchNum) {
                setGraphDbElement(element, subParams, isVertex, firstAdd);
                firstAdd = false;
                subParams.clear();
            }
        }
        if (!subParams.isEmpty() || firstAdd) {
            setGraphDbElement(element, subParams, isVertex, firstAdd);
        }
    }

    private Tuple2<String, Map<String, Object>> buildDsl(GdbElement element,
            Map<String, Object> properties, boolean isVertex, boolean firstAdd) {
        Map<String, Object> params = new HashMap<>();

        String dslPropertyPart = propertyCache.get(properties.size(), keys -> {
            final StringBuilder sb = new StringBuilder();
            for (int i = 0; i < keys; i++) {
                sb.append(".property(").append(VAR_PROP_KEY).append(i)
                  .append(", ").append(VAR_PROP_VALUE).append(i).append(")");
            }
            return sb.toString();
        });

        String dsl;
        if (isVertex) {
            dsl = (firstAdd ? ADD_V_START : UPDATE_V_START) + dslPropertyPart;
        } else {
            dsl = (firstAdd ? ADD_E_START : UPDATE_E_START) + dslPropertyPart;
            if (firstAdd) {
                params.put(VAR_FROM, ((GdbEdge) element).getFrom());
                params.put(VAR_TO, ((GdbEdge) element).getTo());
            }
        }

        int index = 0;
        for (Map.Entry<String, Object> entry : properties.entrySet()) {
            params.put(VAR_PROP_KEY + index, entry.getKey());
            params.put(VAR_PROP_VALUE + index, entry.getValue());
            index++;
        }

        if (firstAdd) {
            params.put(VAR_LABEL, element.getLabel());
        }
        params.put(VAR_ID, element.getId());

        return new Tuple2<>(dsl, params);
    }

    private void setGraphDbElement(GdbElement element, Map<String, Object> properties,
            boolean isVertex, boolean firstAdd) throws GdbDuplicateIdException {
        int retry = 10;
        int idleTime = random.nextInt(10) + 10;
        Tuple2<String, Map<String, Object>> elementDsl = buildDsl(element, properties, isVertex, firstAdd);

        while (retry > 0) {
            try {
                runInternal(elementDsl.getFirst(), elementDsl.getSecond());
                log.debug("AddElement {}", element.getId());
                return;
            } catch (Exception e) {
                String cause = e.getCause() == null ? "" : e.getCause().toString();
                if (cause.contains("rejected from")) {
                    retry--;
                    try {
                        Thread.sleep(idleTime);
                    } catch (InterruptedException e1) {
                        // ...
                    }
                    idleTime = Math.min(idleTime * 2, 2000);
                    continue;
                } else if (firstAdd && cause.contains("GraphDB id exists")) {
                    throw new GdbDuplicateIdException(e);
                }

                log.error("Add Failed id {}, dsl {}, params {}, e {}", element.getId(),
                        elementDsl.getFirst(), elementDsl.getSecond(), e);
                throw new RuntimeException(e);
            }
        }
        log.error("Add Failed id {}, dsl {}, params {}", element.getId(),
                elementDsl.getFirst(), elementDsl.getSecond());
        throw new RuntimeException("failed to queue new element to server");
    }
}
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/util/ConfigHelper.java  0 → 100644

/**
 *
 */
package com.alibaba.datax.plugin.writer.gdbwriter.util;

import java.io.IOException;
import java.io.InputStream;
import java.util.function.Supplier;

import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.gdbwriter.GdbWriterErrorCode;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;

/**
 * @author jerrywang
 *
 */
public interface ConfigHelper {
    static void assertConfig(String key, Supplier<Boolean> f) {
        if (!f.get()) {
            throw DataXException.asDataXException(GdbWriterErrorCode.BAD_CONFIG_VALUE, key);
        }
    }

    static void assertHasContent(Configuration config, String key) {
        assertConfig(key, () -> StringUtils.isNotBlank(config.getString(key)));
    }

    /**
     * NOTE: {@code Configuration::get(String, Class<T>)} doesn't work.
     *
     * @param conf Configuration
     * @param key key path to configuration
     * @param cls Class of result type
     * @return the target configuration object of type T
     */
    static <T> T getConfig(Configuration conf, String key, Class<T> cls) {
        JSONObject j = (JSONObject) conf.get(key);
        return JSON.toJavaObject(j, cls);
    }

    /**
     * Create a configuration from the specified file on the classpath.
     *
     * @param name file name
     * @return Configuration instance.
     */
    static Configuration fromClasspath(String name) {
        try (InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream(name)) {
            return Configuration.from(is);
        } catch (IOException e) {
            throw new IllegalArgumentException("File not found: " + name);
        }
    }
}
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/util/GdbDuplicateIdException.java  0 → 100644

/*
 * (C) 2019-present Alibaba Group Holding Limited.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 as
 * published by the Free Software Foundation.
 */
package com.alibaba.datax.plugin.writer.gdbwriter.util;

/**
 * @author : Liu Jianping
 * @date : 2019/8/3
 */
public class GdbDuplicateIdException extends Exception {
    public GdbDuplicateIdException(Exception e) {
        super(e);
    }

    public GdbDuplicateIdException() {
        super();
    }
}
gdbwriter/src/main/resources/plugin.json  0 → 100644

{
    "name": "gdbwriter",
    "class": "com.alibaba.datax.plugin.writer.gdbwriter.GdbWriter",
    "description": "useScene: prod. mechanism: connect GDB with gremlin-client, execute DSL as 'g.addV() or g.addE()' to write record",
    "developer": "alibaba"
}
gdbwriter/src/main/resources/plugin_job_template.json  0 → 100644

{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "odpsreader"
                },
                "writer": {
                    "name": "gdbwriter",
                    "parameter": {
                        "host": "localhost",
                        "port": 8182,
                        "username": "username",
                        "password": "password",
                        "label": "test-label",
                        "srcLabel": "test-srcLabel-",
                        "dstLabel": "test-dstLabel-",
                        "labelType": "EDGE",
                        "writeMode": "INSERT",
                        "idTransRule": "labelPrefix",
                        "srcIdTransRule": "labelPrefix",
                        "dstIdTransRule": "labelPrefix",
                        "column": [
                            {
                                "name": "id",
                                "value": "-test-${0}",
                                "type": "string",
                                "columnType": "primaryKey"
                            },
                            {
                                "name": "id",
                                "value": "from-id-${2}",
                                "type": "string",
                                "columnType": "srcPrimaryKey"
                            },
                            {
                                "name": "id",
                                "value": "to-id-${3}",
                                "type": "string",
                                "columnType": "dstPrimaryKey"
                            },
                            {
                                "name": "strValue-${2}-key",
                                "value": "strValue-${2}-value",
                                "type": "string",
                                "columnType": "edgeProperty"
                            },
                            {
                                "name": "intProp",
                                "value": "${3}",
                                "type": "int",
                                "columnType": "edgeProperty"
                            },
                            {
                                "name": "booleanProp",
                                "value": "${5}",
                                "type": "boolean",
                                "columnType": "edgeProperty"
                            }
                        ]
                    }
                }
            }
        ]
    }
}
pom.xml

...
@@ -88,6 +88,7 @@
        <module>elasticsearchwriter</module>
        <module>tsdbwriter</module>
        <module>adbpgwriter</module>
+       <module>gdbwriter</module>
        <!-- common support module -->
        <module>plugin-rdbms-util</module>
...