Commit 57c8dd86 (Unverified), authored Sep 05, 2019 by Trafalgar; committed by GitHub, Sep 05, 2019

Merge pull request #429 from heljoyLiu/master

add gdbwriter plugin

Parents: 05a000db, 1dbd39e0
Showing 24 changed files with 2100 additions and 0 deletions (+2100, -0)
- gdbwriter/doc/gdbwriter.md (+370, -0)
- gdbwriter/pom.xml (+103, -0)
- gdbwriter/src/main/assembly/package.xml (+35, -0)
- gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/GdbWriter.java (+251, -0)
- gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/GdbWriterErrorCode.java (+33, -0)
- gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/Key.java (+141, -0)
- gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/client/GdbGraphManager.java (+39, -0)
- gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/client/GdbWriterConfig.java (+41, -0)
- gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/DefaultGdbMapper.java (+190, -0)
- gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/GdbMapper.java (+17, -0)
- gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/MappingRule.java (+41, -0)
- gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/MappingRuleFactory.java (+181, -0)
- gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/ValueType.java (+71, -0)
- gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/AbstractGdbGraph.java (+151, -0)
- gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbEdge.java (+20, -0)
- gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbElement.java (+20, -0)
- gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbGraph.java (+20, -0)
- gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbVertex.java (+17, -0)
- gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/ScriptGdbGraph.java (+196, -0)
- gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/util/ConfigHelper.java (+59, -0)
- gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/util/GdbDuplicateIdException.java (+23, -0)
- gdbwriter/src/main/resources/plugin.json (+6, -0)
- gdbwriter/src/main/resources/plugin_job_template.json (+74, -0)
- pom.xml (+1, -0)
gdbwriter/doc/gdbwriter.md (new file, mode 100644)
# DataX GDBWriter

## 1 Quick Introduction

The GDBWriter plugin writes data into a GDB instance. GDBWriter connects to the remote GDB instance through the `Gremlin Client`, takes the data handed over by a Reader, generates the corresponding write DSL statements, and writes the data into GDB.

## 2 Implementation Principle

GDBWriter receives the protocol data produced by a Reader through the DataX framework and writes it to the GDB instance with statements of the form `g.addV/E(GDB___label).property(id, GDB___id).property(GDB___PK1, GDB___PV1)...`, as sketched below.

The `Gremlin Client` can be configured to run in session mode, in which the client controls the transaction and a whole batch of records is written within a single transaction.
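For example, a single vertex insert is submitted as a script whose user values all arrive as bindings rather than being spliced into the DSL text. The sketch below mirrors the statement templates in `ScriptGdbGraph` and the submission path in `AbstractGdbGraph.runInternal`; the endpoint, credentials, and sample values are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.tinkerpop.gremlin.driver.Client;
import org.apache.tinkerpop.gremlin.driver.Cluster;
import org.apache.tinkerpop.gremlin.driver.RequestOptions;

public class DslSketch {
    public static void main(String[] args) throws Exception {
        Cluster cluster = Cluster.build("gdb-endpoint").port(8182)
                .credentials("root", "***").create();      // hypothetical endpoint/credentials
        Client client = cluster.connect().init();

        // Vertex insert template: label, id, and each property arrive as bindings.
        String dsl = "g.addV(GDB___label).property(id, GDB___id)"
                + ".property(GDB___PK0, GDB___PV0)";       // one clause per property
        Map<String, Object> bindings = new HashMap<>();
        bindings.put("GDB___label", "person");             // sample values
        bindings.put("GDB___id", "person-1");
        bindings.put("GDB___PK0", "age");
        bindings.put("GDB___PV0", 30);

        RequestOptions.Builder options = RequestOptions.build().timeout(30000);
        bindings.forEach(options::addParameter);
        client.submitAsync(dsl, options.create()).get().all().get();

        cluster.close();
    }
}
```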
## 3 Features

Vertices and edges are configured differently in GDB, so an import job must distinguish vertex configuration from edge configuration.
### 3.1 Vertex Configuration Sample

* A sample job that generates vertex data in memory and imports it into a GDB instance. With the `streamreader` below, `${0}` is the random double used as the vertex id, `${1}` is the random long used as the label, and `${2}` is the random string stored as the `vertex_propKey` property.
```json
{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "column": [
                            { "random": "1,100", "type": "double" },
                            { "random": "1000,1200", "type": "long" },
                            { "random": "60,64", "type": "string" }
                        ],
                        "sliceRecordCount": 1000
                    }
                },
                "writer": {
                    "name": "gdbwriter",
                    "parameter": {
                        "host": "gdb-endpoint",
                        "port": 8182,
                        "username": "root",
                        "password": "***",
                        "writeMode": "INSERT",
                        "labelType": "VERTEX",
                        "label": "${1}",
                        "idTransRule": "none",
                        "session": true,
                        "maxRecordsInBatch": 64,
                        "column": [
                            {
                                "name": "id",
                                "value": "${0}",
                                "type": "string",
                                "columnType": "primaryKey"
                            },
                            {
                                "name": "vertex_propKey",
                                "value": "${2}",
                                "type": "string",
                                "columnType": "vertexProperty"
                            }
                        ]
                    }
                }
            }
        ]
    }
}
```
### 3.2 Edge Configuration Sample

* A sample job that generates edge data in memory and imports it into a GDB instance.

> **Note**
> The edge import below requires the related vertices to be written to the GDB instance in advance: vertices with ids `person-{{i}}` and `book-{{i}}` must already exist, where i ranges over 0~100. With `srcIdTransRule` set to `labelPrefix` and `srcLabel` set to `person-`, a value of 42 read from source column 1 is mapped to the start-vertex id `person-42`, and likewise for the destination side with `book-`.
```json
{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "column": [
                            { "random": "100,200", "type": "double" },
                            { "random": "1,100", "type": "long" },
                            { "random": "1,100", "type": "long" },
                            { "random": "2000,2200", "type": "long" },
                            { "random": "60,64", "type": "string" }
                        ],
                        "sliceRecordCount": 1000
                    }
                },
                "writer": {
                    "name": "gdbwriter",
                    "parameter": {
                        "host": "gdb-endpoint",
                        "port": 8182,
                        "username": "root",
                        "password": "***",
                        "writeMode": "INSERT",
                        "labelType": "EDGE",
                        "label": "${3}",
                        "idTransRule": "none",
                        "srcIdTransRule": "labelPrefix",
                        "dstIdTransRule": "labelPrefix",
                        "srcLabel": "person-",
                        "dstLabel": "book-",
                        "session": false,
                        "column": [
                            {
                                "name": "id",
                                "value": "${0}",
                                "type": "string",
                                "columnType": "primaryKey"
                            },
                            {
                                "name": "id",
                                "value": "${1}",
                                "type": "string",
                                "columnType": "srcPrimaryKey"
                            },
                            {
                                "name": "id",
                                "value": "${2}",
                                "type": "string",
                                "columnType": "dstPrimaryKey"
                            },
                            {
                                "name": "edge_propKey",
                                "value": "${4}",
                                "type": "string",
                                "columnType": "edgeProperty"
                            }
                        ]
                    }
                }
            }
        ]
    }
}
```
### 3.3 Parameter Description

* **host**
  * Description: connection domain name of the GDB instance, i.e. the "intranet address" under Alibaba Cloud console -> "Graph Database GDB" -> "Instance Management" -> "Basic Information";
  * Required: yes
  * Default: none
* **port**
  * Description: connection port of the GDB instance
  * Required: yes
  * Default: 8182
* **username**
  * Description: account name of the GDB instance
  * Required: yes
  * Default: none
* **password**
  * Description: password for that account
  * Required: yes
  * Default: none
* **label**
  * Description: type name, i.e. the vertex/edge name; label can be read from a source column, e.g. ${0} takes the first source column as the label. Source column indexes start at 0;
  * Required: yes
  * Default: none
* **labelType**
  * Description: type of the label;
    * the enum value "VERTEX" means vertex
    * the enum value "EDGE" means edge
  * Required: yes
  * Default: none
* **srcLabel**
  * Description: when the label is an edge, the name of the start vertex; srcLabel can be read from a source column, e.g. ${0} takes the first source column as the label. Source column indexes start at 0;
  * Required: may be omitted when labelType is edge and srcIdTransRule is none; otherwise required;
  * Default: none
* **dstLabel**
  * Description: when the label is an edge, the name of the end vertex; dstLabel can be read from a source column, e.g. ${0} takes the first source column as the label. Source column indexes start at 0;
  * Required: may be omitted when labelType is edge and dstIdTransRule is none; otherwise required;
  * Default: none
* **writeMode**
  * Description: how duplicate ids are handled during import;
    * "INSERT" raises an error and increments the error-record count by 1;
    * "MERGE" updates the property values and does not count as an error;
    * "SKIP" skips the record and does not count as an error
  * Required: yes
  * Default: INSERT
* **idTransRule**
  * Description: transformation rule for the primary-key id;
    * "labelPrefix" converts the mapped value to {label}{source field}
    * "none" leaves the mapped value unchanged
  * Required: yes
  * Default: "none"
* **srcIdTransRule**
  * Description: when the label is an edge, the transformation rule for the start vertex's primary-key id;
    * "labelPrefix" converts the mapped value to {label}{source field}
    * "none" leaves the mapped value unchanged; in this case srcLabel may be omitted
  * Required: required when the label is an edge
  * Default: "none"
* **dstIdTransRule**
  * Description: when the label is an edge, the transformation rule for the end vertex's primary-key id;
    * "labelPrefix" converts the mapped value to {label}{source field}
    * "none" leaves the mapped value unchanged; in this case dstLabel may be omitted
  * Required: required when the label is an edge
  * Default: "none"
* **session**
  * Description: whether to write data in the `Gremlin Client` session mode
  * Required: no
  * Default: false
* **maxRecordsInBatch**
  * Description: number of records handled in one transaction when the `Gremlin Client` session mode is used
  * Required: no
  * Default: 16
* **column**
  * Description: field mapping configuration for vertices/edges
  * Required: yes
  * Default: none
* **column -> name**
  * Description: field name in the vertex/edge mapping
  * Required: yes
  * Default: none
* **column -> value**
  * Description: field value in the vertex/edge mapping (see the sketch after this list);
    * ${N} maps the source value directly, where N is the 0-based source column index; ${0} maps the first source column;
    * test-${0} concatenates the source value with a fixed string; fixed strings may be added before/after ${0};
    * ${0}-${1} concatenates multiple fields, and fixed strings may be inserted anywhere, e.g. test-${0}-test1-${1}-test2
  * Required: yes
  * Default: none
* **column -> type**
  * Description: type of the field value in the vertex/edge mapping;
    * the primary-key id only supports the string type; the GDBWriter plugin force-converts it, so the source id must be convertible to string;
    * ordinary properties support the types: int, long, float, double, boolean, string
  * Required: yes
  * Default: none
* **column -> columnType**
  * Description: how the mapped field maps onto GDB vertex/edge data; the following enum values are supported:
    * common values:
      * primaryKey: the field is the primary-key id
    * vertex values:
      * vertexProperty: when labelType is vertex, the field is an ordinary vertex property
      * vertexJsonProperty: when labelType is vertex, the field is a vertex json property; for the value structure see the **json properties example** in the note below. At most one json property is allowed per vertex configuration;
    * edge values:
      * srcPrimaryKey: when labelType is edge, the field is the primary-key id of the start vertex
      * dstPrimaryKey: when labelType is edge, the field is the primary-key id of the end vertex
      * edgeProperty: when labelType is edge, the field is an ordinary edge property
      * edgeJsonProperty: when labelType is edge, the field is an edge json property; for the value structure see the **json properties example** in the note below. At most one json property is allowed per edge configuration;
  * Required: yes
  * Default: none
* Note: **json properties example**

> ```json
> {"properties":[
>  {"k":"name","t":"string","v":"tom"},
>  {"k":"age","t":"int","v":"20"},
>  {"k":"sex","t":"string","v":"male"}
> ]}
> ```
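As referenced under **column -> value**, the `${N}` placeholders are expanded per record. The sketch below is a minimal standalone re-implementation of the substitution logic in `DefaultGdbMapper.forStrColumn`; the sample rule strings and record values are hypothetical:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PlaceholderDemo {
    // Matches ${N}, where N is the 0-based index of a source column.
    private static final Pattern STR_PATTERN = Pattern.compile("\\$\\{(\\d+)}");

    static String expand(String rule, String[] sourceColumns) {
        Matcher m = STR_PATTERN.matcher(rule);
        StringBuilder sb = new StringBuilder();
        int last = 0;
        while (m.find()) {
            sb.append(rule, last, m.start());                        // literal text between placeholders
            sb.append(sourceColumns[Integer.parseInt(m.group(1))]);  // substituted column value
            last = m.end();
        }
        sb.append(rule.substring(last));                             // trailing literal text
        return sb.toString();
    }

    public static void main(String[] args) {
        String[] record = {"1001", "person", "tom"};
        System.out.println(expand("${0}", record));                        // 1001
        System.out.println(expand("test-${0}", record));                   // test-1001
        System.out.println(expand("test-${0}-test1-${1}-test2", record));  // test-1001-test1-person-test2
    }
}
```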
## 4 Performance Report

### 4.1 Environment

GDB instance spec
- 16 cores, 128GB RAM, 1TB SSD

DataX benchmark machine
- cpu: 4 * Intel(R) Xeon(R) Platinum 8163 CPU @ 2.50GHz
- mem: 16GB
- net: dual gigabit NICs
- os: CentOS 7, 3.10.0-957.5.1.el7.x86_64
- jvm: -Xms4g -Xmx4g

### 4.2 Data Characteristics

```
{
  id: random double(1~10000)
  from: random long(1~40000000)
  to: random long(1~40000000)
  label: random long(20000000 ~ 20005000)
  propertyKey: random string(len: 120~128)
  propertyName: random string(len: 120~128)
}
```

- Every vertex/edge has one property; the property key and value are both random strings of 120~128 bytes
- label is the string form of a random integer in the range 20000000 ~ 20005000
- id is the string form of a floating-point number, which avoids duplicates
- Edges reference a start and an end vertex; before the edge test, the vertex data of the twitter dataset (42 million vertices) was imported in advance

### 4.3 Job Configuration

Vertex and edge jobs are configured separately; both are close to the samples above. The key differences are:

- More concurrent channels
  > "channel": 32
- Session mode enabled
  > "session": true
- More records per transaction batch
  > "maxRecordsInBatch": 128

### 4.4 Test Results

Vertex import:
- average job throughput: 4.07MB/s
- total job time: 412s
- record write rate: 15609 rec/s
- total records read: 6,400,000

Edge import:
- average job throughput: 2.76MB/s
- total job time: 1602s
- record write rate: 10000 rec/s
- total records read: 16,000,000

## 5 Constraints

- The start/end vertices referenced by an edge record must already exist in GDB before the edge is imported
- The GDBWriter plugin uses the same GDB instance port as user query DSL, so imports may affect query performance

## FAQ

None
gdbwriter/pom.xml (new file, mode 100644)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <artifactId>datax-all</artifactId>
        <groupId>com.alibaba.datax</groupId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <artifactId>gdbwriter</artifactId>
    <name>gdbwriter</name>
    <packaging>jar</packaging>

    <properties>
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.source>1.8</maven.compiler.source>
        <gremlin.version>3.4.1</gremlin.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>datax-common</artifactId>
            <version>${datax-project-version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>datax-core</artifactId>
            <version>${datax-project-version}</version>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.tinkerpop</groupId>
            <artifactId>gremlin-driver</artifactId>
            <version>${gremlin.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.8</version>
        </dependency>
        <dependency>
            <groupId>com.github.ben-manes.caffeine</groupId>
            <artifactId>caffeine</artifactId>
            <version>2.4.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- compiler plugin -->
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>${project-sourceEncoding}</encoding>
                </configuration>
            </plugin>
            <!-- assembly plugin -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptors>
                        <descriptor>src/main/assembly/package.xml</descriptor>
                    </descriptors>
                    <finalName>datax</finalName>
                </configuration>
                <executions>
                    <execution>
                        <id>dwzip</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
gdbwriter/src/main/assembly/package.xml (new file, mode 100644)

<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
    <id></id>
    <formats>
        <format>dir</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>
    <fileSets>
        <fileSet>
            <directory>src/main/resources</directory>
            <includes>
                <include>plugin.json</include>
                <include>plugin_job_template.json</include>
            </includes>
            <outputDirectory>plugin/writer/gdbwriter</outputDirectory>
        </fileSet>
        <fileSet>
            <directory>target/</directory>
            <includes>
                <include>gdbwriter-0.0.1-SNAPSHOT.jar</include>
            </includes>
            <outputDirectory>plugin/writer/gdbwriter</outputDirectory>
        </fileSet>
    </fileSets>
    <dependencySets>
        <dependencySet>
            <useProjectArtifact>false</useProjectArtifact>
            <outputDirectory>plugin/writer/gdbwriter/libs</outputDirectory>
            <scope>runtime</scope>
        </dependencySet>
    </dependencySets>
</assembly>
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/GdbWriter.java (new file, mode 100644)

package com.alibaba.datax.plugin.writer.gdbwriter;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.Function;

import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.gdbwriter.client.GdbGraphManager;
import com.alibaba.datax.plugin.writer.gdbwriter.client.GdbWriterConfig;
import com.alibaba.datax.plugin.writer.gdbwriter.mapping.DefaultGdbMapper;
import com.alibaba.datax.plugin.writer.gdbwriter.mapping.MappingRule;
import com.alibaba.datax.plugin.writer.gdbwriter.mapping.MappingRuleFactory;
import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbElement;
import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbGraph;
import groovy.lang.Tuple2;
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GdbWriter extends Writer {
    private static final Logger log = LoggerFactory.getLogger(GdbWriter.class);

    private static Function<Record, GdbElement> mapper = null;
    private static GdbGraph globalGraph = null;
    private static boolean session = false;

    /**
     * The Job methods run exactly once; the framework starts several Task
     * threads that run the Task methods in parallel.
     * <p/>
     * The overall Writer flow is:
     * <pre>
     * Job: init --> prepare --> split
     *
     * Task: init --> prepare --> startWrite --> post --> destroy
     * Task: init --> prepare --> startWrite --> post --> destroy
     *
     * Job: post --> destroy
     * </pre>
     */
    public static class Job extends Writer.Job {
        private static final Logger LOG = LoggerFactory.getLogger(Job.class);

        private Configuration jobConfig = null;

        @Override
        public void init() {
            LOG.info("GDB datax plugin writer job init begin ...");
            this.jobConfig = getPluginJobConf();
            GdbWriterConfig.of(this.jobConfig);
            LOG.info("GDB datax plugin writer job init end.");

            /**
             * Note: this method runs only once.
             * Best practice: validate the user configuration here (missing
             * required items? invalid values? irrelevant keys?) and report
             * clear errors/warnings. Validation is usually delegated to a
             * static helper class to keep this class readable.
             */
        }

        @Override
        public void prepare() {
            /**
             * Note: this method runs only once.
             * Best practice: do any pre-sync work of the Job here, or remove
             * this method if there is nothing to do.
             */
            super.prepare();

            MappingRule rule = MappingRuleFactory.getInstance().createV2(jobConfig);

            mapper = new DefaultGdbMapper().getMapper(rule);
            session = jobConfig.getBool(Key.SESSION_STATE, false);

            /**
             * client connect check before task
             */
            try {
                globalGraph = GdbGraphManager.instance().getGraph(jobConfig, false);
            } catch (RuntimeException e) {
                throw DataXException.asDataXException(GdbWriterErrorCode.FAIL_CLIENT_CONNECT, e.getMessage());
            }
        }

        @Override
        public List<Configuration> split(int mandatoryNumber) {
            /**
             * Note: this method runs only once.
             * Best practice: use a static helper class to split the Job
             * configuration into Task configurations. mandatoryNumber is the
             * number of splits that must be produced.
             */
            LOG.info("split begin...");
            List<Configuration> configurationList = new ArrayList<Configuration>();
            for (int i = 0; i < mandatoryNumber; i++) {
                configurationList.add(this.jobConfig.clone());
            }
            LOG.info("split end...");
            return configurationList;
        }

        @Override
        public void post() {
            /**
             * Note: this method runs only once.
             * Best practice: do any post-sync work of the Job here.
             */
            globalGraph.close();
        }

        @Override
        public void destroy() {
            /**
             * Note: this method runs only once.
             * Best practice: release the Job's resources here, together with post().
             */
        }
    }

    @Slf4j
    public static class Task extends Writer.Task {
        private Configuration taskConfig;

        private int failed = 0;
        private int batchRecords;
        private ExecutorService submitService = null;
        private GdbGraph graph;

        @Override
        public void init() {
            /**
             * Note: this method runs once per Task.
             * Best practice: read taskConfig here and initialize the
             * resources that startWrite() needs.
             */
            this.taskConfig = super.getPluginJobConf();
            batchRecords = taskConfig.getInt(Key.MAX_RECORDS_IN_BATCH, GdbWriterConfig.DEFAULT_RECORD_NUM_IN_BATCH);
            submitService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingDeque<>(), new DefaultThreadFactory("submit-dsl"));

            if (!session) {
                graph = globalGraph;
            } else {
                /**
                 * Create session clients in staggered batches, to work around
                 * the server-side groovy compilation cost.
                 */
                try {
                    Thread.sleep((getTaskId() / 10) * 10000);
                } catch (Exception e) {
                    // ...
                }
                graph = GdbGraphManager.instance().getGraph(taskConfig, session);
            }
        }

        @Override
        public void prepare() {
            /**
             * Note: this method runs once per Task.
             * Best practice: do any pre-sync work of the Task here, or remove
             * this method if there is nothing to do.
             */
            super.prepare();
        }

        @Override
        public void startWrite(RecordReceiver recordReceiver) {
            /**
             * Note: this method runs once per Task.
             * Best practice: keep the write logic here small and clear.
             */
            Record r;
            Future<Boolean> future = null;
            List<Tuple2<Record, GdbElement>> records = new ArrayList<>(batchRecords);

            while ((r = recordReceiver.getFromReader()) != null) {
                records.add(new Tuple2<>(r, mapper.apply(r)));

                if (records.size() >= batchRecords) {
                    wait4Submit(future);

                    final List<Tuple2<Record, GdbElement>> batch = records;
                    future = submitService.submit(() -> batchCommitRecords(batch));
                    records = new ArrayList<>(batchRecords);
                }
            }

            wait4Submit(future);
            if (!records.isEmpty()) {
                final List<Tuple2<Record, GdbElement>> batch = records;
                future = submitService.submit(() -> batchCommitRecords(batch));
                wait4Submit(future);
            }
        }

        private void wait4Submit(Future<Boolean> future) {
            if (future == null) {
                return;
            }
            try {
                future.get();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        private boolean batchCommitRecords(final List<Tuple2<Record, GdbElement>> records) {
            TaskPluginCollector collector = getTaskPluginCollector();
            try {
                List<Tuple2<Record, Exception>> errors = graph.add(records);
                errors.forEach(t -> collector.collectDirtyRecord(t.getFirst(), t.getSecond()));
                failed += errors.size();
            } catch (Exception e) {
                records.forEach(t -> collector.collectDirtyRecord(t.getFirst(), e));
                failed += records.size();
            }

            records.clear();
            return true;
        }

        @Override
        public void post() {
            /**
             * Note: this method runs once per Task.
             * Best practice: do any post-sync work of the Task here.
             */
            log.info("Task done, dirty record count - {}", failed);
        }

        @Override
        public void destroy() {
            /**
             * Note: this method runs once per Task.
             * Best practice: release the Task's resources here, together with post().
             */
            if (session) {
                graph.close();
            }
            submitService.shutdown();
        }
    }
}
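`startWrite` above overlaps work: while the single-threaded `submitService` commits batch N, the main thread keeps pulling and mapping records for batch N+1, and `wait4Submit` enforces at most one batch in flight. A minimal standalone sketch of that pattern (the batch size and record values are hypothetical):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class PipelineDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService submit = Executors.newSingleThreadExecutor();
        Future<Boolean> inFlight = null;
        List<Integer> batch = new ArrayList<>();

        for (int record = 0; record < 10; record++) {
            batch.add(record);                            // "mapping" work on the main thread
            if (batch.size() >= 4) {
                if (inFlight != null) inFlight.get();     // wait for the previous batch
                final List<Integer> toCommit = batch;
                inFlight = submit.submit(() -> {          // commit asynchronously
                    System.out.println("commit " + toCommit);
                    return true;
                });
                batch = new ArrayList<>();
            }
        }

        if (inFlight != null) inFlight.get();
        if (!batch.isEmpty()) System.out.println("commit " + batch);  // trailing partial batch
        submit.shutdown();
    }
}
```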
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/GdbWriterErrorCode.java (new file, mode 100644)

package com.alibaba.datax.plugin.writer.gdbwriter;

import com.alibaba.datax.common.spi.ErrorCode;

public enum GdbWriterErrorCode implements ErrorCode {
    BAD_CONFIG_VALUE("GdbWriter-00", "The configured value is invalid."),
    CONFIG_ITEM_MISS("GdbWriter-01", "A required configuration item is missing."),
    FAIL_CLIENT_CONNECT("GdbWriter-02", "Failed to connect to GDB."),
    ;

    private final String code;
    private final String description;

    private GdbWriterErrorCode(String code, String description) {
        this.code = code;
        this.description = description;
    }

    @Override
    public String getCode() {
        return this.code;
    }

    @Override
    public String getDescription() {
        return this.description;
    }

    @Override
    public String toString() {
        return String.format("Code:[%s], Description:[%s]. ", this.code, this.description);
    }
}
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/Key.java (new file, mode 100644)

package com.alibaba.datax.plugin.writer.gdbwriter;

public final class Key {
    /**
     * Configuration keys used by the plugin that must be provided by the plugin user.
     */
    public final static String HOST = "host";
    public final static String PORT = "port";
    public final static String USERNAME = "username";
    public static final String PASSWORD = "password";

    /**
     * import type and mode
     */
    public static final String IMPORT_TYPE = "labelType";
    public static final String UPDATE_MODE = "writeMode";

    /**
     * label prefix issue
     */
    public static final String ID_TRANS_RULE = "idTransRule";
    public static final String SRC_ID_TRANS_RULE = "srcIdTransRule";
    public static final String DST_ID_TRANS_RULE = "dstIdTransRule";

    public static final String LABEL = "label";
    public static final String SRC_LABEL = "srcLabel";
    public static final String DST_LABEL = "dstLabel";

    public static final String MAPPING = "mapping";

    /**
     * column define in Gdb
     */
    public static final String COLUMN = "column";
    public static final String COLUMN_NAME = "name";
    public static final String COLUMN_VALUE = "value";
    public static final String COLUMN_TYPE = "type";
    public static final String COLUMN_NODE_TYPE = "columnType";

    /**
     * Gdb Vertex/Edge elements
     */
    public static final String ID = "id";
    public static final String FROM = "from";
    public static final String TO = "to";
    public static final String PROPERTIES = "properties";
    public static final String PROP_KEY = "name";
    public static final String PROP_VALUE = "value";
    public static final String PROP_TYPE = "type";

    public static final String PROPERTIES_JSON_STR = "propertiesJsonStr";
    public static final String MAX_PROPERTIES_BATCH_NUM = "maxPropertiesBatchNumber";

    /**
     * session less client configure for connect pool
     */
    public static final String MAX_IN_PROCESS_PER_CONNECTION = "maxInProcessPerConnection";
    public static final String MAX_CONNECTION_POOL_SIZE = "maxConnectionPoolSize";
    public static final String MAX_SIMULTANEOUS_USAGE_PER_CONNECTION = "maxSimultaneousUsagePerConnection";

    public static final String MAX_RECORDS_IN_BATCH = "maxRecordsInBatch";
    public static final String SESSION_STATE = "session";

    public static enum ImportType {
        /**
         * Import vertices
         */
        VERTEX,
        /**
         * Import edges
         */
        EDGE;
    }

    public static enum UpdateMode {
        /**
         * Insert new records, fail if exists
         */
        INSERT,
        /**
         * Skip this record if exists
         */
        SKIP,
        /**
         * Update property of this record if exists
         */
        MERGE;
    }

    public static enum ColumnType {
        /**
         * vertex or edge id
         */
        primaryKey,

        /**
         * vertex property
         */
        vertexProperty,

        /**
         * start vertex id of edge
         */
        srcPrimaryKey,

        /**
         * end vertex id of edge
         */
        dstPrimaryKey,

        /**
         * edge property
         */
        edgeProperty,

        /**
         * vertex json style property
         */
        vertexJsonProperty,

        /**
         * edge json style property
         */
        edgeJsonProperty
    }

    public static enum IdTransRule {
        /**
         * vertex or edge id with 'label' prefix
         */
        labelPrefix,

        /**
         * vertex or edge id raw
         */
        none
    }
}
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/client/GdbGraphManager.java (new file, mode 100644)

/**
 *
 */
package com.alibaba.datax.plugin.writer.gdbwriter.client;

import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbGraph;
import com.alibaba.datax.plugin.writer.gdbwriter.model.ScriptGdbGraph;

import java.util.ArrayList;
import java.util.List;

/**
 * @author jerrywang
 *
 */
public class GdbGraphManager implements AutoCloseable {
    private static final GdbGraphManager instance = new GdbGraphManager();

    private List<GdbGraph> graphs = new ArrayList<>();

    public static GdbGraphManager instance() {
        return instance;
    }

    public GdbGraph getGraph(Configuration config, boolean session) {
        GdbGraph graph = new ScriptGdbGraph(config, session);
        graphs.add(graph);
        return graph;
    }

    @Override
    public void close() {
        for (GdbGraph graph : graphs) {
            graph.close();
        }
        graphs.clear();
    }
}
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/client/GdbWriterConfig.java (new file, mode 100644)

/**
 *
 */
package com.alibaba.datax.plugin.writer.gdbwriter.client;

import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.gdbwriter.Key;

import static com.alibaba.datax.plugin.writer.gdbwriter.util.ConfigHelper.*;

/**
 * @author jerrywang
 *
 */
public class GdbWriterConfig {
    public static final int DEFAULT_MAX_IN_PROCESS_PER_CONNECTION = 4;
    public static final int DEFAULT_MAX_CONNECTION_POOL_SIZE = 8;
    public static final int DEFAULT_MAX_SIMULTANEOUS_USAGE_PER_CONNECTION = 8;
    public static final int DEFAULT_BATCH_PROPERTY_NUM = 30;
    public static final int DEFAULT_RECORD_NUM_IN_BATCH = 16;

    private Configuration config;

    private GdbWriterConfig(Configuration config) {
        this.config = config;

        validate();
    }

    private void validate() {
        assertHasContent(config, Key.HOST);
        assertConfig(Key.PORT, () -> config.getInt(Key.PORT) > 0);

        assertHasContent(config, Key.USERNAME);
        assertHasContent(config, Key.PASSWORD);
    }

    public static GdbWriterConfig of(Configuration config) {
        return new GdbWriterConfig(config);
    }
}
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/DefaultGdbMapper.java (new file, mode 100644)

/**
 *
 */
package com.alibaba.datax.plugin.writer.gdbwriter.mapping;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.alibaba.datax.common.element.Record;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.datax.plugin.writer.gdbwriter.Key;
import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbEdge;
import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbElement;
import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbVertex;
import lombok.extern.slf4j.Slf4j;

import static com.alibaba.datax.plugin.writer.gdbwriter.Key.ImportType.VERTEX;

/**
 * @author jerrywang
 *
 */
@Slf4j
public class DefaultGdbMapper implements GdbMapper {
    private static final Pattern STR_PATTERN = Pattern.compile("\\$\\{(\\d+)}");
    private static final Pattern NORMAL_PATTERN = Pattern.compile("^\\$\\{(\\d+)}$");

    @Override
    public Function<Record, GdbElement> getMapper(MappingRule rule) {
        return r -> {
            GdbElement e = (rule.getImportType() == VERTEX) ? new GdbVertex() : new GdbEdge();
            forElement(rule).accept(r, e);
            return e;
        };
    }

    private static BiConsumer<Record, GdbElement> forElement(MappingRule rule) {
        List<BiConsumer<Record, GdbElement>> properties = new ArrayList<>();
        for (MappingRule.PropertyMappingRule propRule : rule.getProperties()) {
            Function<Record, String> keyFunc = forStrColumn(propRule.getKey());

            if (propRule.getValueType() == ValueType.STRING) {
                final Function<Record, String> valueFunc = forStrColumn(propRule.getValue());
                properties.add((r, e) -> {
                    String k = keyFunc.apply(r);
                    String v = valueFunc.apply(r);
                    if (k != null && v != null) {
                        e.getProperties().put(k, v);
                    }
                });
            } else {
                final Function<Record, Object> valueFunc = forObjColumn(propRule.getValue(), propRule.getValueType());
                properties.add((r, e) -> {
                    String k = keyFunc.apply(r);
                    Object v = valueFunc.apply(r);
                    if (k != null && v != null) {
                        e.getProperties().put(k, v);
                    }
                });
            }
        }

        if (rule.getPropertiesJsonStr() != null) {
            Function<Record, String> jsonFunc = forStrColumn(rule.getPropertiesJsonStr());
            properties.add((r, e) -> {
                String propertiesStr = jsonFunc.apply(r);
                JSONObject root = (JSONObject) JSONObject.parse(propertiesStr);
                JSONArray propertiesList = root.getJSONArray("properties");

                for (Object object : propertiesList) {
                    JSONObject jsonObject = (JSONObject) object;
                    String key = jsonObject.getString("k");
                    String name = jsonObject.getString("v");
                    String type = jsonObject.getString("t");

                    if (key == null || name == null) {
                        continue;
                    }
                    addToProperties(e, key, name, type);
                }
            });
        }

        BiConsumer<Record, GdbElement> ret = (r, e) -> {
            String label = forStrColumn(rule.getLabel()).apply(r);
            String id = forStrColumn(rule.getId()).apply(r);

            if (rule.getImportType() == Key.ImportType.EDGE) {
                String to = forStrColumn(rule.getTo()).apply(r);
                String from = forStrColumn(rule.getFrom()).apply(r);
                if (to == null || from == null) {
                    log.error("invalid record to: {} , from: {}", to, from);
                    throw new IllegalArgumentException("to or from missed in edge");
                }
                ((GdbEdge) e).setTo(to);
                ((GdbEdge) e).setFrom(from);

                // generate UUID for edge
                if (id == null) {
                    id = UUID.randomUUID().toString();
                }
            }

            if (id == null || label == null) {
                log.error("invalid record id: {} , label: {}", id, label);
                throw new IllegalArgumentException("id or label missed");
            }

            e.setId(id);
            e.setLabel(label);
            properties.forEach(p -> p.accept(r, e));
        };

        return ret;
    }

    static Function<Record, Object> forObjColumn(String rule, ValueType type) {
        Matcher m = NORMAL_PATTERN.matcher(rule);
        if (m.matches()) {
            int index = Integer.valueOf(m.group(1));
            return r -> type.applyColumn(r.getColumn(index));
        } else {
            return r -> type.fromStrFunc(rule);
        }
    }

    static Function<Record, String> forStrColumn(String rule) {
        List<BiConsumer<StringBuilder, Record>> list = new ArrayList<>();
        Matcher m = STR_PATTERN.matcher(rule);
        int last = 0;
        while (m.find()) {
            String index = m.group(1);
            // as simple integer index.
            int i = Integer.parseInt(index);

            final int tmp = last;
            final int start = m.start();
            list.add((sb, record) -> {
                sb.append(rule.subSequence(tmp, start));
                if (record.getColumn(i) != null && record.getColumn(i).getByteSize() > 0) {
                    sb.append(record.getColumn(i).asString());
                }
            });

            last = m.end();
        }

        final int tmp = last;
        list.add((sb, record) -> {
            sb.append(rule.subSequence(tmp, rule.length()));
        });

        return r -> {
            StringBuilder sb = new StringBuilder();
            list.forEach(c -> c.accept(sb, r));

            String res = sb.toString();
            return res.isEmpty() ? null : res;
        };
    }

    static boolean addToProperties(GdbElement e, String key, String value, String type) {
        ValueType valueType = ValueType.fromShortName(type);

        if (valueType == ValueType.STRING) {
            e.getProperties().put(key, value);
        } else if (valueType == ValueType.INT) {
            e.getProperties().put(key, Integer.valueOf(value));
        } else if (valueType == ValueType.LONG) {
            e.getProperties().put(key, Long.valueOf(value));
        } else if (valueType == ValueType.DOUBLE) {
            e.getProperties().put(key, Double.valueOf(value));
        } else if (valueType == ValueType.FLOAT) {
            e.getProperties().put(key, Float.valueOf(value));
        } else if (valueType == ValueType.BOOLEAN) {
            e.getProperties().put(key, Boolean.valueOf(value));
        } else {
            log.error("invalid property key {}, value {}, type {}", key, value, type);
            return false;
        }

        return true;
    }
}
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/GdbMapper.java (new file, mode 100644)

/**
 *
 */
package com.alibaba.datax.plugin.writer.gdbwriter.mapping;

import java.util.function.Function;

import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbElement;

/**
 * @author jerrywang
 *
 */
public interface GdbMapper {
    Function<Record, GdbElement> getMapper(MappingRule rule);
}
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/MappingRule.java (new file, mode 100644)

/**
 *
 */
package com.alibaba.datax.plugin.writer.gdbwriter.mapping;

import java.util.ArrayList;
import java.util.List;

import com.alibaba.datax.plugin.writer.gdbwriter.Key.ImportType;
import lombok.Data;

/**
 * @author jerrywang
 *
 */
@Data
public class MappingRule {
    private String id = null;
    private String label = null;
    private ImportType importType = null;
    private String from = null;
    private String to = null;
    private List<PropertyMappingRule> properties = new ArrayList<>();
    private String propertiesJsonStr = null;

    @Data
    public static class PropertyMappingRule {
        private String key = null;
        private String value = null;
        private ValueType valueType = null;
    }
}
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/MappingRuleFactory.java (new file, mode 100644)

/**
 *
 */
package com.alibaba.datax.plugin.writer.gdbwriter.mapping;

import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.gdbwriter.GdbWriterErrorCode;
import com.alibaba.datax.plugin.writer.gdbwriter.Key;
import com.alibaba.datax.plugin.writer.gdbwriter.Key.ImportType;
import com.alibaba.datax.plugin.writer.gdbwriter.Key.IdTransRule;
import com.alibaba.datax.plugin.writer.gdbwriter.Key.ColumnType;
import com.alibaba.datax.plugin.writer.gdbwriter.mapping.MappingRule.PropertyMappingRule;
import com.alibaba.datax.plugin.writer.gdbwriter.util.ConfigHelper;
import lombok.extern.slf4j.Slf4j;

import java.util.List;

/**
 * @author jerrywang
 *
 */
@Slf4j
public class MappingRuleFactory {
    private static final MappingRuleFactory instance = new MappingRuleFactory();

    public static final MappingRuleFactory getInstance() {
        return instance;
    }

    @Deprecated
    public MappingRule create(Configuration config, ImportType type) {
        MappingRule rule = new MappingRule();
        rule.setId(config.getString(Key.ID));
        rule.setLabel(config.getString(Key.LABEL));
        if (type == ImportType.EDGE) {
            rule.setFrom(config.getString(Key.FROM));
            rule.setTo(config.getString(Key.TO));
        }

        rule.setImportType(type);

        List<Configuration> configurations = config.getListConfiguration(Key.PROPERTIES);
        if (configurations != null) {
            for (Configuration prop : config.getListConfiguration(Key.PROPERTIES)) {
                PropertyMappingRule propRule = new PropertyMappingRule();
                propRule.setKey(prop.getString(Key.PROP_KEY));
                propRule.setValue(prop.getString(Key.PROP_VALUE));
                propRule.setValueType(ValueType.fromShortName(prop.getString(Key.PROP_TYPE).toLowerCase()));
                rule.getProperties().add(propRule);
            }
        }

        String propertiesJsonStr = config.getString(Key.PROPERTIES_JSON_STR, null);
        if (propertiesJsonStr != null) {
            rule.setPropertiesJsonStr(propertiesJsonStr);
        }

        return rule;
    }

    public MappingRule createV2(Configuration config) {
        try {
            ImportType type = ImportType.valueOf(config.getString(Key.IMPORT_TYPE));
            return createV2(config, type);
        } catch (NullPointerException e) {
            throw DataXException.asDataXException(GdbWriterErrorCode.CONFIG_ITEM_MISS, Key.IMPORT_TYPE);
        } catch (IllegalArgumentException e) {
            throw DataXException.asDataXException(GdbWriterErrorCode.BAD_CONFIG_VALUE, Key.IMPORT_TYPE);
        }
    }

    public MappingRule createV2(Configuration config, ImportType type) {
        MappingRule rule = new MappingRule();

        ConfigHelper.assertHasContent(config, Key.LABEL);
        rule.setLabel(config.getString(Key.LABEL));
        rule.setImportType(type);

        IdTransRule srcTransRule = IdTransRule.none;
        IdTransRule dstTransRule = IdTransRule.none;
        if (type == ImportType.EDGE) {
            ConfigHelper.assertHasContent(config, Key.SRC_ID_TRANS_RULE);
            ConfigHelper.assertHasContent(config, Key.DST_ID_TRANS_RULE);

            srcTransRule = IdTransRule.valueOf(config.getString(Key.SRC_ID_TRANS_RULE));
            dstTransRule = IdTransRule.valueOf(config.getString(Key.DST_ID_TRANS_RULE));

            if (srcTransRule == IdTransRule.labelPrefix) {
                ConfigHelper.assertHasContent(config, Key.SRC_LABEL);
            }

            if (dstTransRule == IdTransRule.labelPrefix) {
                ConfigHelper.assertHasContent(config, Key.DST_LABEL);
            }
        }
        ConfigHelper.assertHasContent(config, Key.ID_TRANS_RULE);
        IdTransRule transRule = IdTransRule.valueOf(config.getString(Key.ID_TRANS_RULE));

        List<Configuration> configurationList = config.getListConfiguration(Key.COLUMN);
        ConfigHelper.assertConfig(Key.COLUMN, () -> (configurationList != null && !configurationList.isEmpty()));
        for (Configuration column : configurationList) {
            ConfigHelper.assertHasContent(column, Key.COLUMN_NAME);
            ConfigHelper.assertHasContent(column, Key.COLUMN_VALUE);
            ConfigHelper.assertHasContent(column, Key.COLUMN_TYPE);
            ConfigHelper.assertHasContent(column, Key.COLUMN_NODE_TYPE);

            String columnValue = column.getString(Key.COLUMN_VALUE);
            ColumnType columnType = ColumnType.valueOf(column.getString(Key.COLUMN_NODE_TYPE));
            if (columnValue == null || columnValue.isEmpty()) {
                // only allow edge empty id
                ConfigHelper.assertConfig("empty column value",
                        () -> (type == ImportType.EDGE && columnType == ColumnType.primaryKey));
            }

            if (columnType == ColumnType.primaryKey) {
                ValueType propType = ValueType.fromShortName(column.getString(Key.COLUMN_TYPE));
                ConfigHelper.assertConfig("only string is allowed in primary key",
                        () -> (propType == ValueType.STRING));

                if (transRule == IdTransRule.labelPrefix) {
                    rule.setId(config.getString(Key.LABEL) + columnValue);
                } else {
                    rule.setId(columnValue);
                }
            } else if (columnType == ColumnType.edgeJsonProperty || columnType == ColumnType.vertexJsonProperty) {
                // only support one json property in column
                ConfigHelper.assertConfig("multi JsonProperty", () -> (rule.getPropertiesJsonStr() == null));

                rule.setPropertiesJsonStr(columnValue);
            } else if (columnType == ColumnType.vertexProperty || columnType == ColumnType.edgeProperty) {
                PropertyMappingRule propertyMappingRule = new PropertyMappingRule();

                propertyMappingRule.setKey(column.getString(Key.COLUMN_NAME));
                propertyMappingRule.setValue(columnValue);
                ValueType propType = ValueType.fromShortName(column.getString(Key.COLUMN_TYPE));
                ConfigHelper.assertConfig("unsupported property type", () -> propType != null);

                propertyMappingRule.setValueType(propType);
                rule.getProperties().add(propertyMappingRule);
            } else if (columnType == ColumnType.srcPrimaryKey) {
                if (type != ImportType.EDGE) {
                    continue;
                }

                ValueType propType = ValueType.fromShortName(column.getString(Key.COLUMN_TYPE));
                ConfigHelper.assertConfig("only string is allowed in primary key",
                        () -> (propType == ValueType.STRING));

                if (srcTransRule == IdTransRule.labelPrefix) {
                    rule.setFrom(config.getString(Key.SRC_LABEL) + columnValue);
                } else {
                    rule.setFrom(columnValue);
                }
            } else if (columnType == ColumnType.dstPrimaryKey) {
                if (type != ImportType.EDGE) {
                    continue;
                }

                ValueType propType = ValueType.fromShortName(column.getString(Key.COLUMN_TYPE));
                ConfigHelper.assertConfig("only string is allowed in primary key",
                        () -> (propType == ValueType.STRING));

                if (dstTransRule == IdTransRule.labelPrefix) {
                    rule.setTo(config.getString(Key.DST_LABEL) + columnValue);
                } else {
                    rule.setTo(columnValue);
                }
            }
        }

        if (rule.getImportType() == ImportType.EDGE) {
            if (rule.getId() == null) {
                rule.setId("");
                log.info("edge id is missed, uuid be default");
            }
            ConfigHelper.assertConfig("to needed in edge", () -> (rule.getTo() != null));
            ConfigHelper.assertConfig("from needed in edge", () -> (rule.getFrom() != null));
        }
        ConfigHelper.assertConfig("id needed", () -> (rule.getId() != null));

        return rule;
    }
}
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/mapping/ValueType.java (new file, mode 100644)

/**
 *
 */
package com.alibaba.datax.plugin.writer.gdbwriter.mapping;

import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

import com.alibaba.datax.common.element.Column;
import lombok.extern.slf4j.Slf4j;

/**
 * @author jerrywang
 *
 */
@Slf4j
public enum ValueType {
    INT(Integer.class, "int", Column::asLong, Integer::valueOf),
    LONG(Long.class, "long", Column::asLong, Long::valueOf),
    DOUBLE(Double.class, "double", Column::asDouble, Double::valueOf),
    FLOAT(Float.class, "float", Column::asDouble, Float::valueOf),
    BOOLEAN(Boolean.class, "boolean", Column::asBoolean, Boolean::valueOf),
    STRING(String.class, "string", Column::asString, String::valueOf);

    private Class<?> type = null;
    private String shortName = null;
    private Function<Column, Object> columnFunc = null;
    private Function<String, Object> fromStrFunc = null;

    private ValueType(Class<?> type, String name, Function<Column, Object> columnFunc, Function<String, Object> fromStrFunc) {
        this.type = type;
        this.shortName = name;
        this.columnFunc = columnFunc;
        this.fromStrFunc = fromStrFunc;

        ValueTypeHolder.shortName2type.put(name, this);
    }

    public static ValueType fromShortName(String name) {
        return ValueTypeHolder.shortName2type.get(name);
    }

    public Class<?> type() {
        return this.type;
    }

    public String shortName() {
        return this.shortName;
    }

    public Object applyColumn(Column column) {
        try {
            if (column == null) {
                return null;
            }
            return columnFunc.apply(column);
        } catch (Exception e) {
            log.error("applyColumn error {}, column {}", e.toString(), column);
            throw e;
        }
    }

    public Object fromStrFunc(String str) {
        return fromStrFunc.apply(str);
    }

    private static class ValueTypeHolder {
        private static Map<String, ValueType> shortName2type = new HashMap<>();
    }
}
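A short usage sketch of `ValueType` (assuming the gdbwriter classes are on the classpath; the values are hypothetical): `fromShortName` resolves the `type` string from a column configuration, and the resolved entry converts either a constant string or a DataX `Column` into the property value.

```java
import com.alibaba.datax.plugin.writer.gdbwriter.mapping.ValueType;

public class ValueTypeDemo {
    public static void main(String[] args) {
        // Resolve the short name used in a column's "type" field.
        ValueType longType = ValueType.fromShortName("long");
        System.out.println(longType.type());             // class java.lang.Long
        System.out.println(longType.fromStrFunc("42"));  // 42 (a Long), e.g. for a constant value
    }
}
```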
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/AbstractGdbGraph.java (new file, mode 100644)

/**
 *
 */
package com.alibaba.datax.plugin.writer.gdbwriter.model;

import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.gdbwriter.Key;
import com.alibaba.datax.plugin.writer.gdbwriter.client.GdbWriterConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.tinkerpop.gremlin.driver.Client;
import org.apache.tinkerpop.gremlin.driver.Cluster;
import org.apache.tinkerpop.gremlin.driver.RequestOptions;
import org.apache.tinkerpop.gremlin.driver.ResultSet;
import org.apache.tinkerpop.gremlin.driver.ser.Serializers;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * @author jerrywang
 *
 */
@Slf4j
public abstract class AbstractGdbGraph implements GdbGraph {
    private final static int DEFAULT_TIMEOUT = 30000;

    protected Client client = null;
    protected Key.UpdateMode updateMode = Key.UpdateMode.INSERT;
    protected int propertiesBatchNum = GdbWriterConfig.DEFAULT_BATCH_PROPERTY_NUM;
    protected boolean session = false;

    protected AbstractGdbGraph() {}

    protected AbstractGdbGraph(Configuration config, boolean session) {
        initClient(config, session);
    }

    protected void initClient(Configuration config, boolean session) {
        updateMode = Key.UpdateMode.valueOf(config.getString(Key.UPDATE_MODE, "INSERT"));
        log.info("init graphdb client");
        String host = config.getString(Key.HOST);
        int port = config.getInt(Key.PORT);
        String username = config.getString(Key.USERNAME);
        String password = config.getString(Key.PASSWORD);
        int maxDepthPerConnection = config.getInt(Key.MAX_IN_PROCESS_PER_CONNECTION,
                GdbWriterConfig.DEFAULT_MAX_IN_PROCESS_PER_CONNECTION);
        int maxConnectionPoolSize = config.getInt(Key.MAX_CONNECTION_POOL_SIZE,
                GdbWriterConfig.DEFAULT_MAX_CONNECTION_POOL_SIZE);
        int maxSimultaneousUsagePerConnection = config.getInt(Key.MAX_SIMULTANEOUS_USAGE_PER_CONNECTION,
                GdbWriterConfig.DEFAULT_MAX_SIMULTANEOUS_USAGE_PER_CONNECTION);

        this.session = session;
        if (this.session) {
            maxConnectionPoolSize = GdbWriterConfig.DEFAULT_MAX_CONNECTION_POOL_SIZE;
            maxDepthPerConnection = GdbWriterConfig.DEFAULT_MAX_IN_PROCESS_PER_CONNECTION;
            maxSimultaneousUsagePerConnection = GdbWriterConfig.DEFAULT_MAX_SIMULTANEOUS_USAGE_PER_CONNECTION;
        }

        try {
            Cluster cluster = Cluster.build(host).port(port).credentials(username, password)
                    .serializer(Serializers.GRAPHBINARY_V1D0)
                    .maxContentLength(1048576)
                    .maxInProcessPerConnection(maxDepthPerConnection)
                    .minInProcessPerConnection(0)
                    .maxConnectionPoolSize(maxConnectionPoolSize)
                    .minConnectionPoolSize(maxConnectionPoolSize)
                    .maxSimultaneousUsagePerConnection(maxSimultaneousUsagePerConnection)
                    .resultIterationBatchSize(64)
                    .create();
            client = session ? cluster.connect(UUID.randomUUID().toString()).init() : cluster.connect().init();
            warmClient(maxConnectionPoolSize * maxDepthPerConnection);
        } catch (RuntimeException e) {
            log.error("Failed to connect to GDB {}:{}, due to {}", host, port, e);
            throw e;
        }

        propertiesBatchNum = config.getInt(Key.MAX_PROPERTIES_BATCH_NUM, GdbWriterConfig.DEFAULT_BATCH_PROPERTY_NUM);
    }

    /**
     * @param dsl
     * @param parameters
     */
    protected void runInternal(String dsl, final Map<String, Object> parameters) throws Exception {
        RequestOptions.Builder options = RequestOptions.build().timeout(DEFAULT_TIMEOUT);
        if (parameters != null && !parameters.isEmpty()) {
            parameters.forEach(options::addParameter);
        }

        ResultSet results = client.submitAsync(dsl, options.create()).get(DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
        results.all().get(DEFAULT_TIMEOUT + 1000, TimeUnit.MILLISECONDS);
    }

    void beginTx() {
        if (!session) {
            return;
        }

        String dsl = "g.tx().open()";
        client.submit(dsl).all().join();
    }

    void doCommit() {
        if (!session) {
            return;
        }

        try {
            String dsl = "g.tx().commit()";
            client.submit(dsl).all().join();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    void doRollback() {
        if (!session) {
            return;
        }

        String dsl = "g.tx().rollback()";
        client.submit(dsl).all().join();
    }

    private void warmClient(int num) {
        try {
            beginTx();
            runInternal("g.V('test')", null);
            doCommit();
            log.info("warm graphdb client over");
        } catch (Exception e) {
            log.error("warmClient error");
            doRollback();
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close() {
        if (client != null) {
            log.info("close graphdb client");
            client.close();
        }
    }
}
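Worth noting from the code above: GDB transactions are driven by submitting scripts (`g.tx().open()` / `commit()` / `rollback()`) rather than a driver-level API, so the session/sessionless split comes down to how the client is connected. A sketch of the two connect modes (gremlin-driver 3.4.x; the endpoint and credentials are hypothetical):

```java
import java.util.UUID;
import org.apache.tinkerpop.gremlin.driver.Client;
import org.apache.tinkerpop.gremlin.driver.Cluster;

public class ConnectModes {
    public static void main(String[] args) {
        Cluster cluster = Cluster.build("gdb-endpoint").port(8182)
                .credentials("root", "***").create();

        // Sessionless: pooled connections; every submitted script is its own transaction.
        Client shared = cluster.connect().init();

        // Session: bound to one server-side session, so a later "g.tx().open()" /
        // "g.tx().commit()" pair brackets the whole batch.
        Client session = cluster.connect(UUID.randomUUID().toString()).init();

        shared.close();
        session.close();
        cluster.close();
    }
}
```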
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbEdge.java (new file, mode 100644)

/**
 *
 */
package com.alibaba.datax.plugin.writer.gdbwriter.model;

import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;

/**
 * @author jerrywang
 *
 */
@Data
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class GdbEdge extends GdbElement {
    private String from = null;
    private String to = null;
}
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbElement.java (new file, mode 100644)

/**
 *
 */
package com.alibaba.datax.plugin.writer.gdbwriter.model;

import java.util.HashMap;
import java.util.Map;

import lombok.Data;

/**
 * @author jerrywang
 *
 */
@Data
public class GdbElement {
    String id = null;
    String label = null;
    Map<String, Object> properties = new HashMap<>();
}
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbGraph.java (new file, mode 100644)

/**
 *
 */
package com.alibaba.datax.plugin.writer.gdbwriter.model;

import com.alibaba.datax.common.element.Record;
import groovy.lang.Tuple2;

import java.util.List;

/**
 * @author jerrywang
 *
 */
public interface GdbGraph extends AutoCloseable {
    List<Tuple2<Record, Exception>> add(List<Tuple2<Record, GdbElement>> records);

    @Override
    void close();
}
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/GdbVertex.java (new file, mode 100644)

/**
 *
 */
package com.alibaba.datax.plugin.writer.gdbwriter.model;

import lombok.EqualsAndHashCode;
import lombok.ToString;

/**
 * @author jerrywang
 *
 */
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class GdbVertex extends GdbElement {
}
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/model/ScriptGdbGraph.java (new file, mode 100644)

/**
 *
 */
package com.alibaba.datax.plugin.writer.gdbwriter.model;

import java.util.*;

import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.gdbwriter.Key;
import com.alibaba.datax.plugin.writer.gdbwriter.util.GdbDuplicateIdException;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import groovy.lang.Tuple2;
import lombok.extern.slf4j.Slf4j;

/**
 * @author jerrywang
 *
 */
@Slf4j
public class ScriptGdbGraph extends AbstractGdbGraph {
    private static final String VAR_PREFIX = "GDB___";
    private static final String VAR_ID = VAR_PREFIX + "id";
    private static final String VAR_LABEL = VAR_PREFIX + "label";
    private static final String VAR_FROM = VAR_PREFIX + "from";
    private static final String VAR_TO = VAR_PREFIX + "to";
    private static final String VAR_PROP_KEY = VAR_PREFIX + "PK";
    private static final String VAR_PROP_VALUE = VAR_PREFIX + "PV";

    private static final String ADD_V_START = "g.addV(" + VAR_LABEL + ").property(id, " + VAR_ID + ")";
    private static final String ADD_E_START = "g.addE(" + VAR_LABEL + ").property(id, " + VAR_ID
            + ").from(V(" + VAR_FROM + ")).to(V(" + VAR_TO + "))";
    private static final String UPDATE_V_START = "g.V(" + VAR_ID + ")";
    private static final String UPDATE_E_START = "g.E(" + VAR_ID + ")";

    private Cache<Integer, String> propertyCache;
    private Random random;

    public ScriptGdbGraph() {
        propertyCache = Caffeine.newBuilder().maximumSize(1024).build();
        random = new Random();
    }

    public ScriptGdbGraph(Configuration config, boolean session) {
        super(config, session);

        propertyCache = Caffeine.newBuilder().maximumSize(1024).build();
        random = new Random();

        log.info("Init as ScriptGdbGraph.");
    }

    /**
     * Apply list of {@link GdbElement} to GDB, return the failed records
     * @param records list of element to apply
     * @return
     */
    @Override
    public List<Tuple2<Record, Exception>> add(List<Tuple2<Record, GdbElement>> records) {
        List<Tuple2<Record, Exception>> errors = new ArrayList<>();
        try {
            beginTx();
            for (Tuple2<Record, GdbElement> elementTuple2 : records) {
                try {
                    addInternal(elementTuple2.getSecond());
                } catch (Exception e) {
                    errors.add(new Tuple2<>(elementTuple2.getFirst(), e));
                }
            }
            doCommit();
        } catch (Exception ex) {
            doRollback();
            throw new RuntimeException(ex);
        }
        return errors;
    }

    private void addInternal(GdbElement element) {
        try {
            addInternal(element, false);
        } catch (GdbDuplicateIdException e) {
            if (updateMode == Key.UpdateMode.SKIP) {
                log.debug("Skip duplicate id {}", element.getId());
            } else if (updateMode == Key.UpdateMode.INSERT) {
                throw new RuntimeException(e);
            } else if (updateMode == Key.UpdateMode.MERGE) {
                if (element.getProperties().isEmpty()) {
                    return;
                }

                try {
                    addInternal(element, true);
                } catch (GdbDuplicateIdException e1) {
                    log.error("duplicate id {} while update...", element.getId());
                    throw new RuntimeException(e1);
                }
            }
        }
    }

    private void addInternal(GdbElement element, boolean update) throws GdbDuplicateIdException {
        Map<String, Object> params = element.getProperties();
        Map<String, Object> subParams = new HashMap<>(propertiesBatchNum);

        boolean firstAdd = !update;
        boolean isVertex = (element instanceof GdbVertex);

        for (Map.Entry<String, Object> entry : params.entrySet()) {
            subParams.put(entry.getKey(), entry.getValue());
            if (subParams.size() >= propertiesBatchNum) {
                setGraphDbElement(element, subParams, isVertex, firstAdd);
                firstAdd = false;
                subParams.clear();
            }
        }
        if (!subParams.isEmpty() || firstAdd) {
            setGraphDbElement(element, subParams, isVertex, firstAdd);
        }
    }

    private Tuple2<String, Map<String, Object>> buildDsl(GdbElement element,
            Map<String, Object> properties, boolean isVertex, boolean firstAdd) {
        Map<String, Object> params = new HashMap<>();

        String dslPropertyPart = propertyCache.get(properties.size(), keys -> {
            final StringBuilder sb = new StringBuilder();
            for (int i = 0; i < keys; i++) {
                sb.append(".property(").append(VAR_PROP_KEY).append(i)
                  .append(", ").append(VAR_PROP_VALUE).append(i).append(")");
            }
            return sb.toString();
        });

        String dsl;
        if (isVertex) {
            dsl = (firstAdd ? ADD_V_START : UPDATE_V_START) + dslPropertyPart;
        } else {
            dsl = (firstAdd ? ADD_E_START : UPDATE_E_START) + dslPropertyPart;
            if (firstAdd) {
                params.put(VAR_FROM, ((GdbEdge) element).getFrom());
                params.put(VAR_TO, ((GdbEdge) element).getTo());
            }
        }

        int index = 0;
        for (Map.Entry<String, Object> entry : properties.entrySet()) {
            params.put(VAR_PROP_KEY + index, entry.getKey());
            params.put(VAR_PROP_VALUE + index, entry.getValue());
            index++;
        }

        if (firstAdd) {
            params.put(VAR_LABEL, element.getLabel());
        }
        params.put(VAR_ID, element.getId());

        return new Tuple2<>(dsl, params);
    }

    private void setGraphDbElement(GdbElement element, Map<String, Object> properties,
            boolean isVertex, boolean firstAdd) throws GdbDuplicateIdException {
        int retry = 10;
        int idleTime = random.nextInt(10) + 10;
        Tuple2<String, Map<String, Object>> elementDsl = buildDsl(element, properties, isVertex, firstAdd);

        while (retry > 0) {
            try {
                runInternal(elementDsl.getFirst(), elementDsl.getSecond());
                log.debug("AddElement {}", element.getId());
                return;
            } catch (Exception e) {
                String cause = e.getCause() == null ? "" : e.getCause().toString();
                if (cause.contains("rejected from")) {
                    retry--;
                    try {
                        Thread.sleep(idleTime);
                    } catch (InterruptedException e1) {
                        // ...
                    }
                    idleTime = Math.min(idleTime * 2, 2000);
                    continue;
                } else if (firstAdd && cause.contains("GraphDB id exists")) {
                    throw new GdbDuplicateIdException(e);
                }
                log.error("Add Failed id {}, dsl {}, params {}, e {}", element.getId(),
                        elementDsl.getFirst(), elementDsl.getSecond(), e);
                throw new RuntimeException(e);
            }
        }
        log.error("Add Failed id {}, dsl {}, params {}", element.getId(),
                elementDsl.getFirst(), elementDsl.getSecond());
        throw new RuntimeException("failed to queue new element to server");
    }
}
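To make the property batching above concrete: `addInternal` writes at most `propertiesBatchNum` (default 30) properties per statement, so an element with more properties becomes one add statement followed by update statements against the same id. The annotated sketch below shows roughly what `buildDsl` yields; the sample labels, ids, and property values are hypothetical:

```java
// First statement for a two-property vertex (firstAdd == true):
//   dsl    = "g.addV(GDB___label).property(id, GDB___id)"
//            + ".property(GDB___PK0, GDB___PV0).property(GDB___PK1, GDB___PV1)"
//   params = { GDB___label="person", GDB___id="person-1",
//              GDB___PK0="name", GDB___PV0="tom",
//              GDB___PK1="age",  GDB___PV1=20 }
//
// If the same vertex had 31 properties (propertiesBatchNum = 30), the writer
// would issue the addV statement with the first 30 and then one update:
//   dsl    = "g.V(GDB___id).property(GDB___PK0, GDB___PV0)"
//   params = { GDB___id="person-1", GDB___PK0=<key 31>, GDB___PV0=<value 31> }
```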
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/util/ConfigHelper.java (new file, mode 100644)

/**
 *
 */
package com.alibaba.datax.plugin.writer.gdbwriter.util;

import java.io.IOException;
import java.io.InputStream;
import java.util.function.Supplier;

import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.gdbwriter.GdbWriterErrorCode;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;

/**
 * @author jerrywang
 *
 */
public interface ConfigHelper {
    static void assertConfig(String key, Supplier<Boolean> f) {
        if (!f.get()) {
            throw DataXException.asDataXException(GdbWriterErrorCode.BAD_CONFIG_VALUE, key);
        }
    }

    static void assertHasContent(Configuration config, String key) {
        assertConfig(key, () -> StringUtils.isNotBlank(config.getString(key)));
    }

    /**
     * NOTE: {@code Configuration::get(String, Class<T>)} doesn't work.
     *
     * @param conf Configuration
     * @param key key path to configuration
     * @param cls Class of result type
     * @return the target configuration object of type T
     */
    static <T> T getConfig(Configuration conf, String key, Class<T> cls) {
        JSONObject j = (JSONObject) conf.get(key);
        return JSON.toJavaObject(j, cls);
    }

    /**
     * Create a configuration from the specified file on the classpath.
     *
     * @param name file name
     * @return Configuration instance.
     */
    static Configuration fromClasspath(String name) {
        try (InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream(name)) {
            return Configuration.from(is);
        } catch (IOException e) {
            throw new IllegalArgumentException("File not found: " + name);
        }
    }
}
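A small usage sketch of the assertion helpers above (the config JSON is hypothetical); both helpers fail fast with a `DataXException` carrying `BAD_CONFIG_VALUE` and the offending key:

```java
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.gdbwriter.util.ConfigHelper;

public class ConfigHelperDemo {
    public static void main(String[] args) {
        Configuration conf = Configuration.from("{\"host\":\"gdb-endpoint\",\"port\":8182}");

        // Passes: "host" is non-blank and "port" is positive.
        ConfigHelper.assertHasContent(conf, "host");
        ConfigHelper.assertConfig("port", () -> conf.getInt("port") > 0);

        // Would throw DataXException (BAD_CONFIG_VALUE, "username"): the key is missing.
        // ConfigHelper.assertHasContent(conf, "username");
    }
}
```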
gdbwriter/src/main/java/com/alibaba/datax/plugin/writer/gdbwriter/util/GdbDuplicateIdException.java (new file, mode 100644)

/*
 * (C) 2019-present Alibaba Group Holding Limited.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 as
 * published by the Free Software Foundation.
 */
package com.alibaba.datax.plugin.writer.gdbwriter.util;

/**
 * @author : Liu Jianping
 * @date : 2019/8/3
 */
public class GdbDuplicateIdException extends Exception {
    public GdbDuplicateIdException(Exception e) {
        super(e);
    }

    public GdbDuplicateIdException() {
        super();
    }
}
gdbwriter/src/main/resources/plugin.json (new file, mode 100644)

{
    "name": "gdbwriter",
    "class": "com.alibaba.datax.plugin.writer.gdbwriter.GdbWriter",
    "description": "useScene: prod. mechanism: connect GDB with gremlin-client, execute DSL as 'g.addV() or g.addE()' to write record",
    "developer": "alibaba"
}
gdbwriter/src/main/resources/plugin_job_template.json (new file, mode 100644)

{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "odpsreader"
                },
                "writer": {
                    "name": "gdbwriter",
                    "parameter": {
                        "host": "localhost",
                        "port": 8182,
                        "username": "username",
                        "password": "password",
                        "label": "test-label",
                        "srcLabel": "test-srcLabel-",
                        "dstLabel": "test-dstLabel-",
                        "labelType": "EDGE",
                        "writeMode": "INSERT",
                        "idTransRule": "labelPrefix",
                        "srcIdTransRule": "labelPrefix",
                        "dstIdTransRule": "labelPrefix",
                        "column": [
                            {
                                "name": "id",
                                "value": "-test-${0}",
                                "type": "string",
                                "columnType": "primaryKey"
                            },
                            {
                                "name": "id",
                                "value": "from-id-${2}",
                                "type": "string",
                                "columnType": "srcPrimaryKey"
                            },
                            {
                                "name": "id",
                                "value": "to-id-${3}",
                                "type": "string",
                                "columnType": "dstPrimaryKey"
                            },
                            {
                                "name": "strValue-${2}-key",
                                "value": "strValue-${2}-value",
                                "type": "string",
                                "columnType": "edgeProperty"
                            },
                            {
                                "name": "intProp",
                                "value": "${3}",
                                "type": "int",
                                "columnType": "edgeProperty"
                            },
                            {
                                "name": "booleanProp",
                                "value": "${5}",
                                "type": "boolean",
                                "columnType": "edgeProperty"
                            }
                        ]
                    }
                }
            }
        ]
    }
}
pom.xml

@@ -88,6 +88,7 @@
     <module>elasticsearchwriter</module>
     <module>tsdbwriter</module>
     <module>adbpgwriter</module>
+    <module>gdbwriter</module>
     <!-- common support module -->
     <module>plugin-rdbms-util</module>