Commit 2484343a authored by Liu Jianping

gdbwriter: update to support set-property

1. 添加id/label/属性字段长度限制
2. 添加对reader列索引格式,增加'#{i}'支持,同时兼容原格式
3. 添加SET属性导入支持
parent 643b6e9c
package com.alibaba.datax.plugin.writer.gdbwriter; package com.alibaba.datax.plugin.writer.gdbwriter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.Function;
import com.alibaba.datax.common.element.Record; import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver; import com.alibaba.datax.common.plugin.RecordReceiver;
...@@ -18,13 +13,21 @@ import com.alibaba.datax.plugin.writer.gdbwriter.mapping.MappingRule; ...@@ -18,13 +13,21 @@ import com.alibaba.datax.plugin.writer.gdbwriter.mapping.MappingRule;
import com.alibaba.datax.plugin.writer.gdbwriter.mapping.MappingRuleFactory; import com.alibaba.datax.plugin.writer.gdbwriter.mapping.MappingRuleFactory;
import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbElement; import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbElement;
import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbGraph; import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbGraph;
import groovy.lang.Tuple2; import groovy.lang.Tuple2;
import io.netty.util.concurrent.DefaultThreadFactory; import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
public class GdbWriter extends Writer { public class GdbWriter extends Writer {
private static final Logger log = LoggerFactory.getLogger(GdbWriter.class); private static final Logger log = LoggerFactory.getLogger(GdbWriter.class);
...@@ -36,6 +39,7 @@ public class GdbWriter extends Writer { ...@@ -36,6 +39,7 @@ public class GdbWriter extends Writer {
* Job 中的方法仅执行一次,Task 中方法会由框架启动多个 Task 线程并行执行。 * Job 中的方法仅执行一次,Task 中方法会由框架启动多个 Task 线程并行执行。
* <p/> * <p/>
* 整个 Writer 执行流程是: * 整个 Writer 执行流程是:
*
* <pre> * <pre>
* Job类init-->prepare-->split * Job类init-->prepare-->split
* *
...@@ -46,8 +50,7 @@ public class GdbWriter extends Writer { ...@@ -46,8 +50,7 @@ public class GdbWriter extends Writer {
* </pre> * </pre>
*/ */
public static class Job extends Writer.Job { public static class Job extends Writer.Job {
private static final Logger LOG = LoggerFactory private static final Logger LOG = LoggerFactory.getLogger(Job.class);
.getLogger(Job.class);
private Configuration jobConfig = null; private Configuration jobConfig = null;
...@@ -73,30 +76,30 @@ public class GdbWriter extends Writer { ...@@ -73,30 +76,30 @@ public class GdbWriter extends Writer {
*/ */
super.prepare(); super.prepare();
MappingRule rule = MappingRuleFactory.getInstance().createV2(jobConfig); final MappingRule rule = MappingRuleFactory.getInstance().createV2(this.jobConfig);
mapper = new DefaultGdbMapper().getMapper(rule); mapper = new DefaultGdbMapper(this.jobConfig).getMapper(rule);
session = jobConfig.getBool(Key.SESSION_STATE, false); session = this.jobConfig.getBool(Key.SESSION_STATE, false);
/** /**
* client connect check before task * client connect check before task
*/ */
try { try {
globalGraph = GdbGraphManager.instance().getGraph(jobConfig, false); globalGraph = GdbGraphManager.instance().getGraph(this.jobConfig, false);
} catch (RuntimeException e) { } catch (final RuntimeException e) {
throw DataXException.asDataXException(GdbWriterErrorCode.FAIL_CLIENT_CONNECT, e.getMessage()); throw DataXException.asDataXException(GdbWriterErrorCode.FAIL_CLIENT_CONNECT, e.getMessage());
} }
} }
@Override @Override
public List<Configuration> split(int mandatoryNumber) { public List<Configuration> split(final int mandatoryNumber) {
/** /**
* 注意:此方法仅执行一次。 * 注意:此方法仅执行一次。
* 最佳实践:通常采用工具静态类完成把 Job 配置切分成多个 Task 配置的工作。 * 最佳实践:通常采用工具静态类完成把 Job 配置切分成多个 Task 配置的工作。
* 这里的 mandatoryNumber 是强制必须切分的份数。 * 这里的 mandatoryNumber 是强制必须切分的份数。
*/ */
LOG.info("split begin..."); LOG.info("split begin...");
List<Configuration> configurationList = new ArrayList<Configuration>(); final List<Configuration> configurationList = new ArrayList<Configuration>();
for (int i = 0; i < mandatoryNumber; i++) { for (int i = 0; i < mandatoryNumber; i++) {
configurationList.add(this.jobConfig.clone()); configurationList.add(this.jobConfig.clone());
} }
...@@ -140,22 +143,22 @@ public class GdbWriter extends Writer { ...@@ -140,22 +143,22 @@ public class GdbWriter extends Writer {
* 最佳实践:此处通过对 taskConfig 配置的读取,进而初始化一些资源为 startWrite()做准备。 * 最佳实践:此处通过对 taskConfig 配置的读取,进而初始化一些资源为 startWrite()做准备。
*/ */
this.taskConfig = super.getPluginJobConf(); this.taskConfig = super.getPluginJobConf();
batchRecords = taskConfig.getInt(Key.MAX_RECORDS_IN_BATCH, GdbWriterConfig.DEFAULT_RECORD_NUM_IN_BATCH); this.batchRecords = this.taskConfig.getInt(Key.MAX_RECORDS_IN_BATCH, GdbWriterConfig.DEFAULT_RECORD_NUM_IN_BATCH);
submitService = new ThreadPoolExecutor(1, 1, 0L, this.submitService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(),
TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(), new DefaultThreadFactory("submit-dsl")); new DefaultThreadFactory("submit-dsl"));
if (!session) { if (!session) {
graph = globalGraph; this.graph = globalGraph;
} else { } else {
/** /**
* 分批创建session client,由于服务端groovy编译性能的限制 * 分批创建session client,由于服务端groovy编译性能的限制
*/ */
try { try {
Thread.sleep((getTaskId()/10)*10000); Thread.sleep((getTaskId() / 10) * 10000);
} catch (Exception e) { } catch (final Exception e) {
// ... // ...
} }
graph = GdbGraphManager.instance().getGraph(taskConfig, session); this.graph = GdbGraphManager.instance().getGraph(this.taskConfig, session);
} }
} }
...@@ -169,56 +172,61 @@ public class GdbWriter extends Writer { ...@@ -169,56 +172,61 @@ public class GdbWriter extends Writer {
} }
@Override @Override
public void startWrite(RecordReceiver recordReceiver) { public void startWrite(final RecordReceiver recordReceiver) {
/** /**
* 注意:此方法每个 Task 都会执行一次。 * 注意:此方法每个 Task 都会执行一次。
* 最佳实践:此处适当封装确保简洁清晰完成数据写入工作。 * 最佳实践:此处适当封装确保简洁清晰完成数据写入工作。
*/ */
Record r; Record r;
Future<Boolean> future = null; Future<Boolean> future = null;
List<Tuple2<Record, GdbElement>> records = new ArrayList<>(batchRecords); List<Tuple2<Record, GdbElement>> records = new ArrayList<>(this.batchRecords);
while ((r = recordReceiver.getFromReader()) != null) { while ((r = recordReceiver.getFromReader()) != null) {
try {
records.add(new Tuple2<>(r, mapper.apply(r))); records.add(new Tuple2<>(r, mapper.apply(r)));
} catch (final Exception ex) {
getTaskPluginCollector().collectDirtyRecord(r, ex);
continue;
}
if (records.size() >= batchRecords) { if (records.size() >= this.batchRecords) {
wait4Submit(future); wait4Submit(future);
final List<Tuple2<Record, GdbElement>> batch = records; final List<Tuple2<Record, GdbElement>> batch = records;
future = submitService.submit(() -> batchCommitRecords(batch)); future = this.submitService.submit(() -> batchCommitRecords(batch));
records = new ArrayList<>(batchRecords); records = new ArrayList<>(this.batchRecords);
} }
} }
wait4Submit(future); wait4Submit(future);
if (!records.isEmpty()) { if (!records.isEmpty()) {
final List<Tuple2<Record, GdbElement>> batch = records; final List<Tuple2<Record, GdbElement>> batch = records;
future = submitService.submit(() -> batchCommitRecords(batch)); future = this.submitService.submit(() -> batchCommitRecords(batch));
wait4Submit(future); wait4Submit(future);
} }
} }
private void wait4Submit(Future<Boolean> future) { private void wait4Submit(final Future<Boolean> future) {
if (future == null) { if (future == null) {
return; return;
} }
try { try {
future.get(); future.get();
} catch (Exception e) { } catch (final Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
} }
private boolean batchCommitRecords(final List<Tuple2<Record, GdbElement>> records) { private boolean batchCommitRecords(final List<Tuple2<Record, GdbElement>> records) {
TaskPluginCollector collector = getTaskPluginCollector(); final TaskPluginCollector collector = getTaskPluginCollector();
try { try {
List<Tuple2<Record, Exception>> errors = graph.add(records); final List<Tuple2<Record, Exception>> errors = this.graph.add(records);
errors.forEach(t -> collector.collectDirtyRecord(t.getFirst(), t.getSecond())); errors.forEach(t -> collector.collectDirtyRecord(t.getFirst(), t.getSecond()));
failed += errors.size(); this.failed += errors.size();
} catch (Exception e) { } catch (final Exception e) {
records.forEach(t -> collector.collectDirtyRecord(t.getFirst(), e)); records.forEach(t -> collector.collectDirtyRecord(t.getFirst(), e));
failed += records.size(); this.failed += records.size();
} }
records.clear(); records.clear();
...@@ -231,7 +239,7 @@ public class GdbWriter extends Writer { ...@@ -231,7 +239,7 @@ public class GdbWriter extends Writer {
* 注意:此方法每个 Task 都会执行一次。 * 注意:此方法每个 Task 都会执行一次。
* 最佳实践:如果 Task 中有需要进行数据同步之后的后续处理,可以在此处完成。 * 最佳实践:如果 Task 中有需要进行数据同步之后的后续处理,可以在此处完成。
*/ */
log.info("Task done, dirty record count - {}", failed); log.info("Task done, dirty record count - {}", this.failed);
} }
@Override @Override
...@@ -241,9 +249,9 @@ public class GdbWriter extends Writer { ...@@ -241,9 +249,9 @@ public class GdbWriter extends Writer {
* 最佳实践:通常配合Task 中的 post() 方法一起完成 Task 的资源释放。 * 最佳实践:通常配合Task 中的 post() 方法一起完成 Task 的资源释放。
*/ */
if (session) { if (session) {
graph.close(); this.graph.close();
} }
submitService.shutdown(); this.submitService.shutdown();
} }
} }
......
...@@ -27,7 +27,6 @@ public enum GdbWriterErrorCode implements ErrorCode { ...@@ -27,7 +27,6 @@ public enum GdbWriterErrorCode implements ErrorCode {
@Override @Override
public String toString() { public String toString() {
return String.format("Code:[%s], Description:[%s]. ", this.code, return String.format("Code:[%s], Description:[%s]. ", this.code, this.description);
this.description);
} }
} }
\ No newline at end of file
...@@ -63,6 +63,17 @@ public final class Key { ...@@ -63,6 +63,17 @@ public final class Key {
public static final String MAX_RECORDS_IN_BATCH = "maxRecordsInBatch"; public static final String MAX_RECORDS_IN_BATCH = "maxRecordsInBatch";
public static final String SESSION_STATE = "session"; public static final String SESSION_STATE = "session";
/**
* Length limits for GDB requests and for GDB element strings. Each field's limit can be configured separately; records that exceed a limit are handled as dirty data.
*/
public static final String MAX_GDB_STRING_LENGTH = "maxStringLengthLimit";
public static final String MAX_GDB_ID_LENGTH = "maxIdStringLengthLimit";
public static final String MAX_GDB_LABEL_LENGTH = "maxLabelStringLengthLimit";
public static final String MAX_GDB_PROP_KEY_LENGTH = "maxPropKeyStringLengthLimit";
public static final String MAX_GDB_PROP_VALUE_LENGTH = "maxPropValueStringLengthLimit";
public static final String MAX_GDB_REQUEST_LENGTH = "maxRequestLengthLimit";
public static enum ImportType { public static enum ImportType {
/** /**
* Import vertices * Import vertices
...@@ -100,6 +111,11 @@ public final class Key { ...@@ -100,6 +111,11 @@ public final class Key {
*/ */
vertexProperty, vertexProperty,
/**
* vertex setProperty
*/
vertexSetProperty,
/** /**
* start vertex id of edge * start vertex id of edge
*/ */
...@@ -138,4 +154,16 @@ public final class Key { ...@@ -138,4 +154,16 @@ public final class Key {
none none
} }
/**
 * Kind of a vertex property; a property mapping rule carries one of these and defaults to {@link #single}.
 */
public static enum PropertyType {
    /**
     * A single-valued vertex property.
     */
    single,
    /**
     * A set-valued vertex property.
     * NOTE(review): presumably maps to set cardinality on the server side; confirm against the DSL builder.
     */
    set
}
} }
...@@ -3,37 +3,37 @@ ...@@ -3,37 +3,37 @@
*/ */
package com.alibaba.datax.plugin.writer.gdbwriter.client; package com.alibaba.datax.plugin.writer.gdbwriter.client;
import java.util.ArrayList;
import java.util.List;
import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbGraph; import com.alibaba.datax.plugin.writer.gdbwriter.model.GdbGraph;
import com.alibaba.datax.plugin.writer.gdbwriter.model.ScriptGdbGraph; import com.alibaba.datax.plugin.writer.gdbwriter.model.ScriptGdbGraph;
import java.util.ArrayList;
import java.util.List;
/** /**
* @author jerrywang * @author jerrywang
* *
*/ */
public class GdbGraphManager implements AutoCloseable { public class GdbGraphManager implements AutoCloseable {
private static final GdbGraphManager instance = new GdbGraphManager(); private static final GdbGraphManager INSTANCE = new GdbGraphManager();
private List<GdbGraph> graphs = new ArrayList<>(); private List<GdbGraph> graphs = new ArrayList<>();
public static GdbGraphManager instance() { public static GdbGraphManager instance() {
return instance; return INSTANCE;
} }
public GdbGraph getGraph(Configuration config, boolean session) { public GdbGraph getGraph(final Configuration config, final boolean session) {
GdbGraph graph = new ScriptGdbGraph(config, session); final GdbGraph graph = new ScriptGdbGraph(config, session);
graphs.add(graph); this.graphs.add(graph);
return graph; return graph;
} }
@Override @Override
public void close() { public void close() {
for(GdbGraph graph : graphs) { for (final GdbGraph graph : this.graphs) {
graph.close(); graph.close();
} }
graphs.clear(); this.graphs.clear();
} }
} }
...@@ -3,11 +3,12 @@ ...@@ -3,11 +3,12 @@
*/ */
package com.alibaba.datax.plugin.writer.gdbwriter.client; package com.alibaba.datax.plugin.writer.gdbwriter.client;
import static com.alibaba.datax.plugin.writer.gdbwriter.util.ConfigHelper.assertConfig;
import static com.alibaba.datax.plugin.writer.gdbwriter.util.ConfigHelper.assertHasContent;
import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.gdbwriter.Key; import com.alibaba.datax.plugin.writer.gdbwriter.Key;
import static com.alibaba.datax.plugin.writer.gdbwriter.util.ConfigHelper.*;
/** /**
* @author jerrywang * @author jerrywang
* *
...@@ -19,23 +20,26 @@ public class GdbWriterConfig { ...@@ -19,23 +20,26 @@ public class GdbWriterConfig {
public static final int DEFAULT_BATCH_PROPERTY_NUM = 30; public static final int DEFAULT_BATCH_PROPERTY_NUM = 30;
public static final int DEFAULT_RECORD_NUM_IN_BATCH = 16; public static final int DEFAULT_RECORD_NUM_IN_BATCH = 16;
public static final int MAX_STRING_LENGTH = 10240;
public static final int MAX_REQUEST_LENGTH = 65535 - 1000;
private Configuration config; private Configuration config;
private GdbWriterConfig(Configuration config) { private GdbWriterConfig(final Configuration config) {
this.config = config; this.config = config;
validate(); validate();
} }
private void validate() { public static GdbWriterConfig of(final Configuration config) {
assertHasContent(config, Key.HOST); return new GdbWriterConfig(config);
assertConfig(Key.PORT, () -> config.getInt(Key.PORT) > 0);
assertHasContent(config, Key.USERNAME);
assertHasContent(config, Key.PASSWORD);
} }
public static GdbWriterConfig of(Configuration config) { private void validate() {
return new GdbWriterConfig(config); assertHasContent(this.config, Key.HOST);
assertConfig(Key.PORT, () -> this.config.getInt(Key.PORT) > 0);
assertHasContent(this.config, Key.USERNAME);
assertHasContent(this.config, Key.PASSWORD);
} }
} }
/*
* (C) 2019-present Alibaba Group Holding Limited.
*
* This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public
* License version 2 as published by the Free Software Foundation.
*/
package com.alibaba.datax.plugin.writer.gdbwriter.mapping;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.gdbwriter.Key;
import com.alibaba.datax.plugin.writer.gdbwriter.client.GdbWriterConfig;
/**
 * Process-wide holder of the maximum string lengths allowed for GDB element ids, labels, property keys and
 * property values.
 *
 * <p>Every limit starts at {@link GdbWriterConfig#MAX_STRING_LENGTH} and can be overridden through
 * {@link #updateConfig(Configuration)}. The fields are plain {@code int}s with no synchronization, so an update is
 * expected to complete before other threads start reading the limits.
 *
 * @author : Liu Jianping
 * @date : 2019/10/15
 */
public class MapperConfig {
    /** The one shared instance; final so the singleton reference can never be swapped. */
    private static final MapperConfig INSTANCE = new MapperConfig();

    private int maxIdLength;
    private int maxLabelLength;
    private int maxPropKeyLength;
    private int maxPropValueLength;

    /** Initializes every limit to the built-in default. */
    private MapperConfig() {
        this.maxIdLength = GdbWriterConfig.MAX_STRING_LENGTH;
        this.maxLabelLength = GdbWriterConfig.MAX_STRING_LENGTH;
        this.maxPropKeyLength = GdbWriterConfig.MAX_STRING_LENGTH;
        this.maxPropValueLength = GdbWriterConfig.MAX_STRING_LENGTH;
    }

    /**
     * Returns the shared instance.
     *
     * @return the singleton {@code MapperConfig}
     */
    public static MapperConfig getInstance() {
        return INSTANCE;
    }

    /**
     * Reloads all four limits from the given configuration.
     *
     * <p>{@link Key#MAX_GDB_STRING_LENGTH} supplies the common fallback (itself defaulting to
     * {@link GdbWriterConfig#MAX_STRING_LENGTH}); each field-specific key overrides the fallback when present.
     *
     * @param config writer configuration to read the limits from
     */
    public void updateConfig(final Configuration config) {
        final int fallback = config.getInt(Key.MAX_GDB_STRING_LENGTH, GdbWriterConfig.MAX_STRING_LENGTH);

        // getInt(key, default) yields the default when the key is absent, matching a manual null check.
        this.maxIdLength = config.getInt(Key.MAX_GDB_ID_LENGTH, fallback);
        this.maxLabelLength = config.getInt(Key.MAX_GDB_LABEL_LENGTH, fallback);
        this.maxPropKeyLength = config.getInt(Key.MAX_GDB_PROP_KEY_LENGTH, fallback);
        this.maxPropValueLength = config.getInt(Key.MAX_GDB_PROP_VALUE_LENGTH, fallback);
    }

    /**
     * @return the maximum allowed length of an element id
     */
    public int getMaxIdLength() {
        return this.maxIdLength;
    }

    /**
     * @return the maximum allowed length of an element label
     */
    public int getMaxLabelLength() {
        return this.maxLabelLength;
    }

    /**
     * @return the maximum allowed length of a property key
     */
    public int getMaxPropKeyLength() {
        return this.maxPropKeyLength;
    }

    /**
     * @return the maximum allowed length of a String property value
     */
    public int getMaxPropValueLength() {
        return this.maxPropValueLength;
    }
}
...@@ -7,6 +7,7 @@ import java.util.ArrayList; ...@@ -7,6 +7,7 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import com.alibaba.datax.plugin.writer.gdbwriter.Key.ImportType; import com.alibaba.datax.plugin.writer.gdbwriter.Key.ImportType;
import com.alibaba.datax.plugin.writer.gdbwriter.Key.PropertyType;
import lombok.Data; import lombok.Data;
...@@ -30,6 +31,8 @@ public class MappingRule { ...@@ -30,6 +31,8 @@ public class MappingRule {
private String propertiesJsonStr = null; private String propertiesJsonStr = null;
private boolean numPattern = false;
@Data @Data
public static class PropertyMappingRule { public static class PropertyMappingRule {
private String key = null; private String key = null;
...@@ -37,5 +40,7 @@ public class MappingRule { ...@@ -37,5 +40,7 @@ public class MappingRule {
private String value = null; private String value = null;
private ValueType valueType = null; private ValueType valueType = null;
private PropertyType pType = PropertyType.single;
} }
} }
...@@ -8,6 +8,7 @@ import java.util.Map; ...@@ -8,6 +8,7 @@ import java.util.Map;
import java.util.function.Function; import java.util.function.Function;
import com.alibaba.datax.common.element.Column; import com.alibaba.datax.common.element.Column;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
/** /**
...@@ -16,7 +17,11 @@ import lombok.extern.slf4j.Slf4j; ...@@ -16,7 +17,11 @@ import lombok.extern.slf4j.Slf4j;
*/ */
@Slf4j @Slf4j
public enum ValueType { public enum ValueType {
/**
* property value type
*/
INT(Integer.class, "int", Column::asLong, Integer::valueOf), INT(Integer.class, "int", Column::asLong, Integer::valueOf),
INTEGER(Integer.class, "integer", Column::asLong, Integer::valueOf),
LONG(Long.class, "long", Column::asLong, Long::valueOf), LONG(Long.class, "long", Column::asLong, Long::valueOf),
DOUBLE(Double.class, "double", Column::asDouble, Double::valueOf), DOUBLE(Double.class, "double", Column::asDouble, Double::valueOf),
FLOAT(Float.class, "float", Column::asDouble, Float::valueOf), FLOAT(Float.class, "float", Column::asDouble, Float::valueOf),
...@@ -28,7 +33,8 @@ public enum ValueType { ...@@ -28,7 +33,8 @@ public enum ValueType {
private Function<Column, Object> columnFunc = null; private Function<Column, Object> columnFunc = null;
private Function<String, Object> fromStrFunc = null; private Function<String, Object> fromStrFunc = null;
private ValueType(Class<?> type, String name, Function<Column, Object> columnFunc, Function<String, Object> fromStrFunc) { private ValueType(final Class<?> type, final String name, final Function<Column, Object> columnFunc,
final Function<String, Object> fromStrFunc) {
this.type = type; this.type = type;
this.shortName = name; this.shortName = name;
this.columnFunc = columnFunc; this.columnFunc = columnFunc;
...@@ -37,7 +43,7 @@ public enum ValueType { ...@@ -37,7 +43,7 @@ public enum ValueType {
ValueTypeHolder.shortName2type.put(name, this); ValueTypeHolder.shortName2type.put(name, this);
} }
public static ValueType fromShortName(String name) { public static ValueType fromShortName(final String name) {
return ValueTypeHolder.shortName2type.get(name); return ValueTypeHolder.shortName2type.get(name);
} }
...@@ -49,20 +55,20 @@ public enum ValueType { ...@@ -49,20 +55,20 @@ public enum ValueType {
return this.shortName; return this.shortName;
} }
public Object applyColumn(Column column) { public Object applyColumn(final Column column) {
try { try {
if (column == null) { if (column == null) {
return null; return null;
} }
return columnFunc.apply(column); return this.columnFunc.apply(column);
} catch (Exception e) { } catch (final Exception e) {
log.error("applyColumn error {}, column {}", e.toString(), column); log.error("applyColumn error {}, column {}", e.toString(), column);
throw e; throw e;
} }
} }
public Object fromStrFunc(String str) { public Object fromStrFunc(final String str) {
return fromStrFunc.apply(str); return this.fromStrFunc.apply(str);
} }
private static class ValueTypeHolder { private static class ValueTypeHolder {
......
...@@ -3,20 +3,24 @@ ...@@ -3,20 +3,24 @@
*/ */
package com.alibaba.datax.plugin.writer.gdbwriter.model; package com.alibaba.datax.plugin.writer.gdbwriter.model;
import com.alibaba.datax.common.util.Configuration; import static com.alibaba.datax.plugin.writer.gdbwriter.client.GdbWriterConfig.DEFAULT_BATCH_PROPERTY_NUM;
import com.alibaba.datax.plugin.writer.gdbwriter.Key; import static com.alibaba.datax.plugin.writer.gdbwriter.client.GdbWriterConfig.MAX_REQUEST_LENGTH;
import com.alibaba.datax.plugin.writer.gdbwriter.client.GdbWriterConfig;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.tinkerpop.gremlin.driver.Client; import org.apache.tinkerpop.gremlin.driver.Client;
import org.apache.tinkerpop.gremlin.driver.Cluster; import org.apache.tinkerpop.gremlin.driver.Cluster;
import org.apache.tinkerpop.gremlin.driver.RequestOptions; import org.apache.tinkerpop.gremlin.driver.RequestOptions;
import org.apache.tinkerpop.gremlin.driver.ResultSet; import org.apache.tinkerpop.gremlin.driver.ResultSet;
import org.apache.tinkerpop.gremlin.driver.ser.Serializers; import org.apache.tinkerpop.gremlin.driver.ser.Serializers;
import java.util.Map; import com.alibaba.datax.common.util.Configuration;
import java.util.UUID; import com.alibaba.datax.plugin.writer.gdbwriter.Key;
import java.util.concurrent.TimeUnit; import com.alibaba.datax.plugin.writer.gdbwriter.client.GdbWriterConfig;
import lombok.extern.slf4j.Slf4j;
/** /**
* @author jerrywang * @author jerrywang
...@@ -28,28 +32,28 @@ public abstract class AbstractGdbGraph implements GdbGraph { ...@@ -28,28 +32,28 @@ public abstract class AbstractGdbGraph implements GdbGraph {
protected Client client = null; protected Client client = null;
protected Key.UpdateMode updateMode = Key.UpdateMode.INSERT; protected Key.UpdateMode updateMode = Key.UpdateMode.INSERT;
protected int propertiesBatchNum = GdbWriterConfig.DEFAULT_BATCH_PROPERTY_NUM; protected int propertiesBatchNum = DEFAULT_BATCH_PROPERTY_NUM;
protected boolean session = false; protected boolean session = false;
protected int maxRequestLength = GdbWriterConfig.MAX_REQUEST_LENGTH;
protected AbstractGdbGraph() {} protected AbstractGdbGraph() {}
protected AbstractGdbGraph(Configuration config, boolean session) { protected AbstractGdbGraph(final Configuration config, final boolean session) {
initClient(config, session); initClient(config, session);
} }
protected void initClient(Configuration config, boolean session) { protected void initClient(final Configuration config, final boolean session) {
updateMode = Key.UpdateMode.valueOf(config.getString(Key.UPDATE_MODE, "INSERT")); this.updateMode = Key.UpdateMode.valueOf(config.getString(Key.UPDATE_MODE, "INSERT"));
log.info("init graphdb client"); log.info("init graphdb client");
String host = config.getString(Key.HOST); final String host = config.getString(Key.HOST);
int port = config.getInt(Key.PORT); final int port = config.getInt(Key.PORT);
String username = config.getString(Key.USERNAME); final String username = config.getString(Key.USERNAME);
String password = config.getString(Key.PASSWORD); final String password = config.getString(Key.PASSWORD);
int maxDepthPerConnection = config.getInt(Key.MAX_IN_PROCESS_PER_CONNECTION, int maxDepthPerConnection =
GdbWriterConfig.DEFAULT_MAX_IN_PROCESS_PER_CONNECTION); config.getInt(Key.MAX_IN_PROCESS_PER_CONNECTION, GdbWriterConfig.DEFAULT_MAX_IN_PROCESS_PER_CONNECTION);
int maxConnectionPoolSize = config.getInt(Key.MAX_CONNECTION_POOL_SIZE, int maxConnectionPoolSize =
GdbWriterConfig.DEFAULT_MAX_CONNECTION_POOL_SIZE); config.getInt(Key.MAX_CONNECTION_POOL_SIZE, GdbWriterConfig.DEFAULT_MAX_CONNECTION_POOL_SIZE);
int maxSimultaneousUsagePerConnection = config.getInt(Key.MAX_SIMULTANEOUS_USAGE_PER_CONNECTION, int maxSimultaneousUsagePerConnection = config.getInt(Key.MAX_SIMULTANEOUS_USAGE_PER_CONNECTION,
GdbWriterConfig.DEFAULT_MAX_SIMULTANEOUS_USAGE_PER_CONNECTION); GdbWriterConfig.DEFAULT_MAX_SIMULTANEOUS_USAGE_PER_CONNECTION);
...@@ -62,79 +66,75 @@ public abstract class AbstractGdbGraph implements GdbGraph { ...@@ -62,79 +66,75 @@ public abstract class AbstractGdbGraph implements GdbGraph {
} }
try { try {
Cluster cluster = Cluster.build(host).port(port).credentials(username, password) final Cluster cluster = Cluster.build(host).port(port).credentials(username, password)
.serializer(Serializers.GRAPHBINARY_V1D0) .serializer(Serializers.GRAPHBINARY_V1D0).maxContentLength(1048576)
.maxContentLength(1048576) .maxInProcessPerConnection(maxDepthPerConnection).minInProcessPerConnection(0)
.maxInProcessPerConnection(maxDepthPerConnection) .maxConnectionPoolSize(maxConnectionPoolSize).minConnectionPoolSize(maxConnectionPoolSize)
.minInProcessPerConnection(0) .maxSimultaneousUsagePerConnection(maxSimultaneousUsagePerConnection).resultIterationBatchSize(64)
.maxConnectionPoolSize(maxConnectionPoolSize)
.minConnectionPoolSize(maxConnectionPoolSize)
.maxSimultaneousUsagePerConnection(maxSimultaneousUsagePerConnection)
.resultIterationBatchSize(64)
.create(); .create();
client = session ? cluster.connect(UUID.randomUUID().toString()).init() : cluster.connect().init(); this.client = session ? cluster.connect(UUID.randomUUID().toString()).init() : cluster.connect().init();
warmClient(maxConnectionPoolSize*maxDepthPerConnection); warmClient(maxConnectionPoolSize * maxDepthPerConnection);
} catch (RuntimeException e) { } catch (final RuntimeException e) {
log.error("Failed to connect to GDB {}:{}, due to {}", host, port, e); log.error("Failed to connect to GDB {}:{}, due to {}", host, port, e);
throw e; throw e;
} }
propertiesBatchNum = config.getInt(Key.MAX_PROPERTIES_BATCH_NUM, GdbWriterConfig.DEFAULT_BATCH_PROPERTY_NUM); this.propertiesBatchNum = config.getInt(Key.MAX_PROPERTIES_BATCH_NUM, DEFAULT_BATCH_PROPERTY_NUM);
this.maxRequestLength = config.getInt(Key.MAX_GDB_REQUEST_LENGTH, MAX_REQUEST_LENGTH);
} }
/** /**
* @param dsl * @param dsl
* @param parameters * @param parameters
*/ */
protected void runInternal(String dsl, final Map<String, Object> parameters) throws Exception { protected void runInternal(final String dsl, final Map<String, Object> parameters) throws Exception {
RequestOptions.Builder options = RequestOptions.build().timeout(DEFAULT_TIMEOUT); final RequestOptions.Builder options = RequestOptions.build().timeout(DEFAULT_TIMEOUT);
if (parameters != null && !parameters.isEmpty()) { if (parameters != null && !parameters.isEmpty()) {
parameters.forEach(options::addParameter); parameters.forEach(options::addParameter);
} }
ResultSet results = client.submitAsync(dsl, options.create()).get(DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS); final ResultSet results = this.client.submitAsync(dsl, options.create()).get(DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
results.all().get(DEFAULT_TIMEOUT + 1000, TimeUnit.MILLISECONDS); results.all().get(DEFAULT_TIMEOUT + 1000, TimeUnit.MILLISECONDS);
} }
void beginTx() { void beginTx() {
if (!session) { if (!this.session) {
return; return;
} }
String dsl = "g.tx().open()"; final String dsl = "g.tx().open()";
client.submit(dsl).all().join(); this.client.submit(dsl).all().join();
} }
void doCommit() { void doCommit() {
if (!session) { if (!this.session) {
return; return;
} }
try { try {
String dsl = "g.tx().commit()"; final String dsl = "g.tx().commit()";
client.submit(dsl).all().join(); this.client.submit(dsl).all().join();
} catch (Exception e) { } catch (final Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
void doRollback() { void doRollback() {
if (!session) { if (!this.session) {
return; return;
} }
String dsl = "g.tx().rollback()"; final String dsl = "g.tx().rollback()";
client.submit(dsl).all().join(); this.client.submit(dsl).all().join();
} }
private void warmClient(int num) { private void warmClient(final int num) {
try { try {
beginTx(); beginTx();
runInternal("g.V('test')", null); runInternal("g.V('test')", null);
doCommit(); doCommit();
log.info("warm graphdb client over"); log.info("warm graphdb client over");
} catch (Exception e) { } catch (final Exception e) {
log.error("warmClient error"); log.error("warmClient error");
doRollback(); doRollback();
throw new RuntimeException(e); throw new RuntimeException(e);
...@@ -143,9 +143,9 @@ public abstract class AbstractGdbGraph implements GdbGraph { ...@@ -143,9 +143,9 @@ public abstract class AbstractGdbGraph implements GdbGraph {
@Override @Override
public void close() { public void close() {
if (client != null) { if (this.client != null) {
log.info("close graphdb client"); log.info("close graphdb client");
client.close(); this.client.close();
} }
} }
} }
...@@ -3,7 +3,8 @@ ...@@ -3,7 +3,8 @@
*/ */
package com.alibaba.datax.plugin.writer.gdbwriter.model; package com.alibaba.datax.plugin.writer.gdbwriter.model;
import lombok.Data; import com.alibaba.datax.plugin.writer.gdbwriter.mapping.MapperConfig;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
import lombok.ToString; import lombok.ToString;
...@@ -11,10 +12,33 @@ import lombok.ToString; ...@@ -11,10 +12,33 @@ import lombok.ToString;
* @author jerrywang * @author jerrywang
* *
*/ */
@Data
@EqualsAndHashCode(callSuper = true) @EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true) @ToString(callSuper = true)
public class GdbEdge extends GdbElement { public class GdbEdge extends GdbElement {
private String from = null; private String from = null;
private String to = null; private String to = null;
/**
 * Returns the {@code from} endpoint of this edge.
 *
 * @return the value last stored by {@code setFrom}, or {@code null} if none has been set
 */
public String getFrom() {
    return from;
}
/**
 * Stores the {@code from} endpoint of this edge after checking it against the configured id length limit.
 *
 * @param from endpoint value to store; must not be {@code null}
 * @throws IllegalArgumentException if {@code from} is longer than {@code MapperConfig.getMaxIdLength()}
 * @throws NullPointerException if {@code from} is {@code null}
 */
public void setFrom(final String from) {
    final int limit = MapperConfig.getInstance().getMaxIdLength();
    if (from.length() <= limit) {
        this.from = from;
        return;
    }
    throw new IllegalArgumentException("from length over limit(" + limit + ")");
}
/**
 * Returns the {@code to} endpoint of this edge.
 *
 * @return the value last stored by {@code setTo}, or {@code null} if none has been set
 */
public String getTo() {
    return to;
}
/**
 * Stores the {@code to} endpoint of this edge after checking it against the configured id length limit.
 *
 * @param to endpoint value to store; must not be {@code null}
 * @throws IllegalArgumentException if {@code to} is longer than {@code MapperConfig.getMaxIdLength()}
 * @throws NullPointerException if {@code to} is {@code null}
 */
public void setTo(final String to) {
    final int limit = MapperConfig.getInstance().getMaxIdLength();
    if (to.length() <= limit) {
        this.to = to;
        return;
    }
    throw new IllegalArgumentException("to length over limit(" + limit + ")");
}
} }
...@@ -3,18 +3,107 @@ ...@@ -3,18 +3,107 @@
*/ */
package com.alibaba.datax.plugin.writer.gdbwriter.model; package com.alibaba.datax.plugin.writer.gdbwriter.model;
import java.util.HashMap; import java.util.LinkedList;
import java.util.Map; import java.util.List;
import lombok.Data; import com.alibaba.datax.plugin.writer.gdbwriter.Key.PropertyType;
import com.alibaba.datax.plugin.writer.gdbwriter.mapping.MapperConfig;
/** /**
* @author jerrywang * @author jerrywang
* *
*/ */
@Data
/**
 * Model of a graph element: an id, a label and a list of properties kept in insertion order.
 * Id, label, property keys and String property values are validated against the length limits
 * exposed by {@link MapperConfig} at the moment they are set or added.
 */
public class GdbElement {
    private String id = null;
    private String label = null;
    private final List<GdbProperty> properties = new LinkedList<>();

    /**
     * @return the element id, or {@code null} if it has not been set
     */
    public String getId() {
        return this.id;
    }

    /**
     * Sets the element id.
     *
     * @param id
     *            id to store; must not be {@code null}
     * @throws IllegalArgumentException
     *             if the id is longer than the configured maximum id length
     */
    public void setId(final String id) {
        final int maxIdLength = MapperConfig.getInstance().getMaxIdLength();
        if (id.length() > maxIdLength) {
            throw new IllegalArgumentException("id length over limit(" + maxIdLength + ")");
        }
        this.id = id;
    }

    /**
     * @return the element label, or {@code null} if it has not been set
     */
    public String getLabel() {
        return this.label;
    }

    /**
     * Sets the element label.
     *
     * @param label
     *            label to store; must not be {@code null}
     * @throws IllegalArgumentException
     *             if the label is longer than the configured maximum label length
     */
    public void setLabel(final String label) {
        final int maxLabelLength = MapperConfig.getInstance().getMaxLabelLength();
        if (label.length() > maxLabelLength) {
            throw new IllegalArgumentException("label length over limit(" + maxLabelLength + ")");
        }
        this.label = label;
    }

    /**
     * @return the live internal property list (not a copy), in the order the properties were added
     */
    public List<GdbProperty> getProperties() {
        return this.properties;
    }

    /**
     * Appends a property with an explicit cardinality.
     * A {@code null} key or {@code null} value is silently ignored and nothing is added.
     *
     * @param propKey
     *            property key
     * @param propValue
     *            property value; only String values are length-checked
     * @param card
     *            cardinality stored with the property
     * @throws IllegalArgumentException
     *             if the key exceeds the maximum key length, or a String value exceeds the maximum value length
     */
    public void addProperty(final String propKey, final Object propValue, final PropertyType card) {
        if (propKey == null || propValue == null) {
            return;
        }

        final int maxPropKeyLength = MapperConfig.getInstance().getMaxPropKeyLength();
        if (propKey.length() > maxPropKeyLength) {
            throw new IllegalArgumentException("property key length over limit(" + maxPropKeyLength + ")");
        }

        if (propValue instanceof String) {
            final int maxPropValueLength = MapperConfig.getInstance().getMaxPropValueLength();
            // Values are checked against the value limit; comparing with the key limit here would
            // enforce the wrong bound while reporting the value limit in the message.
            if (((String)propValue).length() > maxPropValueLength) {
                throw new IllegalArgumentException("property value length over limit(" + maxPropValueLength + ")");
            }
        }

        this.properties.add(new GdbProperty(propKey, propValue, card));
    }

    /**
     * Appends a property using {@code PropertyType.single} as its cardinality.
     *
     * @param propKey
     *            property key
     * @param propValue
     *            property value
     */
    public void addProperty(final String propKey, final Object propValue) {
        addProperty(propKey, propValue, PropertyType.single);
    }

    /**
     * Renders the element as {@code id[label]{card[key - value]...}}.
     */
    @Override
    public String toString() {
        // The buffer never leaves this method, so the unsynchronized StringBuilder is sufficient.
        final StringBuilder sb = new StringBuilder();
        sb.append(this.id).append('[').append(this.label).append("]{");
        for (final GdbProperty prop : this.properties) {
            sb.append(prop.cardinality.name());
            sb.append('[');
            sb.append(prop.key);
            sb.append(" - ");
            sb.append(String.valueOf(prop.value));
            sb.append(']');
        }
        // Close the brace opened after the label so the rendering is balanced.
        sb.append('}');
        return sb.toString();
    }

    /**
     * Immutable key/value pair together with its cardinality.
     * Instances are created only through {@link GdbElement#addProperty}.
     */
    public static class GdbProperty {
        private final String key;
        private final Object value;
        private final PropertyType cardinality;

        private GdbProperty(final String key, final Object value, final PropertyType card) {
            this.key = key;
            this.value = value;
            this.cardinality = card;
        }

        /**
         * @return the cardinality given when the property was added
         */
        public PropertyType getCardinality() {
            return this.cardinality;
        }

        /**
         * @return the property key
         */
        public String getKey() {
            return this.key;
        }

        /**
         * @return the property value
         */
        public Object getValue() {
            return this.value;
        }
    }
}
...@@ -3,10 +3,11 @@ ...@@ -3,10 +3,11 @@
*/ */
package com.alibaba.datax.plugin.writer.gdbwriter.model; package com.alibaba.datax.plugin.writer.gdbwriter.model;
import java.util.List;
import com.alibaba.datax.common.element.Record; import com.alibaba.datax.common.element.Record;
import groovy.lang.Tuple2;
import java.util.List; import groovy.lang.Tuple2;
/** /**
* @author jerrywang * @author jerrywang
......
...@@ -7,52 +7,56 @@ import java.io.IOException; ...@@ -7,52 +7,56 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.function.Supplier; import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.gdbwriter.GdbWriterErrorCode; import com.alibaba.datax.plugin.writer.gdbwriter.GdbWriterErrorCode;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
/** /**
* @author jerrywang * @author jerrywang
* *
*/ */
public interface ConfigHelper { public interface ConfigHelper {
static void assertConfig(String key, Supplier<Boolean> f) { static void assertConfig(final String key, final Supplier<Boolean> f) {
if (!f.get()) { if (!f.get()) {
throw DataXException.asDataXException(GdbWriterErrorCode.BAD_CONFIG_VALUE, key); throw DataXException.asDataXException(GdbWriterErrorCode.BAD_CONFIG_VALUE, key);
} }
} }
static void assertHasContent(Configuration config, String key) { static void assertHasContent(final Configuration config, final String key) {
assertConfig(key, () -> StringUtils.isNotBlank(config.getString(key))); assertConfig(key, () -> StringUtils.isNotBlank(config.getString(key)));
} }
/** /**
* NOTE: {@code Configuration::get(String, Class<T>)} doesn't work. * NOTE: {@code Configuration::get(String, Class<T>)} doesn't work.
* *
* @param conf Configuration * @param conf
* @param key key path to configuration * Configuration
* @param cls Class of result type * @param key
* key path to configuration
* @param cls
* Class of result type
* @return the target configuration object of type T * @return the target configuration object of type T
*/ */
static <T> T getConfig(Configuration conf, String key, Class<T> cls) { static <T> T getConfig(final Configuration conf, final String key, final Class<T> cls) {
JSONObject j = (JSONObject) conf.get(key); final JSONObject j = (JSONObject)conf.get(key);
return JSON.toJavaObject(j, cls); return JSON.toJavaObject(j, cls);
} }
/** /**
* Create a configuration from the specified file on the classpath. * Create a configuration from the specified file on the classpath.
* *
* @param name file name * @param name
* file name
* @return Configuration instance. * @return Configuration instance.
*/ */
static Configuration fromClasspath(String name) { static Configuration fromClasspath(final String name) {
try (InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream(name)) { try (final InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream(name)) {
return Configuration.from(is); return Configuration.from(is);
} catch (IOException e) { } catch (final IOException e) {
throw new IllegalArgumentException("File not found: " + name); throw new IllegalArgumentException("File not found: " + name);
} }
} }
......
/* /*
* (C) 2019-present Alibaba Group Holding Limited. * (C) 2019-present Alibaba Group Holding Limited.
* *
* This program is free software; you can redistribute it and/or modify * This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public
* it under the terms of the GNU General Public License version 2 as * License version 2 as published by the Free Software Foundation.
* published by the Free Software Foundation.
*/ */
package com.alibaba.datax.plugin.writer.gdbwriter.util; package com.alibaba.datax.plugin.writer.gdbwriter.util;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment