Commit 05d1851d
authored Apr 13, 2020 by Liu Jianping
gdbreader: reader for Aliyun GDB
parent 643b6e9c
Showing 18 changed files with 1755 additions and 0 deletions
Changed files:
gdbreader/doc/gdbreader.md  +260 -0
gdbreader/pom.xml  +125 -0
gdbreader/src/main/assembly/package.xml  +35 -0
.../com/alibaba/datax/plugin/reader/gdbreader/GdbReader.java  +231 -0
...aba/datax/plugin/reader/gdbreader/GdbReaderErrorCode.java  +39 -0
...n/java/com/alibaba/datax/plugin/reader/gdbreader/Key.java  +86 -0
...tax/plugin/reader/gdbreader/mapping/DefaultGdbMapper.java  +150 -0
...ba/datax/plugin/reader/gdbreader/mapping/MappingRule.java  +79 -0
...x/plugin/reader/gdbreader/mapping/MappingRuleFactory.java  +76 -0
...baba/datax/plugin/reader/gdbreader/mapping/ValueType.java  +128 -0
...datax/plugin/reader/gdbreader/model/AbstractGdbGraph.java  +89 -0
...ibaba/datax/plugin/reader/gdbreader/model/GdbElement.java  +39 -0
...alibaba/datax/plugin/reader/gdbreader/model/GdbGraph.java  +65 -0
...a/datax/plugin/reader/gdbreader/model/ScriptGdbGraph.java  +192 -0
...baba/datax/plugin/reader/gdbreader/util/ConfigHelper.java  +77 -0
gdbreader/src/main/resources/plugin.json  +6 -0
gdbreader/src/main/resources/plugin_job_template.json  +77 -0
pom.xml  +1 -0
gdbreader/doc/gdbreader.md
0 → 100644
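# DataX GDBReader
## 1. Quick Introduction
The GDBReader plugin reads data from a GDB instance. It connects to the remote GDB instance through the `Gremlin Client`, generates the query DSL from the configured `label`s, traverses vertices or edges together with their properties, and writes the data into Records for the Writer to consume.
## 2. How It Works
GDBReader connects to the GDB instance with the `Gremlin Client` and reads vertex or edge data in separate Tasks, one per `label`.
Within a single Task, it traverses the ids of the vertices or edges for its `label`, splits them into ranges, and issues multiple requests to fetch the vertices or edges and their properties. It then converts the vertex or edge data into records of the configured format and sends them to the downstream writer plugin.
GDBReader splits the job by `label` into multiple concurrent Tasks, and fetches the data of a single `label` in asynchronous batches to speed up reading. If the configured `label` list is empty, all `label`s are queried from GDB before the job starts and the Tasks are split from that result.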
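
The range splitting described above can be sketched in a few lines of Java. This is a minimal illustration rather than the plugin's actual code: the class name `BatchRangeSketch` and the method `splitIntoRanges` are hypothetical, and the sketch assumes one page of ids (at most `rangeSplitSize` of them) has already been fetched in order. It cuts that page into `[beginId, endId]` pairs that each cover at most `fetchBatchSize` ids, so that each pair can drive one asynchronous request.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: cut one page of ids into fetch ranges. */
class BatchRangeSketch {

    /** Returns [beginId, endId] pairs, each covering at most fetchBatchSize ids. */
    static List<String[]> splitIntoRanges(List<String> ids, int fetchBatchSize) {
        if (fetchBatchSize <= 0) {
            throw new IllegalArgumentException("fetchBatchSize must be positive");
        }
        List<String[]> ranges = new ArrayList<>();
        for (int pos = 0; pos < ids.size(); pos += fetchBatchSize) {
            // the last range may be shorter than fetchBatchSize
            int rangeSize = Math.min(fetchBatchSize, ids.size() - pos);
            ranges.add(new String[] {ids.get(pos), ids.get(pos + rangeSize - 1)});
        }
        return ranges;
    }

    public static void main(String[] args) {
        // five ids with a batch size of two give three ranges
        List<String> ids = Arrays.asList("a", "b", "c", "d", "e");
        for (String[] range : splitIntoRanges(ids, 2)) {
            System.out.println(range[0] + " .. " + range[1]);
        }
    }
}
```

With five ids and a batch size of two, the last range contains a single id, so its begin and end are the same.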
## 3. Features
Vertices and edges are distinct in GDB, so reading requires separate configurations for vertices and for edges.
### 3.1 Vertex Configuration Sample
```
{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            },
            "errorLimit": {
                "record": 1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "gdbreader",
                    "parameter": {
                        "host": "10.218.145.24",
                        "port": 8182,
                        "username": "***",
                        "password": "***",
                        "fetchBatchSize": 100,
                        "rangeSplitSize": 1000,
                        "labelType": "VERTEX",
                        "labels": ["label1", "label2"],
                        "column": [
                            {
                                "name": "id",
                                "type": "string",
                                "columnType": "primaryKey"
                            },
                            {
                                "name": "label",
                                "type": "string",
                                "columnType": "primaryLabel"
                            },
                            {
                                "name": "age",
                                "type": "int",
                                "columnType": "vertexProperty"
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print": true
                    }
                }
            }
        ]
    }
}
```
### 3.2 Edge Configuration Sample
```
{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            },
            "errorLimit": {
                "record": 1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "gdbreader",
                    "parameter": {
                        "host": "10.218.145.24",
                        "port": 8182,
                        "username": "***",
                        "password": "***",
                        "fetchBatchSize": 100,
                        "rangeSplitSize": 1000,
                        "labelType": "EDGE",
                        "labels": ["label1", "label2"],
                        "column": [
                            {
                                "name": "id",
                                "type": "string",
                                "columnType": "primaryKey"
                            },
                            {
                                "name": "label",
                                "type": "string",
                                "columnType": "primaryLabel"
                            },
                            {
                                "name": "srcId",
                                "type": "string",
                                "columnType": "srcPrimaryKey"
                            },
                            {
                                "name": "srcLabel",
                                "type": "string",
                                "columnType": "srcPrimaryLabel"
                            },
                            {
                                "name": "dstId",
                                "type": "string",
                                "columnType": "dstPrimaryKey"
                            },
                            {
                                "name": "dstLabel",
                                "type": "string",
                                "columnType": "dstPrimaryLabel"
                            },
                            {
                                "name": "name",
                                "type": "string",
                                "columnType": "edgeProperty"
                            },
                            {
                                "name": "weight",
                                "type": "double",
                                "columnType": "edgeProperty"
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print": true
                    }
                }
            }
        ]
    }
}
```
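
The two samples use different `columnType` values for vertices and for edges. A minimal sketch of that rule follows; the class `ColumnTypeRules`, the enum `LabelType` and the method `isAllowed` are hypothetical names chosen for illustration, and the allowed sets are an assumption based on the `columnType` descriptions in this document rather than a copy of the plugin's validation code.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical sketch: which columnType values fit each labelType. */
class ColumnTypeRules {

    enum LabelType { VERTEX, EDGE }

    enum ColumnType {
        primaryKey, primaryLabel,
        srcPrimaryKey, srcPrimaryLabel, dstPrimaryKey, dstPrimaryLabel,
        vertexProperty, vertexJsonProperty,
        edgeProperty, edgeJsonProperty
    }

    // vertices carry only their own id, label and properties
    private static final Set<ColumnType> VERTEX_COLUMNS = EnumSet.of(
            ColumnType.primaryKey, ColumnType.primaryLabel,
            ColumnType.vertexProperty, ColumnType.vertexJsonProperty);

    // edges additionally expose the id and label of both endpoints
    private static final Set<ColumnType> EDGE_COLUMNS = EnumSet.of(
            ColumnType.primaryKey, ColumnType.primaryLabel,
            ColumnType.srcPrimaryKey, ColumnType.srcPrimaryLabel,
            ColumnType.dstPrimaryKey, ColumnType.dstPrimaryLabel,
            ColumnType.edgeProperty, ColumnType.edgeJsonProperty);

    static boolean isAllowed(LabelType labelType, ColumnType columnType) {
        Set<ColumnType> allowed = labelType == LabelType.VERTEX ? VERTEX_COLUMNS : EDGE_COLUMNS;
        return allowed.contains(columnType);
    }

    public static void main(String[] args) {
        System.out.println(isAllowed(LabelType.VERTEX, ColumnType.vertexProperty));
        System.out.println(isAllowed(LabelType.VERTEX, ColumnType.srcPrimaryKey));
    }
}
```

A check of this kind at job start rejects, for example, a `srcPrimaryKey` column in a `VERTEX` job before any data is read.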
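### 3.3 Parameter Description
* **host**
    * Description: connection address of the GDB instance, i.e. the network address shown on the 'Instance Management' -> 'Basic Information' page
    * Required: yes
    * Default: none
* **port**
    * Description: port of the GDB instance connection address
    * Required: yes
    * Default: 8182
* **username**
    * Description: account name of the GDB instance
    * Required: yes
    * Default: none
* **password**
    * Description: password of the GDB instance account
    * Required: yes
    * Default: none
* **fetchBatchSize**
    * Description: number of vertices or edges read per GDB request; the response contains the vertices or edges together with their properties
    * Required: yes
    * Default: 100
* **rangeSplitSize**
    * Description: for id traversal, the number of ids scanned per traversal request
    * Required: yes
    * Default: 10 \* fetchBatchSize
* **labels**
    * Description: array of labels, i.e. the vertex or edge labels to export. Multiple labels are supported and are given as an array. If left empty (`[]`), all vertex or edge labels in GDB are read
    * Required: yes
    * Default: none
* **labelType**
    * Description: type of the labelled data; two enum values are supported, vertex and edge
        * VERTEX: vertices
        * EDGE: edges
    * Required: yes
    * Default: none
* **column**
    * Description: field mapping configuration for vertices or edges
    * Required: yes
    * Default: none
* **column -> name**
    * Description: field name in the vertex or edge mapping. For a property column it is the name of the property to read; for other columns it is ignored
    * Required: yes
    * Default: none
* **column -> type**
    * Description: field type in the vertex or edge mapping
        * id and label are both of string type in GDB; configuring a non-string type may make the conversion fail
        * Regular properties support the basic types int, long, float, double, boolean and string
        * GDBReader tries to convert the data it reads into the configured type, but a failed conversion marks that record as an error
    * Required: yes
    * Default: none
* **column -> columnType**
    * Description: mapping from GDB vertex or edge data to column data. The following enum values are supported:
        * primaryKey: the field is the id of the vertex or edge
        * primaryLabel: the field is the label of the vertex or edge
        * srcPrimaryKey: the field is the id of the edge's source vertex; used only when reading edges
        * srcPrimaryLabel: the field is the label of the edge's source vertex; used only when reading edges
        * dstPrimaryKey: the field is the id of the edge's destination vertex; used only when reading edges
        * dstPrimaryLabel: the field is the label of the edge's destination vertex; used only when reading edges
        * vertexProperty: the field is a vertex property; used only when reading vertices. When applied to a SET property, only one of its values is read
        * vertexJsonProperty: the field is the collection of vertex properties; used only when reading vertices. The collection is output in JSON format and contains all properties; it cannot be combined with other vertexProperty columns
        * edgeProperty: the field is an edge property; used only when reading edges
        * edgeJsonProperty: the field is the collection of edge properties; used only when reading edges. The collection is output in JSON format and contains all properties; it cannot be combined with other edgeProperty columns
    * Required: yes
    * Default: none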
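* vertexJsonProperty format example. A `c` field is added to distinguish SET properties; a SET property that holds only a single value is nevertheless marked as a regular (`single`) property
```
{"properties":[
    {"k":"name","t":"string","v":"Jack","c":"set"},
    {"k":"name","t":"string","v":"Luck","c":"set"},
    {"k":"age","t":"int","v":"20","c":"single"}
]}
```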
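
The `c` flag in the example above can be sketched as follows. This is an illustrative sketch under stated assumptions, not the plugin's mapper: `VertexJsonSketch`, `toItems` and `escape` are hypothetical names, values are passed in as plain strings, and the escaping of quotes and backslashes is an addition made here so that the output stays valid JSON.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical sketch: render the values of one vertex property as JSON items. */
class VertexJsonSketch {

    /** Escapes backslashes and double quotes so the text can sit inside a JSON string. */
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    /** One item per value; "c" is "set" only when the property holds more than one value. */
    static String toItems(String key, String type, List<String> values) {
        String cardinality = values.size() > 1 ? "set" : "single";
        return values.stream()
                .map(v -> "{\"k\":\"" + escape(key) + "\",\"t\":\"" + type
                        + "\",\"v\":\"" + escape(v) + "\",\"c\":\"" + cardinality + "\"}")
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        System.out.println(toItems("name", "string", Arrays.asList("Jack", "Luck")));
        System.out.println(toItems("age", "int", Arrays.asList("20")));
    }
}
```

Because the flag is derived from the number of values, a SET property with a single value is indistinguishable from a regular property in the output, which matches the caveat stated above.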
* edgeJsonProperty format example. Edges do not support multi-valued properties
```
{"properties":[
    {"k":"created_at","t":"long","v":"153498653"},
    {"k":"weight","t":"double","v":"3.14"}
]}
```
## 4 Performance Report
(TODO)
## 5 Constraints
None
## 6 FAQ
None
gdbreader/pom.xml
0 → 100644
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>datax-all</artifactId>
        <groupId>com.alibaba.datax</groupId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>gdbreader</artifactId>
    <groupId>com.alibaba.datax</groupId>
    <version>0.0.1-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>datax-common</artifactId>
            <version>${datax-project-version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>datax-core</artifactId>
            <version>${datax-project-version}</version>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.tinkerpop</groupId>
            <artifactId>gremlin-driver</artifactId>
            <version>3.4.1</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.8</version>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>5.4.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>5.4.0</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- compiler plugin: declared once; the sources use lambdas, so Java 8 is required -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                    <encoding>${project-sourceEncoding}</encoding>
                </configuration>
            </plugin>
            <!-- test case plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.22.0</version>
                <configuration>
                    <includes>
                        <include>**/*Test*.class</include>
                    </includes>
                </configuration>
            </plugin>
            <!-- assembly plugin -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptors>
                        <descriptor>src/main/assembly/package.xml</descriptor>
                    </descriptors>
                    <finalName>datax</finalName>
                </configuration>
                <executions>
                    <execution>
                        <id>dwzip</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
gdbreader/src/main/assembly/package.xml
0 → 100644
<assembly
        xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
    <id></id>
    <formats>
        <format>dir</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>
    <fileSets>
        <fileSet>
            <directory>src/main/resources</directory>
            <includes>
                <include>plugin.json</include>
                <include>plugin_job_template.json</include>
            </includes>
            <outputDirectory>plugin/reader/gdbreader</outputDirectory>
        </fileSet>
        <fileSet>
            <directory>target/</directory>
            <includes>
                <include>gdbreader-0.0.1-SNAPSHOT.jar</include>
            </includes>
            <outputDirectory>plugin/reader/gdbreader</outputDirectory>
        </fileSet>
    </fileSets>

    <dependencySets>
        <dependencySet>
            <useProjectArtifact>false</useProjectArtifact>
            <outputDirectory>plugin/reader/gdbreader/libs</outputDirectory>
            <scope>runtime</scope>
        </dependencySet>
    </dependencySets>
</assembly>
gdbreader/src/main/java/com/alibaba/datax/plugin/reader/gdbreader/GdbReader.java
0 → 100644
package com.alibaba.datax.plugin.reader.gdbreader;

import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.reader.gdbreader.mapping.DefaultGdbMapper;
import com.alibaba.datax.plugin.reader.gdbreader.mapping.MappingRule;
import com.alibaba.datax.plugin.reader.gdbreader.mapping.MappingRuleFactory;
import com.alibaba.datax.plugin.reader.gdbreader.model.GdbElement;
import com.alibaba.datax.plugin.reader.gdbreader.model.GdbGraph;
import com.alibaba.datax.plugin.reader.gdbreader.model.ScriptGdbGraph;
import com.alibaba.datax.plugin.reader.gdbreader.util.ConfigHelper;
import org.apache.tinkerpop.gremlin.driver.ResultSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.LinkedList;
import java.util.List;

public class GdbReader extends Reader {
    private final static int DEFAULT_FETCH_BATCH_SIZE = 200;
    private static GdbGraph graph;
    private static Key.ExportType exportType;

    /**
     * Methods in Job run only once; methods in Task are run in parallel by the Task threads that the framework starts.
     * <p/>
     * The overall Reader execution flow is:
     * <pre>
     * Job: init-->prepare-->split
     *
     * Task: init-->prepare-->startRead-->post-->destroy
     * Task: init-->prepare-->startRead-->post-->destroy
     *
     * Job: post-->destroy
     * </pre>
     */
    public static class Job extends Reader.Job {
        private static final Logger LOG = LoggerFactory.getLogger(Job.class);

        private Configuration jobConfig = null;

        @Override
        public void init() {
            this.jobConfig = super.getPluginJobConf();

            /**
             * Note: this method runs only once.
             * Best practice: validate the user's configuration here (missing required items, invalid values,
             * irrelevant items, ...) and report clear errors or warnings. Validation is best done in a static
             * utility class to keep this class readable.
             */
            ConfigHelper.assertGdbClient(jobConfig);
            ConfigHelper.assertLabels(jobConfig);
            try {
                exportType = Key.ExportType.valueOf(jobConfig.getString(Key.EXPORT_TYPE));
            } catch (NullPointerException | IllegalArgumentException e) {
                throw DataXException.asDataXException(GdbReaderErrorCode.BAD_CONFIG_VALUE, Key.EXPORT_TYPE);
            }
        }

        @Override
        public void prepare() {
            /**
             * Note: this method runs only once.
             * Best practice: do any work the Job needs before data synchronization here; remove the method if it is not needed.
             */
            try {
                graph = new ScriptGdbGraph(jobConfig, exportType);
            } catch (RuntimeException e) {
                throw DataXException.asDataXException(GdbReaderErrorCode.FAIL_CLIENT_CONNECT, e.getMessage());
            }
        }

        @Override
        public List<Configuration> split(int adviceNumber) {
            /**
             * Note: this method runs only once.
             * Best practice: use a static utility class to split the Job configuration into Task configurations.
             * adviceNumber is the number of splits the framework suggests based on the requested sync speed;
             * it is advisory only, not a mandatory split count.
             */
            List<String> labels = ConfigHelper.assertLabels(jobConfig);

            /**
             * If the configured label list is empty, query all labels from GDB and add them to the read list.
             */
            if (labels.isEmpty()) {
                try {
                    labels.addAll(graph.getLabels().keySet());
                } catch (RuntimeException ex) {
                    throw DataXException.asDataXException(GdbReaderErrorCode.FAIL_FETCH_LABELS, ex.getMessage());
                }
            }

            if (labels.isEmpty()) {
                throw DataXException.asDataXException(GdbReaderErrorCode.FAIL_FETCH_LABELS, "none labels to read");
            }

            return ConfigHelper.splitConfig(jobConfig, labels);
        }

        @Override
        public void post() {
            /**
             * Note: this method runs only once.
             * Best practice: do any follow-up work the Job needs after data synchronization here.
             */
        }

        @Override
        public void destroy() {
            /**
             * Note: this method runs only once.
             * Best practice: usually works together with post() to release the Job's resources.
             */
            try {
                graph.close();
            } catch (Exception ex) {
                // pass the exception as the last argument so that slf4j logs the stack trace
                LOG.error("Failed to close client", ex);
            }
        }
    }

    public static class Task extends Reader.Task {
        private static final Logger LOG = LoggerFactory.getLogger(Task.class);
        private static MappingRule rule;

        private Configuration taskConfig;
        private String fetchLabel = null;

        private int rangeSplitSize;
        private int fetchBatchSize;

        @Override
        public void init() {
            this.taskConfig = super.getPluginJobConf();

            /**
             * Note: this method runs once for every Task.
             * Best practice: read taskConfig here and initialize the resources that startRead() needs.
             */
            fetchLabel = taskConfig.getString(Key.LABEL);
            fetchBatchSize = taskConfig.getInt(Key.FETCH_BATCH_SIZE, DEFAULT_FETCH_BATCH_SIZE);
            rangeSplitSize = taskConfig.getInt(Key.RANGE_SPLIT_SIZE, fetchBatchSize * 10);
            rule = MappingRuleFactory.getInstance().create(taskConfig, exportType);
        }

        @Override
        public void prepare() {
            /**
             * Note: this method runs once for every Task.
             * Best practice: do any work the Task needs before data synchronization here; remove the method if it is not needed.
             */
        }

        @Override
        public void startRead(RecordSender recordSender) {
            /**
             * Note: this method runs once for every Task.
             * Best practice: encapsulate appropriately so that the read logic stays short and clear.
             */

            String start = "";
            while (true) {
                List<String> ids;
                try {
                    ids = graph.fetchIds(fetchLabel, start, rangeSplitSize);
                    if (ids.isEmpty()) {
                        break;
                    }
                    // the next id page starts after the last id of this page
                    start = ids.get(ids.size() - 1);
                } catch (Exception ex) {
                    throw DataXException.asDataXException(GdbReaderErrorCode.FAIL_FETCH_IDS, ex.getMessage());
                }

                // send range fetch async
                int count = ids.size();
                List<ResultSet> resultSets = new LinkedList<>();
                for (int pos = 0; pos < count; pos += fetchBatchSize) {
                    int rangeSize = Math.min(fetchBatchSize, count - pos);
                    String endId = ids.get(pos + rangeSize - 1);
                    String beginId = ids.get(pos);

                    List<String> propNames = rule.isHasProperty() ? rule.getPropertyNames() : null;
                    try {
                        resultSets.add(graph.fetchElementsAsync(fetchLabel, beginId, endId, propNames));
                    } catch (Exception ex) {
                        // just print error logs and continue
                        LOG.error("failed to request label: {}, start: {}, end: {}", fetchLabel, beginId, endId, ex);
                    }
                }

                // get range fetch dsl results
                resultSets.forEach(results -> {
                    try {
                        List<GdbElement> elements = graph.getElement(results);
                        elements.forEach(element -> {
                            Record record = recordSender.createRecord();
                            DefaultGdbMapper.getMapper(rule).accept(element, record);
                            recordSender.sendToWriter(record);
                        });
                        recordSender.flush();
                    } catch (Exception ex) {
                        LOG.error("failed to send records", ex);
                    }
                });
            }
        }

        @Override
        public void post() {
            /**
             * Note: this method runs once for every Task.
             * Best practice: do any follow-up work the Task needs after data synchronization here.
             */
        }

        @Override
        public void destroy() {
            /**
             * Note: this method runs once for every Task.
             * Best practice: usually works together with post() to release the Task's resources.
             */
        }
    }
}
\ No newline at end of file
gdbreader/src/main/java/com/alibaba/datax/plugin/reader/gdbreader/GdbReaderErrorCode.java
0 → 100644
package com.alibaba.datax.plugin.reader.gdbreader;

import com.alibaba.datax.common.spi.ErrorCode;

public enum GdbReaderErrorCode implements ErrorCode {
    /**
     *
     */
    BAD_CONFIG_VALUE("GdbReader-00", "The value you configured is invalid."),
    FAIL_CLIENT_CONNECT("GdbReader-02", "GDB connection is abnormal."),
    UNSUPPORTED_TYPE("GdbReader-03", "Unsupported data type conversion."),
    FAIL_FETCH_LABELS("GdbReader-04", "Error pulling all labels, it is recommended to configure the specified label pull."),
    FAIL_FETCH_IDS("GdbReader-05", "Pull range id error."),
    ;

    private final String code;
    private final String description;

    private GdbReaderErrorCode(String code, String description) {
        this.code = code;
        this.description = description;
    }

    @Override
    public String getCode() {
        return this.code;
    }

    @Override
    public String getDescription() {
        return this.description;
    }

    @Override
    public String toString() {
        return String.format("Code:[%s], Description:[%s]. ", this.code,
                this.description);
    }
}
\ No newline at end of file
gdbreader/src/main/java/com/alibaba/datax/plugin/reader/gdbreader/Key.java
0 → 100644
package com.alibaba.datax.plugin.reader.gdbreader;

public final class Key {

    /**
     * Configuration items that the plugin uses and that the plugin user must provide are declared here.
     */
    public final static String HOST = "host";
    public final static String PORT = "port";
    public final static String USERNAME = "username";
    public static final String PASSWORD = "password";
    public static final String LABEL = "labels";
    public static final String EXPORT_TYPE = "labelType";

    // must match the "rangeSplitSize" key used in gdbreader.md and the sample job configurations
    public static final String RANGE_SPLIT_SIZE = "rangeSplitSize";
    public static final String FETCH_BATCH_SIZE = "fetchBatchSize";

    public static final String COLUMN = "column";
    public static final String COLUMN_NAME = "name";
    public static final String COLUMN_TYPE = "type";
    public static final String COLUMN_NODE_TYPE = "columnType";

    public enum ExportType {
        /**
         * Export vertices
         */
        VERTEX,
        /**
         * Export edges
         */
        EDGE
    }

    public enum ColumnType {
        /**
         * vertex or edge id
         */
        primaryKey,

        /**
         * vertex or edge label
         */
        primaryLabel,

        /**
         * vertex property
         */
        vertexProperty,

        /**
         * collects all vertex properties into a JSON list
         */
        vertexJsonProperty,

        /**
         * start vertex id of edge
         */
        srcPrimaryKey,

        /**
         * start vertex label of edge
         */
        srcPrimaryLabel,

        /**
         * end vertex id of edge
         */
        dstPrimaryKey,

        /**
         * end vertex label of edge
         */
        dstPrimaryLabel,

        /**
         * edge property
         */
        edgeProperty,

        /**
         * collects all edge properties into a JSON list
         */
        edgeJsonProperty,
    }
}
gdbreader/src/main/java/com/alibaba/datax/plugin/reader/gdbreader/mapping/DefaultGdbMapper.java
0 → 100644
/*
 * (C) 2019-present Alibaba Group Holding Limited.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 as
 * published by the Free Software Foundation.
 */
package com.alibaba.datax.plugin.reader.gdbreader.mapping;

import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.plugin.reader.gdbreader.model.GdbElement;
import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceProperty;
import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceVertexProperty;

import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
 * @author : Liu Jianping
 * @date : 2019/9/6
 */

public class DefaultGdbMapper {

    public static BiConsumer<GdbElement, Record> getMapper(MappingRule rule) {
        return (gdbElement, record) -> rule.getColumns().forEach(columnMappingRule -> {
            Object value = null;
            ValueType type = columnMappingRule.getValueType();
            String name = columnMappingRule.getName();
            Map<String, Object> props = gdbElement.getProperties();

            switch (columnMappingRule.getColumnType()) {
                case dstPrimaryKey:
                    value = gdbElement.getTo();
                    break;
                case srcPrimaryKey:
                    value = gdbElement.getFrom();
                    break;
                case primaryKey:
                    value = gdbElement.getId();
                    break;
                case primaryLabel:
                    value = gdbElement.getLabel();
                    break;
                case dstPrimaryLabel:
                    value = gdbElement.getToLabel();
                    break;
                case srcPrimaryLabel:
                    value = gdbElement.getFromLabel();
                    break;
                case vertexProperty:
                    value = forVertexOnePropertyValue().apply(props.get(name));
                    break;
                case edgeProperty:
                    value = forEdgePropertyValue().apply(props.get(name));
                    break;
                case edgeJsonProperty:
                    value = forEdgeJsonProperties().apply(props);
                    break;
                case vertexJsonProperty:
                    value = forVertexJsonProperties().apply(props);
                    break;
                default:
                    break;
            }
            record.addColumn(type.applyObject(value));
        });
    }

    /**
     * parse the ReferenceProperty value of an edge
     *
     * @return property value
     */
    private static Function<Object, Object> forEdgePropertyValue() {
        return prop -> {
            if (prop instanceof ReferenceProperty) {
                return ((ReferenceProperty) prop).value();
            }
            return null;
        };
    }

    /**
     * parse the ReferenceVertexProperty value of a vertex
     *
     * @return the first property value in list
     */
    private static Function<Object, Object> forVertexOnePropertyValue() {
        return props -> {
            // guard against an empty list before reading the first element
            if (props instanceof List<?> && !((List<?>) props).isEmpty()) {
                // get the first property if there is more than one
                Object o = ((List<?>) props).get(0);
                if (o instanceof ReferenceVertexProperty) {
                    return ((ReferenceVertexProperty) o).value();
                }
            }
            return null;
        };
    }

    /**
     * render all edge properties as a json string
     *
     * @return json string
     */
    private static Function<Map<String, Object>, String> forEdgeJsonProperties() {
        return props -> "{\"properties\":[" +
                props.entrySet().stream().filter(p -> p.getValue() instanceof ReferenceProperty)
                        .map(p -> "{\"k\":\"" + ((ReferenceProperty) p.getValue()).key() + "\"," +
                                "\"t\":\"" + ((ReferenceProperty) p.getValue()).value().getClass().getSimpleName().toLowerCase() + "\"," +
                                "\"v\":\"" + String.valueOf(((ReferenceProperty) p.getValue()).value()) + "\"}")
                        .collect(Collectors.joining(",")) +
                "]}";
    }

    /**
     * render all vertex properties as a json string, including set-properties
     *
     * @return json string
     */
    private static Function<Map<String, Object>, String> forVertexJsonProperties() {
        return props -> "{\"properties\":[" +
                props.entrySet().stream().filter(p -> p.getValue() instanceof List<?>)
                        .map(p -> forVertexPropertyStr().apply((List<?>) p.getValue()))
                        .collect(Collectors.joining(",")) +
                "]}";
    }

    /**
     * render one vertex property as json string items, setting the 'cardinality' flag
     *
     * @return json string item
     */
    private static Function<List<?>, String> forVertexPropertyStr() {
        return vp -> {
            final String setFlag = vp.size() > 1 ? "set" : "single";
            return vp.stream().filter(p -> p instanceof ReferenceVertexProperty)
                    .map(p -> "{\"k\":\"" + ((ReferenceVertexProperty) p).key() + "\"," +
                            "\"t\":\"" + ((ReferenceVertexProperty) p).value().getClass().getSimpleName().toLowerCase() + "\"," +
                            "\"v\":\"" + String.valueOf(((ReferenceVertexProperty) p).value()) + "\"," +
                            "\"c\":\"" + setFlag + "\"}")
                    .collect(Collectors.joining(","));
        };
    }
}
gdbreader/src/main/java/com/alibaba/datax/plugin/reader/gdbreader/mapping/MappingRule.java
0 → 100644
/*
 * (C) 2019-present Alibaba Group Holding Limited.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 as
 * published by the Free Software Foundation.
 */
package com.alibaba.datax.plugin.reader.gdbreader.mapping;

import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.plugin.reader.gdbreader.GdbReaderErrorCode;
import com.alibaba.datax.plugin.reader.gdbreader.Key.ColumnType;
import com.alibaba.datax.plugin.reader.gdbreader.Key.ExportType;
import lombok.Data;

import java.util.ArrayList;
import java.util.List;

/**
 * @author : Liu Jianping
 * @date : 2019/9/6
 */

@Data
public class MappingRule {
    private boolean hasRelation = false;
    private boolean hasProperty = false;
    private ExportType type = ExportType.VERTEX;

    /**
     * property names for property key-value
     */
    private List<String> propertyNames = new ArrayList<>();

    private List<ColumnMappingRule> columns = new ArrayList<>();

    void addColumn(ColumnType columnType, ValueType type, String name) {
        ColumnMappingRule rule = new ColumnMappingRule();
        rule.setColumnType(columnType);
        rule.setName(name);
        rule.setValueType(type);

        if (columnType == ColumnType.vertexProperty || columnType == ColumnType.edgeProperty) {
            propertyNames.add(name);
            hasProperty = true;
        }

        boolean hasTo = columnType == ColumnType.dstPrimaryKey || columnType == ColumnType.dstPrimaryLabel;
        boolean hasFrom = columnType == ColumnType.srcPrimaryKey || columnType == ColumnType.srcPrimaryLabel;
        if (hasTo || hasFrom) {
            hasRelation = true;
        }

        columns.add(rule);
    }

    void addJsonColumn(ColumnType columnType) {
        ColumnMappingRule rule = new ColumnMappingRule();
        rule.setColumnType(columnType);
        rule.setName("json");
        rule.setValueType(ValueType.STRING);

        if (!propertyNames.isEmpty()) {
            throw DataXException.asDataXException(GdbReaderErrorCode.BAD_CONFIG_VALUE, "JsonProperties should be only property");
        }

        columns.add(rule);
        hasProperty = true;
    }

    @Data
    protected static class ColumnMappingRule {
        private String name = null;

        private ValueType valueType = null;

        private ColumnType columnType = null;
    }
}
gdbreader/src/main/java/com/alibaba/datax/plugin/reader/gdbreader/mapping/MappingRuleFactory.java
0 → 100644
/*
 * (C) 2019-present Alibaba Group Holding Limited.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 as
 * published by the Free Software Foundation.
 */
package com.alibaba.datax.plugin.reader.gdbreader.mapping;

import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.reader.gdbreader.GdbReaderErrorCode;
import com.alibaba.datax.plugin.reader.gdbreader.Key;
import com.alibaba.datax.plugin.reader.gdbreader.Key.ColumnType;
import com.alibaba.datax.plugin.reader.gdbreader.Key.ExportType;
import com.alibaba.datax.plugin.reader.gdbreader.util.ConfigHelper;

import java.util.List;

/**
 * @author : Liu Jianping
 * @date : 2019/9/20
 */

public class MappingRuleFactory {
    private static final MappingRuleFactory instance = new MappingRuleFactory();

    public static MappingRuleFactory getInstance() {
        return instance;
    }

    public MappingRule create(Configuration config, ExportType exportType) {
        MappingRule rule = new MappingRule();

        rule.setType(exportType);
        List<Configuration> configurationList = config.getListConfiguration(Key.COLUMN);
        for (Configuration column : configurationList) {
            ColumnType columnType;
            try {
                columnType = ColumnType.valueOf(column.getString(Key.COLUMN_NODE_TYPE));
            } catch (NullPointerException | IllegalArgumentException e) {
                throw DataXException.asDataXException(GdbReaderErrorCode.BAD_CONFIG_VALUE, Key.COLUMN_NODE_TYPE);
            }

            if (exportType == ExportType.VERTEX) {
                // only id/label/property columns are allowed for vertices
                ConfigHelper.assertConfig(Key.COLUMN_NODE_TYPE, () ->
                        columnType == ColumnType.primaryKey || columnType == ColumnType.primaryLabel
                                || columnType == ColumnType.vertexProperty || columnType == ColumnType.vertexJsonProperty);
            } else if (exportType == ExportType.EDGE) {
                // edges additionally allow the source and destination id/label columns
                ConfigHelper.assertConfig(Key.COLUMN_NODE_TYPE, () ->
                        columnType == ColumnType.primaryKey || columnType == ColumnType.primaryLabel
                                || columnType == ColumnType.srcPrimaryKey || columnType == ColumnType.srcPrimaryLabel
                                || columnType == ColumnType.dstPrimaryKey || columnType == ColumnType.dstPrimaryLabel
                                || columnType == ColumnType.edgeProperty || columnType == ColumnType.edgeJsonProperty);
            }

            if (columnType == ColumnType.edgeProperty || columnType == ColumnType.vertexProperty) {
                String name = column.getString(Key.COLUMN_NAME);
                ValueType propType = ValueType.fromShortName(column.getString(Key.COLUMN_TYPE));

                ConfigHelper.assertConfig(Key.COLUMN_NAME, () -> name != null);
                if (propType == null) {
                    throw DataXException.asDataXException(GdbReaderErrorCode.UNSUPPORTED_TYPE, Key.COLUMN_TYPE);
                }
                rule.addColumn(columnType, propType, name);
            } else if (columnType == ColumnType.vertexJsonProperty || columnType == ColumnType.edgeJsonProperty) {
                rule.addJsonColumn(columnType);
            } else {
                rule.addColumn(columnType, ValueType.STRING, null);
            }
        }
        return rule;
    }
}
gdbreader/src/main/java/com/alibaba/datax/plugin/reader/gdbreader/mapping/ValueType.java
0 → 100644
View file @
05d1851d
/*
* (C) 2019-present Alibaba Group Holding Limited.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
* published by the Free Software Foundation.
*/
package com.alibaba.datax.plugin.reader.gdbreader.mapping;

import com.alibaba.datax.common.element.BoolColumn;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.DoubleColumn;
import com.alibaba.datax.common.element.LongColumn;
import com.alibaba.datax.common.element.StringColumn;

import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/**
 * @author : Liu Jianping
 * @date : 2019/9/6
 */
public enum ValueType {
    /**
     * transfer gdb element object value to DataX Column data
     * <p>
     * int, long -> LongColumn
     * float, double -> DoubleColumn
     * bool -> BoolColumn
     * string -> StringColumn
     */
    INT(Integer.class, "int", ValueTypeHolder::longColumnMapper),
    INTEGER(Integer.class, "integer", ValueTypeHolder::longColumnMapper),
    LONG(Long.class, "long", ValueTypeHolder::longColumnMapper),
    DOUBLE(Double.class, "double", ValueTypeHolder::doubleColumnMapper),
    FLOAT(Float.class, "float", ValueTypeHolder::doubleColumnMapper),
    BOOLEAN(Boolean.class, "boolean", ValueTypeHolder::boolColumnMapper),
    STRING(String.class, "string", ValueTypeHolder::stringColumnMapper),
    ;

    private Class<?> type = null;
    private String shortName = null;
    private Function<Object, Column> columnFunc = null;

    ValueType(Class<?> type, String name, Function<Object, Column> columnFunc) {
        this.type = type;
        this.shortName = name;
        this.columnFunc = columnFunc;

        // register in the holder so fromShortName can look the constant up
        ValueTypeHolder.shortName2type.put(shortName, this);
    }

    public static ValueType fromShortName(String name) {
        return ValueTypeHolder.shortName2type.get(name);
    }

    public Column applyObject(Object value) {
        if (value == null) {
            return null;
        }
        return columnFunc.apply(value);
    }

    private static class ValueTypeHolder {
        private static Map<String, ValueType> shortName2type = new HashMap<>();

        private static LongColumn longColumnMapper(Object o) {
            long v;
            if (o instanceof Integer) {
                v = (int) o;
            } else if (o instanceof Long) {
                v = (long) o;
            } else if (o instanceof String) {
                v = Long.valueOf((String) o);
            } else {
                throw new RuntimeException("Failed to cast " + o.getClass() + " to Long");
            }

            return new LongColumn(v);
        }

        private static DoubleColumn doubleColumnMapper(Object o) {
            double v;
            if (o instanceof Integer) {
                v = (double) (int) o;
            } else if (o instanceof Long) {
                v = (double) (long) o;
            } else if (o instanceof Float) {
                v = (double) (float) o;
            } else if (o instanceof Double) {
                v = (double) o;
            } else if (o instanceof String) {
                v = Double.valueOf((String) o);
            } else {
                throw new RuntimeException("Failed to cast " + o.getClass() + " to Double");
            }

            return new DoubleColumn(v);
        }

        private static BoolColumn boolColumnMapper(Object o) {
            boolean v;
            if (o instanceof Integer) {
                v = ((int) o != 0);
            } else if (o instanceof Long) {
                v = ((long) o != 0);
            } else if (o instanceof Boolean) {
                v = (boolean) o;
            } else if (o instanceof String) {
                v = Boolean.valueOf((String) o);
            } else {
                throw new RuntimeException("Failed to cast " + o.getClass() + " to Boolean");
            }

            return new BoolColumn(v);
        }

        private static StringColumn stringColumnMapper(Object o) {
            if (o instanceof String) {
                return new StringColumn((String) o);
            } else {
                return new StringColumn(String.valueOf(o));
            }
        }
    }
}
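Review note: `ValueType` registers each constant in a map held by a nested class, because an enum constructor cannot refer to the enum's own non-constant static fields. The following is a minimal sketch of that idiom only. `ValueTypeSketch`, `SimpleType` and `Holder` are hypothetical names, and boxed `Long`/`Double` values stand in for the DataX `Column` classes so the example compiles on its own.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class ValueTypeSketch {
    /** Hypothetical stand-in for ValueType: maps a short name to a converter. */
    enum SimpleType {
        LONG("long", Holder::toLong),
        DOUBLE("double", Holder::toDouble);

        private final Function<Object, Object> mapper;

        SimpleType(String shortName, Function<Object, Object> mapper) {
            this.mapper = mapper;
            // The registry lives in a nested class: touching it here triggers
            // Holder's static initialization before the first put.
            Holder.BY_SHORT_NAME.put(shortName, this);
        }

        static SimpleType fromShortName(String name) {
            return Holder.BY_SHORT_NAME.get(name);
        }

        Object apply(Object value) {
            return value == null ? null : mapper.apply(value);
        }
    }

    static class Holder {
        static final Map<String, SimpleType> BY_SHORT_NAME = new HashMap<>();

        static Object toLong(Object o) {
            if (o instanceof Integer) {
                return ((Integer) o).longValue();
            } else if (o instanceof Long) {
                return o;
            } else if (o instanceof String) {
                return Long.valueOf((String) o);
            }
            throw new IllegalArgumentException("Failed to cast " + o.getClass() + " to Long");
        }

        static Object toDouble(Object o) {
            if (o instanceof Number) {
                return ((Number) o).doubleValue();
            } else if (o instanceof String) {
                return Double.valueOf((String) o);
            }
            throw new IllegalArgumentException("Failed to cast " + o.getClass() + " to Double");
        }
    }
}
```

As in the original, an unknown short name yields `null` from `fromShortName`, so callers need to check the result before calling `apply`.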
gdbreader/src/main/java/com/alibaba/datax/plugin/reader/gdbreader/model/AbstractGdbGraph.java
0 → 100644
/*
* (C) 2019-present Alibaba Group Holding Limited.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
* published by the Free Software Foundation.
*/
package com.alibaba.datax.plugin.reader.gdbreader.model;

import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.reader.gdbreader.Key;
import org.apache.tinkerpop.gremlin.driver.Client;
import org.apache.tinkerpop.gremlin.driver.Cluster;
import org.apache.tinkerpop.gremlin.driver.RequestOptions;
import org.apache.tinkerpop.gremlin.driver.Result;
import org.apache.tinkerpop.gremlin.driver.ResultSet;
import org.apache.tinkerpop.gremlin.driver.ser.Serializers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * @author : Liu Jianping
 * @date : 2019/9/6
 */
public abstract class AbstractGdbGraph implements GdbGraph {
    /** request timeout in milliseconds */
    static final int DEFAULT_TIMEOUT = 30000;

    private static final Logger log = LoggerFactory.getLogger(AbstractGdbGraph.class);
    private Client client;

    AbstractGdbGraph() {
    }

    AbstractGdbGraph(Configuration config) {
        log.info("init graphdb client");
        String host = config.getString(Key.HOST);
        int port = config.getInt(Key.PORT);
        String username = config.getString(Key.USERNAME);
        String password = config.getString(Key.PASSWORD);

        try {
            Cluster cluster = Cluster.build(host).port(port).credentials(username, password)
                    .serializer(Serializers.GRAPHBINARY_V1D0)
                    .maxContentLength(1024 * 1024)
                    .resultIterationBatchSize(64)
                    .create();
            client = cluster.connect().init();

            warmClient();
        } catch (RuntimeException e) {
            // pass the exception as the trailing argument so SLF4J logs its stack trace
            log.error("Failed to connect to GDB {}:{}", host, port, e);
            throw e;
        }
    }

    protected List<Result> runInternal(String dsl, Map<String, Object> params) throws Exception {
        return runInternalAsync(dsl, params).all().get(DEFAULT_TIMEOUT + 1000, TimeUnit.MILLISECONDS);
    }

    protected ResultSet runInternalAsync(String dsl, Map<String, Object> params) throws Exception {
        RequestOptions.Builder options = RequestOptions.build().timeout(DEFAULT_TIMEOUT);
        if (params != null && !params.isEmpty()) {
            params.forEach(options::addParameter);
        }
        return client.submitAsync(dsl, options.create()).get(DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
    }

    private void warmClient() {
        try {
            runInternal("g.V('test')", null);
            log.info("warm graphdb client over");
        } catch (Exception e) {
            log.error("warmClient error");
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close() throws Exception {
        if (client != null) {
            log.info("close graphdb client");
            client.close();
        }
    }
}
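Review note: `runInternal` waits `DEFAULT_TIMEOUT + 1000` ms on the client side, while the request itself carries a timeout of `DEFAULT_TIMEOUT` ms. Presumably the extra second lets a server-side timeout surface before the client gives up, but the diff does not say so. The sketch below shows the bounded-wait pattern with plain `CompletableFuture`; `TimeoutSketch` and `await` are hypothetical names and no Gremlin driver types are involved.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutSketch {
    /** Mirrors the constant in AbstractGdbGraph (milliseconds). */
    static final int DEFAULT_TIMEOUT = 30000;

    /** Waits for an async result and fails with a RuntimeException once the deadline passes. */
    static <T> T await(CompletableFuture<T> future, long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // give up on the pending work so it does not linger after the caller has failed
            future.cancel(true);
            throw new RuntimeException("no result within " + timeoutMillis + " ms", e);
        } catch (InterruptedException e) {
            // restore the interrupt flag for callers further up the stack
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        }
    }
}
```

A caller following the original's convention would pass `DEFAULT_TIMEOUT + 1000` when collecting results and `DEFAULT_TIMEOUT` when waiting for the submission itself.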
gdbreader/src/main/java/com/alibaba/datax/plugin/reader/gdbreader/model/GdbElement.java
0 → 100644
/*
* (C) 2019-present Alibaba Group Holding Limited.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
* published by the Free Software Foundation.
*/
package com.alibaba.datax.plugin.reader.gdbreader.model;

import lombok.Data;

import java.util.HashMap;
import java.util.Map;

/**
 * @author : Liu Jianping
 * @date : 2019/9/6
 */
@Data
public class GdbElement {
    String id = null;
    String label = null;
    String to = null;
    String from = null;
    String toLabel = null;
    String fromLabel = null;

    Map<String, Object> properties = new HashMap<>();

    public GdbElement() {
    }

    public GdbElement(String id, String label) {
        this.id = id;
        this.label = label;
    }
}
gdbreader/src/main/java/com/alibaba/datax/plugin/reader/gdbreader/model/GdbGraph.java
0 → 100644
/*
* (C) 2019-present Alibaba Group Holding Limited.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
* published by the Free Software Foundation.
*/
package com.alibaba.datax.plugin.reader.gdbreader.model;

import org.apache.tinkerpop.gremlin.driver.ResultSet;

import java.util.List;
import java.util.Map;

/**
 * @author : Liu Jianping
 * @date : 2019/9/6
 */
public interface GdbGraph extends AutoCloseable {

    /**
     * Get all labels of the GraphDB
     *
     * @return map from each label to the number of elements carrying it
     */
    Map<String, Long> getLabels();

    /**
     * Get the list of ids for the specified 'label', at most 'limit' entries
     *
     * @param label label of vertex or edge
     * @param start start of the id range to fetch
     * @param limit maximum size of the id list
     * @return id list
     */
    List<String> fetchIds(String label, String start, long limit);

    /**
     * Fetch elements in async mode: only sends the query dsl to the server
     *
     * @param label     node label to filter
     * @param start     range begin (included)
     * @param end       range end (included)
     * @param propNames list of property keys to fetch
     * @return result set from which the elements can be read later
     */
    ResultSet fetchElementsAsync(String label, String start, String end, List<String> propNames);

    /**
     * Get the elements from the response {@link ResultSet}
     *
     * @param results response of the server
     * @return list of elements
     */
    List<GdbElement> getElement(ResultSet results);

    /**
     * close graph client
     *
     * @throws Exception if fails
     */
    @Override
    void close() throws Exception;
}
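Review note: the interface suggests a two-step export. `fetchIds` pages through the ids of one label, and `fetchElementsAsync` then reads an inclusive `[start, end]` range. How the reader task actually drives these calls is not part of this section of the diff, so the loop below is an assumption. It uses an in-memory sorted set as a stand-in for the graph, treats `start` as exclusive (the script DSL uses `gt`), and assumes ids come back in ascending order. `IdRangeSketch` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;

class IdRangeSketch {
    /** Stand-in for GdbGraph.fetchIds: ids strictly greater than start, at most limit of them. */
    static List<String> fetchIds(NavigableSet<String> allIds, String start, long limit) {
        List<String> page = new ArrayList<>();
        for (String id : allIds.tailSet(start, false)) {
            if (page.size() >= limit) {
                break;
            }
            page.add(id);
        }
        return page;
    }

    /** Walks the id space page by page and returns inclusive {first, last} ranges. */
    static List<String[]> splitRanges(NavigableSet<String> allIds, long pageSize) {
        List<String[]> ranges = new ArrayList<>();
        String cursor = ""; // sorts before every non-empty id
        while (true) {
            List<String> page = fetchIds(allIds, cursor, pageSize);
            if (page.isEmpty()) {
                break;
            }
            String last = page.get(page.size() - 1);
            ranges.add(new String[] {page.get(0), last});
            cursor = last; // the next page starts strictly after this id
        }
        return ranges;
    }
}
```

Each resulting pair could be passed as the `start` and `end` arguments of `fetchElementsAsync`, whose bounds are documented as inclusive.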
gdbreader/src/main/java/com/alibaba/datax/plugin/reader/gdbreader/model/ScriptGdbGraph.java
0 → 100644
/*
* (C) 2019-present Alibaba Group Holding Limited.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
* published by the Free Software Foundation.
*/
package com.alibaba.datax.plugin.reader.gdbreader.model;

import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.reader.gdbreader.Key.ExportType;
import org.apache.tinkerpop.gremlin.driver.Result;
import org.apache.tinkerpop.gremlin.driver.ResultSet;
import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceEdge;
import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceVertex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * @author : Liu Jianping
 * @date : 2019/9/6
 */
public class ScriptGdbGraph extends AbstractGdbGraph {
    private static final Logger log = LoggerFactory.getLogger(ScriptGdbGraph.class);

    // names of the script parameters bound on each request
    private final static String LABEL = "GDB___LABEL";
    private final static String START_ID = "GDB___ID";
    private final static String END_ID = "GDB___ID_END";
    private final static String LIMIT = "GDB___LIMIT";

    private final static String FETCH_VERTEX_IDS_DSL = "g.V().hasLabel(" + LABEL + ").has(id, gt(" + START_ID + ")).limit(" + LIMIT + ").id()";
    private final static String FETCH_EDGE_IDS_DSL = "g.E().hasLabel(" + LABEL + ").has(id, gt(" + START_ID + ")).limit(" + LIMIT + ").id()";

    private final static String FETCH_VERTEX_LABELS_DSL = "g.V().groupCount().by(label)";
    private final static String FETCH_EDGE_LABELS_DSL = "g.E().groupCount().by(label)";

    /**
     * fetch node range [START_ID, END_ID]
     */
    private final static String FETCH_RANGE_VERTEX_DSL = "g.V().hasLabel(" + LABEL + ").has(id, gte(" + START_ID + ")).has(id, lte(" + END_ID + "))";
    private final static String FETCH_RANGE_EDGE_DSL = "g.E().hasLabel(" + LABEL + ").has(id, gte(" + START_ID + ")).has(id, lte(" + END_ID + "))";
    private final static String PART_WITH_PROP_DSL = ".as('a').project('node', 'props').by(select('a')).by(select('a').propertyMap(";

    private final ExportType exportType;

    public ScriptGdbGraph(ExportType exportType) {
        super();
        this.exportType = exportType;
    }

    public ScriptGdbGraph(Configuration config, ExportType exportType) {
        super(config);
        this.exportType = exportType;
    }

    @Override
    public List<String> fetchIds(final String label, final String start, long limit) {
        Map<String, Object> params = new HashMap<>(3);
        params.put(LABEL, label);
        params.put(START_ID, start);
        params.put(LIMIT, limit);

        String fetchDsl = exportType == ExportType.VERTEX ? FETCH_VERTEX_IDS_DSL : FETCH_EDGE_IDS_DSL;

        List<String> ids = new ArrayList<>();
        try {
            List<Result> results = runInternal(fetchDsl, params);

            // transfer result to id string
            results.forEach(id -> ids.add(id.getString()));
        } catch (Exception e) {
            log.error("fetch range node failed, label {}, start {}", label, start);
            throw new RuntimeException(e);
        }
        return ids;
    }

    @Override
    public ResultSet fetchElementsAsync(final String label, final String start, final String end, final List<String> propNames) {
        Map<String, Object> params = new HashMap<>(3);
        params.put(LABEL, label);
        params.put(START_ID, start);
        params.put(END_ID, end);

        String prefixDsl = exportType == ExportType.VERTEX ? FETCH_RANGE_VERTEX_DSL : FETCH_RANGE_EDGE_DSL;
        StringBuilder fetchDsl = new StringBuilder(prefixDsl);
        if (propNames != null) {
            fetchDsl.append(PART_WITH_PROP_DSL);
            for (int i = 0; i < propNames.size(); i++) {
                // each property key is bound as its own script parameter
                String propName = "GDB___PK" + i;
                params.put(propName, propNames.get(i));

                fetchDsl.append(propName);
                if (i != propNames.size() - 1) {
                    fetchDsl.append(", ");
                }
            }
            fetchDsl.append("))");
        }

        try {
            return runInternalAsync(fetchDsl.toString(), params);
        } catch (Exception e) {
            // the exception goes last, without a placeholder, so SLF4J logs its stack trace
            log.error("Failed to fetch range node, start {}, end {}", start, end, e);
            throw new RuntimeException(e);
        }
    }

    @Override
    @SuppressWarnings("unchecked")
    public List<GdbElement> getElement(ResultSet results) {
        List<GdbElement> elements = new LinkedList<>();
        try {
            List<Result> resultList = results.all().get(DEFAULT_TIMEOUT + 1000, TimeUnit.MILLISECONDS);

            resultList.forEach(n -> {
                Object o = n.getObject();
                GdbElement element = new GdbElement();
                if (o instanceof Map) {
                    // project response
                    Map<?, ?> projected = (Map<?, ?>) o;
                    Object node = projected.get("node");
                    Object props = projected.get("props");

                    mapNodeToElement(node, element);
                    mapPropToElement((Map<String, Object>) props, element);
                } else {
                    // range node response
                    mapNodeToElement(o, element);
                }
                if (element.getId() != null) {
                    elements.add(element);
                }
            });
        } catch (Exception e) {
            log.error("Failed to get node", e);
            throw new RuntimeException(e);
        }
        return elements;
    }

    private void mapNodeToElement(Object node, GdbElement element) {
        if (node instanceof ReferenceVertex) {
            ReferenceVertex v = (ReferenceVertex) node;

            element.setId((String) v.id());
            element.setLabel(v.label());
        } else if (node instanceof ReferenceEdge) {
            ReferenceEdge e = (ReferenceEdge) node;

            element.setId((String) e.id());
            element.setLabel(e.label());
            element.setTo((String) e.inVertex().id());
            element.setToLabel(e.inVertex().label());
            element.setFrom((String) e.outVertex().id());
            element.setFromLabel(e.outVertex().label());
        }
    }

    private void mapPropToElement(Map<String, Object> props, GdbElement element) {
        element.setProperties(props);
    }

    @Override
    public Map<String, Long> getLabels() {
        String dsl = exportType == ExportType.VERTEX ? FETCH_VERTEX_LABELS_DSL : FETCH_EDGE_LABELS_DSL;

        try {
            List<Result> results = runInternal(dsl, null);
            Map<String, Long> labelMap = new HashMap<>(2);

            Map<?, ?> labels = results.get(0).get(Map.class);
            labels.forEach((k, v) -> {
                String label = (String) k;
                Long count = (Long) v;
                labelMap.put(label, count);
            });

            return labelMap;
        } catch (Exception e) {
            log.error("Failed to fetch label list, please specify the labels explicitly and run again", e);
            throw new RuntimeException(e);
        }
    }
}
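Review note: `fetchElementsAsync` never splices user values into the script text. Label, id bounds and property keys are all bound as parameters (`GDB___LABEL`, `GDB___PK0`, ...), and only the placeholder names appear in the DSL. The sketch below reproduces just that string assembly for the vertex case, so the resulting script can be inspected without a server. `DslSketch` and `buildDsl` are hypothetical names; the two constants are copied from the class above with the concatenations expanded.

```java
import java.util.List;
import java.util.Map;

class DslSketch {
    static final String RANGE_VERTEX_DSL =
            "g.V().hasLabel(GDB___LABEL).has(id, gte(GDB___ID)).has(id, lte(GDB___ID_END))";
    static final String PART_WITH_PROP_DSL =
            ".as('a').project('node', 'props').by(select('a')).by(select('a').propertyMap(";

    /** Builds the script text and records one parameter binding per property key. */
    static String buildDsl(List<String> propNames, Map<String, Object> params) {
        StringBuilder dsl = new StringBuilder(RANGE_VERTEX_DSL);
        if (propNames != null) {
            dsl.append(PART_WITH_PROP_DSL);
            for (int i = 0; i < propNames.size(); i++) {
                String placeholder = "GDB___PK" + i;
                params.put(placeholder, propNames.get(i)); // the value travels as a binding
                if (i > 0) {
                    dsl.append(", ");
                }
                dsl.append(placeholder); // only the placeholder name enters the script
            }
            dsl.append("))");
        }
        return dsl.toString();
    }
}
```

Because the script text depends only on the number of property keys, the server may be able to reuse a compiled script across id ranges. Whether GDB does so is not stated in this diff.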
gdbreader/src/main/java/com/alibaba/datax/plugin/reader/gdbreader/util/ConfigHelper.java
0 → 100644
/*
* (C) 2019-present Alibaba Group Holding Limited.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
* published by the Free Software Foundation.
*/
package com.alibaba.datax.plugin.reader.gdbreader.util;

import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.reader.gdbreader.GdbReaderErrorCode;
import com.alibaba.datax.plugin.reader.gdbreader.Key;
import org.apache.commons.lang3.StringUtils;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/**
 * @author : Liu Jianping
 * @date : 2019/9/6
 */
public interface ConfigHelper {
    static void assertConfig(String key, Supplier<Boolean> f) {
        if (!f.get()) {
            throw DataXException.asDataXException(GdbReaderErrorCode.BAD_CONFIG_VALUE, key);
        }
    }

    static void assertHasContent(Configuration config, String key) {
        assertConfig(key, () -> StringUtils.isNotBlank(config.getString(key)));
    }

    static void assertGdbClient(Configuration config) {
        assertHasContent(config, Key.HOST);
        assertConfig(Key.PORT, () -> config.getInt(Key.PORT) > 0);

        assertHasContent(config, Key.USERNAME);
        assertHasContent(config, Key.PASSWORD);
    }

    static List<String> assertLabels(Configuration config) {
        Object labels = config.get(Key.LABEL);
        if (!(labels instanceof List)) {
            throw DataXException.asDataXException(GdbReaderErrorCode.BAD_CONFIG_VALUE, "labels should be List");
        }

        List<?> list = (List<?>) labels;
        List<String> configLabels = new ArrayList<>(0);
        list.forEach(n -> configLabels.add(String.valueOf(n)));

        return configLabels;
    }

    static List<Configuration> splitConfig(Configuration config, List<String> labels) {
        List<Configuration> configs = new ArrayList<>();
        for (String label : labels) {
            // one cloned configuration per label
            Configuration conf = config.clone();
            conf.set(Key.LABEL, label);

            configs.add(conf);
        }
        return configs;
    }

    static Configuration fromClasspath(String name) {
        try (InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream(name)) {
            if (is == null) {
                // getResourceAsStream returns null, rather than throwing, for a missing resource
                throw new IllegalArgumentException("File not found: " + name);
            }
            return Configuration.from(is);
        } catch (IOException e) {
            throw new IllegalArgumentException("Failed to read: " + name, e);
        }
    }
}
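Review note: `splitConfig` fans one job configuration out into one configuration per label, each with the label key overwritten by a single value. The sketch below shows the same fan-out with a plain `Map` standing in for DataX's `Configuration`. `SplitConfigSketch` is a hypothetical name, and the key `"labels"` is assumed from the job template since `Key.LABEL` is defined in a file not shown in this section.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SplitConfigSketch {
    /** Assumed key name; the real value comes from Key.LABEL. */
    static final String LABEL_KEY = "labels";

    /** Returns one copy of the base config per label, leaving the base untouched. */
    static List<Map<String, Object>> splitConfig(Map<String, Object> config, List<String> labels) {
        List<Map<String, Object>> configs = new ArrayList<>();
        for (String label : labels) {
            // a shallow copy stands in for Configuration.clone()
            Map<String, Object> copy = new HashMap<>(config);
            copy.put(LABEL_KEY, label);
            configs.add(copy);
        }
        return configs;
    }
}
```

With a base config whose label list is `["label1", "label2"]`, this yields two maps whose label entries are `"label1"` and `"label2"` respectively, while every other setting is shared.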
gdbreader/src/main/resources/plugin.json
0 → 100644
{
    "name": "gdbreader",
    "class": "com.alibaba.datax.plugin.reader.gdbreader.GdbReader",
    "description": "useScene: prod. mechanism: connect GDB with gremlin-client, execute 'g.V().propertyMap()' or 'g.E().propertyMap()' to get records",
    "developer": "alibaba"
}
\ No newline at end of file
gdbreader/src/main/resources/plugin_job_template.json
0 → 100644
{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            },
            "errorLimit": {
                "record": 1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "gdbreader",
                    "parameter": {
                        "host": "10.218.145.24",
                        "port": 8182,
                        "username": "***",
                        "password": "***",
                        "labelType": "EDGE",
                        "labels": ["label1", "label2"],
                        "column": [
                            {
                                "name": "id",
                                "type": "string",
                                "columnType": "primaryKey"
                            },
                            {
                                "name": "label",
                                "type": "string",
                                "columnType": "primaryLabel"
                            },
                            {
                                "name": "srcId",
                                "type": "string",
                                "columnType": "srcPrimaryKey"
                            },
                            {
                                "name": "srcLabel",
                                "type": "string",
                                "columnType": "srcPrimaryLabel"
                            },
                            {
                                "name": "dstId",
                                "type": "string",
                                "columnType": "dstPrimaryKey"
                            },
                            {
                                "name": "dstLabel",
                                "type": "string",
                                "columnType": "dstPrimaryLabel"
                            },
                            {
                                "name": "name",
                                "type": "string",
                                "columnType": "edgeProperty"
                            },
                            {
                                "name": "weight",
                                "type": "double",
                                "columnType": "edgeProperty"
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print": true
                    }
                }
            }
        ]
    }
}
\ No newline at end of file
pom.xml
...
...
@@ -66,6 +66,7 @@
        <module>tsdbreader</module>
        <module>opentsdbreader</module>
        <module>cassandrareader</module>
        <module>gdbreader</module>

        <!-- writer -->
        <module>mysqlwriter</module>
...
...