Unverified commit 3c21cc28 authored by Chase, committed by GitHub

Merge pull request #1 from alibaba/master

rebase
parents 92328083 4b99fab8
/target/
# Created by .ignore support plugin (hsz.mobi)
.classpath
.project
.settings
.DS_Store
.AppleDouble
.LSOverride
Icon
._*
.DocumentRevisions-V100
.fseventsd
.Spotlight-V100
.TemporaryItems
.Trashes
.VolumeIcon.icns
.com.apple.timemachine.donotpresent
.AppleDB
.AppleDesktop
Network Trash Folder
Temporary Items
.apdisk
*.class
*.log
*.ctxt
.mtj.tmp/
*.jar
*.war
*.nar
*.ear
*.zip
*.tar.gz
*.rar
hs_err_pid*
.idea/**/workspace.xml
.idea/**/tasks.xml
.idea/**/dictionaries
.idea/**/shelf
.idea/**/dataSources/
.idea/**/dataSources.ids
.idea/**/dataSources.local.xml
.idea/**/sqlDataSources.xml
.idea/**/dynamic.xml
.idea/**/uiDesigner.xml
.idea/**/dbnavigator.xml
.idea/**/gradle.xml
.idea/**/libraries
cmake-build-debug/
cmake-build-release/
.idea/**/mongoSettings.xml
*.iws
out/
.idea_modules/
atlassian-ide-plugin.xml
.idea/replstate.xml
com_crashlytics_export_strings.xml
crashlytics.properties
crashlytics-build.properties
fabric.properties
.idea/httpRequests
target/
pom.xml.tag
pom.xml.releaseBackup
pom.xml.versionsBackup
pom.xml.next
release.properties
dependency-reduced-pom.xml
buildNumber.properties
.mvn/timing.properties
!/.mvn/wrapper/maven-wrapper.jar
.idea
*.iml
out
gen
### Python template
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
*.manifest
*.spec
pip-log.txt
pip-delete-this-directory.txt
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/
*.mo
*.pot
*.log
local_settings.py
db.sqlite3
instance/
.webassets-cache
.scrapy
docs/_build/
target/
.ipynb_checkpoints
.python-version
celerybeat-schedule
*.sage.py
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
.spyderproject
.spyproject
.ropeproject
/site
.mypy_cache/
.metadata
bin/
tmp/
*.tmp
*.bak
*.swp
*~.nib
local.properties
.settings/
.loadpath
.recommenders
.externalToolBuilders/
*.launch
*.pydevproject
.cproject
.autotools
.factorypath
.buildpath
.target
.tern-project
.texlipse
.springBeans
.recommenders/
.cache-main
.scala_dependencies
.worksheet
...@@ -48,12 +48,15 @@ DataX already has a fairly comprehensive plugin ecosystem: mainstream RDBMS databases, N
| | Hbase0.94 | √ | √ |[](https://github.com/alibaba/DataX/blob/master/hbase094xreader/doc/hbase094xreader.md)[](https://github.com/alibaba/DataX/blob/master/hbase094xwriter/doc/hbase094xwriter.md)|
| | Hbase1.1 | √ | √ |[](https://github.com/alibaba/DataX/blob/master/hbase11xreader/doc/hbase11xreader.md)[](https://github.com/alibaba/DataX/blob/master/hbase11xwriter/doc/hbase11xwriter.md)|
| | Phoenix4.x | √ | √ |[](https://github.com/alibaba/DataX/blob/master/hbase11xsqlreader/doc/hbase11xsqlreader.md)[](https://github.com/alibaba/DataX/blob/master/hbase11xsqlwriter/doc/hbase11xsqlwriter.md)|
| | Phoenix5.x | √ | √ |[](https://github.com/alibaba/DataX/blob/master/hbase20xsqlreader/doc/hbase20xsqlreader.md)[](https://github.com/alibaba/DataX/blob/master/hbase20xsqlwriter/doc/hbase20xsqlwriter.md)|
| | MongoDB | √ | √ |[](https://github.com/alibaba/DataX/blob/master/mongoreader/doc/mongoreader.md)[](https://github.com/alibaba/DataX/blob/master/mongowriter/doc/mongowriter.md)|
| | Hive | √ | √ |[](https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md)[](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md)|
| Unstructured data storage | TxtFile | √ | √ |[](https://github.com/alibaba/DataX/blob/master/txtfilereader/doc/txtfilereader.md)[](https://github.com/alibaba/DataX/blob/master/txtfilewriter/doc/txtfilewriter.md)|
| | FTP | √ | √ |[](https://github.com/alibaba/DataX/blob/master/ftpreader/doc/ftpreader.md)[](https://github.com/alibaba/DataX/blob/master/ftpwriter/doc/ftpwriter.md)|
| | HDFS | √ | √ |[](https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md)[](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md)|
| | Elasticsearch | | √ |[](https://github.com/alibaba/DataX/blob/master/elasticsearchwriter/doc/elasticsearchwriter.md)|
| Time-series database | OpenTSDB | √ | |[](https://github.com/alibaba/DataX/blob/master/opentsdbreader/doc/opentsdbreader.md)|
| | TSDB | | √ |[](https://github.com/alibaba/DataX/blob/master/tsdbwriter/doc/tsdbhttpwriter.md)|
# I want to develop a new plugin
Please click: [DataX Plugin Development Guide](https://github.com/alibaba/DataX/blob/master/dataxPluginDev.md)
...@@ -105,7 +108,7 @@ This software is free to use under the Apache License [Apache license](https://g
9. Experience with big-data products, cloud products, or middleware solutions is preferred.
````
DingTalk users, please scan the QR code below to join the discussion:
![DataX-OpenSource-Dingding](https://img.alicdn.com/tfs/TB1ZQuhIG6qK1RjSZFmXXX0PFXa-362-501.png)
...@@ -18,6 +18,8 @@
<phoenix.version>4.11.0-HBase-1.1</phoenix.version>
<hadoop.version>2.7.1</hadoop.version>
<commons-codec.version>1.8</commons-codec.version>
<protobuf.version>3.2.0</protobuf.version>
<httpclient.version>4.4.1</httpclient.version>
</properties>
<dependencies>
...@@ -47,6 +49,11 @@
<artifactId>phoenix-core</artifactId>
<version>${phoenix.version}</version>
</dependency>
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-queryserver-client</artifactId>
<version>${phoenix.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
...@@ -58,6 +65,21 @@
<version>${commons-codec.version}</version>
</dependency>
<!-- httpclient begin -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>${httpclient.version}</version>
</dependency>
<!-- httpclient end -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>
<!-- for test -->
<dependency>
<groupId>junit</groupId>
......
...@@ -8,6 +8,7 @@ public final class Constant {
public static final boolean DEFAULT_LAST_COLUMN_IS_VERSION = false; // by default the last column is not a version column
public static final int DEFAULT_BATCH_ROW_COUNT = 256; // write 256 rows per batch by default
public static final boolean DEFAULT_TRUNCATE = false; // do not truncate the table at the start by default
public static final boolean DEFAULT_USE_THIN_CLIENT = false; // do not use the thin client by default
public static final int TYPE_UNSIGNED_TINYINT = 11;
public static final int TYPE_UNSIGNED_SMALLINT = 13;
......
...@@ -11,6 +11,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.schema.ColumnNotFoundException;
import org.apache.phoenix.schema.MetaDataClient;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.util.SchemaUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
...@@ -18,7 +19,11 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
...@@ -28,6 +33,8 @@ import java.util.Map;
public class HbaseSQLHelper {
private static final Logger LOG = LoggerFactory.getLogger(HbaseSQLHelper.class);
public static ThinClientPTable ptable;
/**
 * Parse the DataX configuration into the SQL writer configuration.
 */
...@@ -53,6 +60,11 @@ public class HbaseSQLHelper {
return new Pair<String, String>(zkQuorum, znode);
}
public static Map<String, String> getThinConnectConfig(String hbaseCfgString) {
assert hbaseCfgString != null;
return JSON.parseObject(hbaseCfgString, new TypeReference<Map<String, String>>() {});
}
/**
 * Validate the configuration.
 */
...@@ -61,12 +73,12 @@ public class HbaseSQLHelper {
Connection conn = getJdbcConnection(cfg);
// Check the table: it must exist and be usable
checkTable(conn, cfg.getNamespace(), cfg.getTableName(), cfg.isThinClient());
// Validate metadata: every column given in the config must already exist in the target table
PTable schema = null;
try {
schema = getTableSchema(conn, cfg.getNamespace(), cfg.getTableName(), cfg.isThinClient());
} catch (SQLException e) {
throw DataXException.asDataXException(HbaseSQLWriterErrorCode.GET_HBASE_CONNECTION_ERROR,
"无法获取目的表" + cfg.getTableName() + "的元数据信息,表可能不是SQL表或表名配置错误,请检查您的配置 或者 联系 HBase 管理员.", e);
...@@ -97,7 +109,11 @@ public class HbaseSQLHelper {
Connection conn;
try {
Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
if (cfg.isThinClient()) {
conn = getThinClientJdbcConnection(cfg);
} else {
conn = DriverManager.getConnection(connStr);
}
conn.setAutoCommit(false);
} catch (Throwable e) {
throw DataXException.asDataXException(HbaseSQLWriterErrorCode.GET_HBASE_CONNECTION_ERROR,
...@@ -107,6 +123,32 @@ public class HbaseSQLHelper {
return conn;
}
/**
 * Create a thin-client JDBC connection.
 * @param cfg the writer configuration
 * @return the JDBC connection
 * @throws SQLException
 */
public static Connection getThinClientJdbcConnection(HbaseSQLWriterConfig cfg) throws SQLException {
String connStr = cfg.getConnectionString();
LOG.info("Connecting to HBase cluster [" + connStr + "] use thin client ...");
Connection conn = DriverManager.getConnection(connStr, cfg.getUsername(), cfg.getPassword());
String userNamespaceQuery = "use " + cfg.getNamespace();
Statement statement = null;
try {
statement = conn.createStatement();
statement.executeUpdate(userNamespaceQuery);
return conn;
} catch (Exception e) {
throw DataXException.asDataXException(HbaseSQLWriterErrorCode.GET_HBASE_CONNECTION_ERROR,
"无法连接配置的namespace, 请检查配置 或者 联系 HBase 管理员.", e);
} finally {
if (statement != null) {
statement.close();
}
}
}
/**
 * Get the metadata of a table.
 * @param conn the HBase SQL JDBC connection
...@@ -121,6 +163,70 @@ public class HbaseSQLHelper {
return mdc.updateCache(schemaName, tableName).getTable();
}
/**
 * Get the metadata of a table.
 * @param conn the JDBC connection
 * @param namespace the Phoenix namespace
 * @param fullTableName the full table name
 * @param isThinClient whether the thin client is used
 * @return the table metadata
 * @throws SQLException
 */
public static PTable getTableSchema(Connection conn, String namespace, String fullTableName, boolean isThinClient)
        throws SQLException {
LOG.info("Start to get table schema of namespace=" + namespace + " , fullTableName=" + fullTableName);
if (!isThinClient) {
return getTableSchema(conn, fullTableName);
} else {
if (ptable == null) {
ResultSet result = conn.getMetaData().getColumns(null, namespace, fullTableName, null);
try {
ThinClientPTable retTable = new ThinClientPTable();
retTable.setColTypeMap(parseColType(result));
ptable = retTable;
} finally {
if (result != null) {
result.close();
}
}
}
return ptable;
}
}
/**
 * Parse column types from the metadata result set.
 * @param rs the result set returned by DatabaseMetaData.getColumns
 * @return a map from column name to column type
 * @throws SQLException
 */
public static Map<String, ThinClientPTable.ThinClientPColumn> parseColType(ResultSet rs) throws SQLException {
Map<String, ThinClientPTable.ThinClientPColumn> cols = new HashMap<String, ThinClientPTable.ThinClientPColumn>();
ResultSetMetaData md = rs.getMetaData();
int columnCount = md.getColumnCount();
while (rs.next()) {
String colName = null;
PDataType colType = null;
for (int i = 1; i <= columnCount; i++) {
if (md.getColumnLabel(i).equals("TYPE_NAME")) {
colType = PDataType.fromSqlTypeName((String) rs.getObject(i));
} else if (md.getColumnLabel(i).equals("COLUMN_NAME")) {
colName = (String) rs.getObject(i);
}
}
if (colType == null || colName == null) {
throw new SQLException("ColType or colName is null, colType : " + colType + " , colName : " + colName);
}
cols.put(colName, new ThinClientPTable.ThinClientPColumn(colName, colType));
}
return cols;
}
/**
 * Truncate the table.
 */
...@@ -148,6 +254,24 @@ public class HbaseSQLHelper {
}
}
/**
 * Check the table.
 * @param conn the JDBC connection
 * @param namespace the Phoenix namespace
 * @param tableName the table name
 * @param isThinClient whether the thin client is used
 * @throws DataXException
 */
public static void checkTable(Connection conn, String namespace, String tableName, boolean isThinClient)
throws DataXException {
if (!isThinClient) {
checkTable(conn, tableName);
} else {
// skip the table check when using the thin client
}
}
/**
 * Check the table: it must exist and be enabled.
 */
......
...@@ -2,6 +2,7 @@ package com.alibaba.datax.plugin.writer.hbase11xsqlwriter;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.google.common.base.Strings;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Pair;
...@@ -9,6 +10,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
/**
 * HBase SQL writer config
...@@ -30,6 +32,10 @@ public class HbaseSQLWriterConfig {
private NullModeType nullMode;
private int batchSize; // how many rows to write per batch
private boolean truncate; // whether to truncate the target table before the import starts
private boolean isThinClient;
private String namespace;
private String username;
private String password;
/**
 * @return the original DataX configuration
...@@ -81,6 +87,22 @@ public class HbaseSQLWriterConfig {
return truncate;
}
public boolean isThinClient() {
return isThinClient;
}
public String getNamespace() {
return namespace;
}
public String getPassword() {
return password;
}
public String getUsername() {
return username;
}
/**
 * @param dataxCfg
 * @return
...@@ -100,6 +122,7 @@ public class HbaseSQLWriterConfig {
cfg.nullMode = NullModeType.getByTypeName(dataxCfg.getString(Key.NULL_MODE, Constant.DEFAULT_NULL_MODE));
cfg.batchSize = dataxCfg.getInt(Key.BATCH_SIZE, Constant.DEFAULT_BATCH_ROW_COUNT);
cfg.truncate = dataxCfg.getBool(Key.TRUNCATE, Constant.DEFAULT_TRUNCATE);
cfg.isThinClient = dataxCfg.getBool(Key.THIN_CLIENT, Constant.DEFAULT_USE_THIN_CLIENT);
// 4. Log the parsed configuration
LOG.info("HBase SQL writer config parsed:" + cfg.toString());
...@@ -117,31 +140,52 @@ public class HbaseSQLWriterConfig {
"读 Hbase 时需要配置hbaseConfig,其内容为 Hbase 连接信息,请联系 Hbase PE 获取该信息.");
}
if (dataxCfg.getBool(Key.THIN_CLIENT, Constant.DEFAULT_USE_THIN_CLIENT)) {
    Map<String, String> thinConnectConfig = HbaseSQLHelper.getThinConnectConfig(hbaseCfg);
    String thinConnectStr = thinConnectConfig.get(Key.HBASE_THIN_CONNECT_URL);
    cfg.namespace = thinConnectConfig.get(Key.HBASE_THIN_CONNECT_NAMESPACE);
    cfg.username = thinConnectConfig.get(Key.HBASE_THIN_CONNECT_USERNAME);
    cfg.password = thinConnectConfig.get(Key.HBASE_THIN_CONNECT_PASSWORD);
    if (Strings.isNullOrEmpty(thinConnectStr)) {
        throw DataXException.asDataXException(
                HbaseSQLWriterErrorCode.ILLEGAL_VALUE,
                "thinClient=true的轻客户端模式下HBase的hbase.thin.connect.url配置不能为空,请联系HBase PE获取该信息.");
    }
    if (Strings.isNullOrEmpty(cfg.namespace) || Strings.isNullOrEmpty(cfg.username)
            || Strings.isNullOrEmpty(cfg.password)) {
        throw DataXException.asDataXException(HbaseSQLWriterErrorCode.ILLEGAL_VALUE,
                "thinClient=true的轻客户端模式下HBase的hbase.thin.connect.namespce|username|password配置不能为空,请联系HBase PE获取该信息.");
    }
    cfg.connectionString = thinConnectStr;
} else {
    // Parse the zookeeper quorum and znode info
    Pair<String, String> zkCfg;
    try {
        zkCfg = HbaseSQLHelper.getHbaseConfig(hbaseCfg);
    } catch (Throwable t) {
        // Failed to parse the hbase config
        throw DataXException.asDataXException(
                HbaseSQLWriterErrorCode.REQUIRED_VALUE,
                "解析hbaseConfig出错,请确认您配置的hbaseConfig为合法的json数据格式,内容正确.");
    }
    String zkQuorum = zkCfg.getFirst();
    String znode = zkCfg.getSecond();
    if (zkQuorum == null || zkQuorum.isEmpty()) {
        throw DataXException.asDataXException(
                HbaseSQLWriterErrorCode.ILLEGAL_VALUE,
                "HBase的hbase.zookeeper.quorum配置不能为空,请联系HBase PE获取该信息.");
    }
    if (znode == null || znode.isEmpty()) {
        throw DataXException.asDataXException(
                HbaseSQLWriterErrorCode.ILLEGAL_VALUE,
                "HBase的zookeeper.znode.parent配置不能为空,请联系HBase PE获取该信息.");
    }
    // Build the connection string used for SQL, format: jdbc:phoenix:zk_quorum:2181:/znode_parent
    cfg.connectionString = "jdbc:phoenix:" + zkQuorum + ":2181:" + znode;
}
}
private static void parseTableConfig(HbaseSQLWriterConfig cfg, Configuration dataxCfg) {
......
...@@ -157,12 +157,20 @@ public class HbaseSQLWriterTask {
private PreparedStatement createPreparedStatement() throws SQLException {
    // Build the comma-separated list of column names: col1,col2,col3,...
    StringBuilder columnNamesBuilder = new StringBuilder();
    if (cfg.isThinClient()) {
        for (String col : cfg.getColumns()) {
            // the thin client does not use double quotes
            columnNamesBuilder.append(col);
            columnNamesBuilder.append(",");
        }
    } else {
        for (String col : cfg.getColumns()) {
            // Quote the column names so they keep the configured case instead of being upper-cased
            columnNamesBuilder.append("\"");
            columnNamesBuilder.append(col);
            columnNamesBuilder.append("\"");
            columnNamesBuilder.append(",");
        }
    }
    columnNamesBuilder.setLength(columnNamesBuilder.length() - 1); // remove the trailing comma
    String columnNames = columnNamesBuilder.toString();
...@@ -171,9 +179,13 @@ public class HbaseSQLWriterTask {
    // Build the UPSERT template
    String tableName = cfg.getTableName();
    StringBuilder upsertBuilder = null;
    if (cfg.isThinClient()) {
        upsertBuilder = new StringBuilder("upsert into " + tableName + " (" + columnNames + " ) values (");
    } else {
        // Quote the table name so it keeps the configured case instead of being upper-cased
        upsertBuilder = new StringBuilder("upsert into \"" + tableName + "\" (" + columnNames + " ) values (");
    }
    for (int i = 0; i < cfg.getColumns().size(); i++) {
        upsertBuilder.append("?,");
    }
...@@ -191,7 +203,8 @@ public class HbaseSQLWriterTask {
 */
private int[] getColumnSqlType(List<String> columnNames) throws SQLException {
int[] types = new int[numberOfColumnsToWrite];
PTable ptable = HbaseSQLHelper
        .getTableSchema(connection, cfg.getNamespace(), cfg.getTableName(), cfg.isThinClient());
for (int i = 0; i < columnNames.size(); i++) {
String name = columnNames.get(i);
......
...@@ -10,6 +10,10 @@ public final class Key {
public final static String HBASE_CONFIG = "hbaseConfig";
public final static String HBASE_ZK_QUORUM = HConstants.ZOOKEEPER_QUORUM;
public final static String HBASE_ZNODE_PARENT = HConstants.ZOOKEEPER_ZNODE_PARENT;
public final static String HBASE_THIN_CONNECT_URL = "hbase.thin.connect.url";
public final static String HBASE_THIN_CONNECT_NAMESPACE = "hbase.thin.connect.namespace";
public final static String HBASE_THIN_CONNECT_USERNAME = "hbase.thin.connect.username";
public final static String HBASE_THIN_CONNECT_PASSWORD = "hbase.thin.connect.password";
/**
 * [Required] name of the table the writer writes to
...@@ -34,6 +38,9 @@ public final class Key {
 */
public static final String TRUNCATE = "truncate";
public static final String THIN_CLIENT = "thinClient";
/**
 * [Optional] maximum number of rows per batch write, default 256
 */
......
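For orientation, the thin-client branch added above expects `hbaseConfig` to be a JSON object carrying the four `hbase.thin.connect.*` keys defined in `Key`. Below is a minimal sketch (not part of this patch; the URL, namespace, and credentials are made-up placeholders) of how such a config string is parsed, mirroring `HbaseSQLHelper.getThinConnectConfig`:

```java
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;

import java.util.Map;

public class ThinConnectConfigDemo {
    public static void main(String[] args) {
        // Placeholder values; real jobs take this string from the job's hbaseConfig setting
        String hbaseCfg = "{"
                + "\"hbase.thin.connect.url\":\"http://127.0.0.1:8765\","
                + "\"hbase.thin.connect.namespace\":\"TEST_NS\","
                + "\"hbase.thin.connect.username\":\"user\","
                + "\"hbase.thin.connect.password\":\"pass\"}";
        // Same parsing approach as HbaseSQLHelper.getThinConnectConfig above
        Map<String, String> cfg = JSON.parseObject(hbaseCfg, new TypeReference<Map<String, String>>() {});
        System.out.println(cfg.get("hbase.thin.connect.url"));
    }
}
```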
# hbase20xsqlreader Plugin Documentation
___
## 1 Quick Introduction
The hbase20xsqlreader plugin reads data from Phoenix (HBase SQL); the supported versions are HBase 2.X and Phoenix 5.X.
## 2 Implementation
In short, hbase20xsqlreader connects to the Phoenix QueryServer through the Phoenix thin client, generates a SELECT statement from the user configuration, sends it to the QueryServer to read the HBase data, assembles the returned results into DataX's abstract dataset using DataX's own data types, and finally passes them to the downstream writer (see the sketch below).
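To make the flow concrete, here is a minimal sketch (not code from the plugin; the address and table are placeholders) that opens a thin-client JDBC connection with the driver class and URL template this plugin declares in its `Constant` class, then runs a simple SELECT:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class ThinClientReadDemo {
    public static void main(String[] args) throws Exception {
        // Driver and URL template as declared in this plugin's Constant class
        Class.forName("org.apache.phoenix.queryserver.client.Driver");
        String url = String.format("jdbc:phoenix:thin:url=%s;serialization=%s",
                "http://127.0.0.1:8765", "PROTOBUF");
        Connection conn = DriverManager.getConnection(url);
        Statement st = conn.createStatement();
        ResultSet rs = st.executeQuery("SELECT ID, NAME FROM TEST");
        while (rs.next()) {
            System.out.println(rs.getObject(1) + "\t" + rs.getObject(2));
        }
        rs.close();
        st.close();
        conn.close();
    }
}
```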
## 3 Functional Description
### 3.1 Sample Configuration
* Configure a job that extracts data from Phoenix to a local target:
```
{
"job": {
"content": [
{
"reader": {
"name": "hbase20xsqlreader", //指定插件为hbase20xsqlreader
"parameter": {
"queryServerAddress": "http://127.0.0.1:8765", //填写连接Phoenix QueryServer地址
"serialization": "PROTOBUF", //QueryServer序列化格式
"table": "TEST", //读取表名
"column": ["ID", "NAME"], //所要读取列名
"splitKey": "ID" //切分列,必须是表主键
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": "3"
}
}
}
}
```
### 3.2 Parameter Description
* **queryServerAddress**
	* Description: hbase20xsqlreader connects to the Phoenix QueryServer through the Phoenix thin client, so the corresponding QueryServer address must be given here.
	* Required: yes <br />
	* Default: none <br />
* **serialization**
	* Description: serialization protocol used by the QueryServer
	* Required: no <br />
	* Default: PROTOBUF <br />
* **table**
	* Description: name of the table to read
	* Required: yes <br />
	* Default: none <br />
* **schema**
	* Description: schema the table belongs to
	* Required: no <br />
	* Default: none <br />
* **column**
	* Description: set of column names to read from the Phoenix table, described as a JSON array; an empty value means all columns are read.
	* Required: no <br />
	* Default: all columns <br />
* **splitKey**
	* Description: the table is split on this column for parallel reads. There are two splitting strategies: 1. split evenly into the configured number of channels by the column's minimum and maximum values, which only supports integer and string split columns; 2. split by the configured splitPoints (see the sketch after this parameter list).
	* Required: yes <br />
	* Default: none <br />
* **splitPoints**
	* Description: since splitting by the column's minimum and maximum values cannot guarantee that data hotspots are avoided, splitKey also lets the user specify split points based on the data distribution. It is recommended to pick split points from the Regions' startkey and endkey so that each query maps to a single Region.
	* Required: no <br />
	* Default: none <br />
* **where**
	* Description: an optional filter condition for the table query; every split carries this condition.
	* Required: no <br />
	* Default: none <br />
* **querySql**
	* Description: one or more query statements may be given, but the column types and counts of all queries must match. Users can write single-table or multi-table join queries by hand as needed. When this parameter is set, only queryServerAddress still has to be configured; the remaining parameters are ignored or may be left unset.
	* Required: no <br />
	* Default: none <br />
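To illustrate splitting strategy 1, the sketch below (placeholder table and column names; this is not the plugin's own splitting code) divides a numeric split column into per-channel WHERE ranges after a MIN/MAX query, in the spirit of the `QUERY_MIN_MAX_TEMPLATE` and `QUERY_SQL_TEMPLATE` constants that appear later in this patch:

```java
import java.util.ArrayList;
import java.util.List;

public class SplitRangeDemo {
    // Divide [min, max] into `channels` contiguous ranges on the split column.
    static List<String> buildWheres(String splitKey, long min, long max, int channels) {
        List<String> wheres = new ArrayList<String>();
        long step = Math.max(1, (max - min + 1) / channels);
        long lower = min;
        for (int i = 0; i < channels; i++) {
            // The last range is widened so that max itself is included.
            long upper = (i == channels - 1) ? max + 1 : lower + step;
            wheres.add(splitKey + " >= " + lower + " AND " + splitKey + " < " + upper);
            lower = upper;
        }
        return wheres;
    }

    public static void main(String[] args) {
        // e.g. SELECT MIN(ID), MAX(ID) FROM TEST returned 1 and 100, with 3 channels
        for (String w : buildWheres("ID", 1, 100, 3)) {
            System.out.println("select ID, NAME from TEST where (" + w + ")");
        }
    }
}
```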
### 3.3 Type Conversion
hbase20xsqlreader currently supports most Phoenix types, but a few individual types are not supported; please check your types.
The table below lists the conversions hbase20xsqlreader performs for Phoenix types:
| DataX internal type | Phoenix data type |
| -------- | ----- |
| String |CHAR, VARCHAR|
| Bytes |BINARY, VARBINARY|
| Bool |BOOLEAN |
| Long |INTEGER, TINYINT, SMALLINT, BIGINT |
| Double |FLOAT, DECIMAL, DOUBLE |
| Date |DATE, TIME, TIMESTAMP |
## 4 Performance Report
## 5 Constraints and Limitations
* Only a single column can be used as the split column, and it must be the table's primary key
* When splitPoint is not set, automatic splitting is used, in which case only integer and string split columns are supported
* Table, schema, and column names are case-sensitive and must match the actual case in the Phoenix table
* Data can only be read through the Phoenix QueryServer, so your Phoenix must have the QueryServer service running to use this plugin
## 6 FAQ
***
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>datax-all</artifactId>
<groupId>com.alibaba.datax</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>hbase20xsqlreader</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
<phoenix.version>5.1.0-HBase-2.0.0.2</phoenix.version>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>${datax-project-version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.aliyun.phoenix</groupId>
<artifactId>ali-phoenix-shaded-thin-client</artifactId>
<version>${phoenix.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>2.0.44-beta</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-core</artifactId>
<version>${datax-project-version}</version>
<exclusions>
<exclusion>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-service-face</artifactId>
</exclusion>
</exclusions>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>plugin-rdbms-util</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.properties</include>
</includes>
</resource>
</resources>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.6</source>
<target>1.6</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<finalName>datax</finalName>
</configuration>
<executions>
<execution>
<id>dwzip</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id></id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>plugin.json</include>
<include>plugin_job_template.json</include>
</includes>
<outputDirectory>plugin/reader/hbase20xsqlreader</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>hbase20xsqlreader-0.0.1-SNAPSHOT.jar</include>
</includes>
<outputDirectory>plugin/reader/hbase20xsqlreader</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/reader/hbase20xsqlreader/libs</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>
package com.alibaba.datax.plugin.reader.hbase20xsqlreader;
public class Constant {
public static final String PK_TYPE = "pkType";
public static final String PK_TYPE_STRING = "pkTypeString";
public static final String PK_TYPE_LONG = "pkTypeLong";
public static final String DEFAULT_SERIALIZATION = "PROTOBUF";
public static final String CONNECT_STRING_TEMPLATE = "jdbc:phoenix:thin:url=%s;serialization=%s";
public static final String CONNECT_DRIVER_STRING = "org.apache.phoenix.queryserver.client.Driver";
public static final String SELECT_COLUMNS_TEMPLATE = "SELECT COLUMN_NAME, COLUMN_FAMILY FROM SYSTEM.CATALOG WHERE TABLE_NAME='%s' AND COLUMN_NAME IS NOT NULL";
public static String QUERY_SQL_TEMPLATE_WITHOUT_WHERE = "select %s from %s ";
public static String QUERY_SQL_TEMPLATE = "select %s from %s where (%s)";
public static String QUERY_MIN_MAX_TEMPLATE = "SELECT MIN(%s),MAX(%s) FROM %s";
public static String QUERY_COLUMN_TYPE_TEMPLATE = "SELECT %s FROM %s LIMIT 1";
public static String QUERY_SQL_PER_SPLIT = "querySqlPerSplit";
}
package com.alibaba.datax.plugin.reader.hbase20xsqlreader;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;
import java.util.List;
public class HBase20xSQLReader extends Reader {
public static class Job extends Reader.Job {
private Configuration originalConfig;
private HBase20SQLReaderHelper readerHelper;
@Override
public void init() {
this.originalConfig = this.getPluginJobConf();
this.readerHelper = new HBase20SQLReaderHelper(this.originalConfig);
readerHelper.validateParameter();
}
@Override
public List<Configuration> split(int adviceNumber) {
return readerHelper.doSplit(adviceNumber);
}
@Override
public void destroy() {
// do nothing
}
}
public static class Task extends Reader.Task {
private Configuration readerConfig;
private HBase20xSQLReaderTask hbase20xSQLReaderTask;
@Override
public void init() {
this.readerConfig = super.getPluginJobConf();
hbase20xSQLReaderTask = new HBase20xSQLReaderTask(readerConfig, super.getTaskGroupId(), super.getTaskId());
}
@Override
public void startRead(RecordSender recordSender) {
hbase20xSQLReaderTask.readRecord(recordSender);
}
@Override
public void destroy() {
// do nothing
}
}
}
package com.alibaba.datax.plugin.reader.hbase20xsqlreader;
import com.alibaba.datax.common.spi.ErrorCode;
public enum HBase20xSQLReaderErrorCode implements ErrorCode {
REQUIRED_VALUE("Hbasewriter-00", "您缺失了必须填写的参数值."),
ILLEGAL_VALUE("Hbasewriter-01", "您填写的参数值不合法."),
GET_QUERYSERVER_CONNECTION_ERROR("Hbasewriter-02", "获取QueryServer连接时出错."),
GET_PHOENIX_TABLE_ERROR("Hbasewriter-03", "获取 Phoenix table时出错."),
GET_TABLE_COLUMNTYPE_ERROR("Hbasewriter-05", "获取表列类型时出错."),
CLOSE_PHOENIX_CONNECTION_ERROR("Hbasewriter-06", "关闭JDBC连接时时出错."),
ILLEGAL_SPLIT_PK("Hbasewriter-07", "非法splitKey配置."),
PHOENIX_COLUMN_TYPE_CONVERT_ERROR("Hbasewriter-08", "phoenix的列类型转换错误."),
QUERY_DATA_ERROR("Hbasewriter-09", "truncate hbase表时发生异常."),
;
private final String code;
private final String description;
private HBase20xSQLReaderErrorCode(String code, String description) {
this.code = code;
this.description = description;
}
@Override
public String getCode() {
return this.code;
}
@Override
public String getDescription() {
return this.description;
}
@Override
public String toString() {
return String.format("Code:[%s], Description:[%s].", this.code, this.description);
}
}
package com.alibaba.datax.plugin.reader.hbase20xsqlreader;
import com.alibaba.datax.common.element.*;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.statistics.PerfRecord;
import com.alibaba.datax.common.util.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.sql.*;
public class HBase20xSQLReaderTask {
private static final Logger LOG = LoggerFactory.getLogger(HBase20xSQLReaderTask.class);
private Configuration readerConfig;
private int taskGroupId = -1;
private int taskId=-1;
public HBase20xSQLReaderTask(Configuration config, int taskGroupId, int taskId) {
this.readerConfig = config;
this.taskGroupId = taskGroupId;
this.taskId = taskId;
}
public void readRecord(RecordSender recordSender) {
String querySql = readerConfig.getString(Constant.QUERY_SQL_PER_SPLIT);
LOG.info("Begin to read record by Sql: [{}\n] {}.", querySql);
HBase20SQLReaderHelper helper = new HBase20SQLReaderHelper(readerConfig);
Connection conn = helper.getConnection(readerConfig.getString(Key.QUERYSERVER_ADDRESS),
readerConfig.getString(Key.SERIALIZATION_NAME, Constant.DEFAULT_SERIALIZATION));
Statement statement = null;
ResultSet resultSet = null;
try {
long rsNextUsedTime = 0;
long lastTime = System.nanoTime();
statement = conn.createStatement();
// measure the query time
PerfRecord queryPerfRecord = new PerfRecord(taskGroupId,taskId, PerfRecord.PHASE.SQL_QUERY);
queryPerfRecord.start();
resultSet = statement.executeQuery(querySql);
ResultSetMetaData meta = resultSet.getMetaData();
int columnNum = meta.getColumnCount();
// measure the total resultSet.next() time
PerfRecord allResultPerfRecord = new PerfRecord(taskGroupId, taskId, PerfRecord.PHASE.RESULT_NEXT_ALL);
allResultPerfRecord.start();
while (resultSet.next()) {
Record record = recordSender.createRecord();
rsNextUsedTime += (System.nanoTime() - lastTime);
for (int i = 1; i <= columnNum; i++) {
Column column = this.convertPhoenixValueToDataxColumn(meta.getColumnType(i), resultSet.getObject(i));
record.addColumn(column);
}
lastTime = System.nanoTime();
recordSender.sendToWriter(record);
}
allResultPerfRecord.end(rsNextUsedTime);
LOG.info("Finished read record by Sql: [{}\n] {}.", querySql);
} catch (SQLException e) {
throw DataXException.asDataXException(
HBase20xSQLReaderErrorCode.QUERY_DATA_ERROR, "查询Phoenix数据出现异常,请检查服务状态或与HBase管理员联系!", e);
} finally {
helper.closeJdbc(conn, statement, resultSet);
}
}
private Column convertPhoenixValueToDataxColumn(int sqlType, Object value) {
Column column;
switch (sqlType) {
case Types.CHAR:
case Types.VARCHAR:
column = new StringColumn((String) value);
break;
case Types.BINARY:
case Types.VARBINARY:
column = new BytesColumn((byte[]) value);
break;
case Types.BOOLEAN:
column = new BoolColumn((Boolean) value);
break;
case Types.INTEGER:
column = new LongColumn((Integer) value);
break;
case Types.TINYINT:
column = new LongColumn(((Byte) value).longValue());
break;
case Types.SMALLINT:
column = new LongColumn(((Short) value).longValue());
break;
case Types.BIGINT:
column = new LongColumn((Long) value);
break;
case Types.FLOAT:
column = new DoubleColumn((Float.valueOf(value.toString())));
break;
case Types.DECIMAL:
column = new DoubleColumn((BigDecimal)value);
break;
case Types.DOUBLE:
column = new DoubleColumn((Double) value);
break;
case Types.DATE:
column = new DateColumn((Date) value);
break;
case Types.TIME:
column = new DateColumn((Time) value);
break;
case Types.TIMESTAMP:
column = new DateColumn((Timestamp) value);
break;
default:
throw DataXException.asDataXException(
HBase20xSQLReaderErrorCode.PHOENIX_COLUMN_TYPE_CONVERT_ERROR, "遇到不可识别的phoenix类型," + "sqlType :" + sqlType);
}
return column;
}
}
package com.alibaba.datax.plugin.reader.hbase20xsqlreader;
public class Key {
/**
 * [Required] name of the table the reader reads from
 */
public final static String TABLE = "table";
/**
 * [Required] columns the reader reads
 */
public final static String COLUMN = "column";
/**
 * [Required] Phoenix QueryServer address
 */
public final static String QUERYSERVER_ADDRESS = "queryServerAddress";
/**
 * [Optional] serialization format, default PROTOBUF
 */
public static final String SERIALIZATION_NAME = "serialization";
/**
 * [Optional] schema the Phoenix table belongs to, empty by default
 */
public static final String SCHEMA = "schema";
/**
 * [Optional] split column used when reading
 */
public static final String SPLIT_KEY = "splitKey";
/**
 * [Optional] split points used when reading
 */
public static final String SPLIT_POINT = "splitPoint";
/**
 * [Optional] filter condition for reading
 */
public static final String WHERE = "where";
/**
 * [Optional] query statement
 */
public static final String QUERY_SQL = "querySql";
}
{
"name": "hbase20xsqlreader",
"class": "com.alibaba.datax.plugin.reader.hbase20xsqlreader.HBase20xSQLReader",
"description": "useScene: prod. mechanism: read data from phoenix through queryserver.",
"developer": "bake"
}
{
"name": "hbase20xsqlreader",
"parameter": {
"queryserverAddress": "",
"serialization": "PROTOBUF",
"schema": "",
"table": "TABLE1",
"column": ["ID", "NAME"],
"splitKey": "rowkey",
"splitPoint":[],
"where": ""
}
}
# HBase20xsqlwriter Plugin Documentation
## 1. Quick Introduction
HBase20xsqlwriter implements batch import of data into SQL tables (Phoenix) in HBase. Because Phoenix encodes data into the rowkey, writing directly through the HBase API requires manual data conversion, which is tedious and error-prone. This plugin instead writes data to Phoenix tables directly through SQL.
At the lower level, it executes UPSERT statements against Phoenix through the Phoenix QueryServer thin-client driver.
### 1.1 Supported Features
* Tables with indexes are supported, and all index tables are updated in sync
### 1.2 Limitations
* Requires Phoenix 5.x and HBase 2.x
* Data can only be imported through the Phoenix QueryServer, so your Phoenix must have the QueryServer service running to use this plugin
* Truncating existing table data is not supported
* Only tables created through Phoenix are supported; native HBase tables are not
* Importing data with timestamps is not supported
## 2. Implementation
Through the Phoenix thin client, the plugin connects to the Phoenix QueryServer service and executes UPSERT statements to write data into the table in batches. Because this goes through the upper-level interface, index tables are updated in sync (a rough sketch follows).
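As a sketch of that write path (not the plugin's own code; the address, table, and columns are placeholders), a batched UPSERT through the thin client looks roughly like this:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class ThinClientUpsertDemo {
    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.phoenix.queryserver.client.Driver");
        Connection conn = DriverManager.getConnection(
                "jdbc:phoenix:thin:url=http://127.0.0.1:8765;serialization=PROTOBUF");
        conn.setAutoCommit(false);
        PreparedStatement ps = conn.prepareStatement(
                "UPSERT INTO TEST (UID, EVENTID) VALUES (?, ?)");
        for (int i = 0; i < 100; i++) {
            ps.setString(1, "uid-" + i);
            ps.setString(2, "event-" + i);
            ps.addBatch();      // buffer the row
        }
        ps.executeBatch();      // flush the batch
        conn.commit();          // one commit per batch, in the spirit of the writer's batchSize
        ps.close();
        conn.close();
    }
}
```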
## 3. Configuration
### 3.1 Sample Configuration
```json
{
"job": {
"entry": {
"jvm": "-Xms2048m -Xmx2048m"
},
"content": [
{
"reader": {
"name": "txtfilereader",
"parameter": {
"path": "/Users/shf/workplace/datax_test/hbase20xsqlwriter/txt/normal.txt",
"charset": "UTF-8",
"column": [
{
"index": 0,
"type": "String"
},
{
"index": 1,
"type": "string"
},
{
"index": 2,
"type": "string"
},
{
"index": 3,
"type": "string"
}
],
"fieldDelimiter": ","
}
},
"writer": {
"name": "hbase20xsqlwriter",
"parameter": {
"batchSize": "100",
"column": [
"UID",
"TS",
"EVENTID",
"CONTENT"
],
"queryServerAddress": "http://127.0.0.1:8765",
"nullMode": "skip",
"table": "目标hbase表名,大小写有关"
}
}
}
],
"setting": {
"speed": {
"channel": 5
}
}
}
}
```
### 3.2 Parameter Description
* **name**
	* Description: plugin name; must be `hbase20xsqlwriter`
	* Required: yes
	* Default: none
* **schema**
	* Description: schema the table belongs to
	* Required: no <br />
	* Default: none <br />
* **table**
	* Description: name of the table to import into, case-sensitive; Phoenix table names are usually **upper-case**
	* Required: yes
	* Default: none
* **column**
	* Description: column names, case-sensitive; Phoenix column names are usually **upper-case**
	* Note that the column order must correspond one-to-one with the order of the columns output by the reader
	* Data types do not need to be given; column metadata is fetched from Phoenix automatically
	* Required: yes
	* Default: none
* **queryServerAddress**
	* Description: Phoenix QueryServer address, required; format: http://${hostName}:${port}, e.g. http://172.16.34.58:8765
	* Required: yes
	* Default: none
* **serialization**
	* Description: serialization protocol used by the QueryServer
	* Required: no
	* Default: PROTOBUF
* **batchSize**
	* Description: maximum number of rows per batch write
	* Required: no
	* Default: 256
* **nullMode**
	* Description: how to handle a column value that reads as null. Two modes are currently supported:
		* skip: skip this column, i.e. do not insert it (if the column already exists in that row, it will be deleted)
		* empty: insert an empty value; 0 for numeric types and the empty string for varchar
	* Required: no
	* Default: skip
## 4. Performance Report
## 5. Constraints and Limitations
The column order defined in the writer must match the reader's column order. The reader's column order defines how the columns are laid out in each row it outputs, while the writer's column order defines the order the writer expects in the data it receives. For example:
if the reader's column order is c1, c2, c3, c4
and the writer's column order is x1, x2, x3, x4
then the reader's column c1 is assigned to the writer's column x1. If the writer's column order were x1, x2, x4, x3, then c3 would be assigned to x4 and c4 to x3.
## 6. FAQ
1. How many concurrent channels are appropriate? Does raising concurrency help when the job is slow?
The import process defaults to a 2 GB JVM heap, and concurrency (the channel count) is implemented with multiple threads. Opening too many threads does not always speed up the import and can even hurt performance through overly frequent GC. A channel count of 5-10 is generally recommended.
2. What is a good batchSize?
The default is 256, but the best batchSize should be computed from the per-row size. A single batch usually carries about 2-4 MB of data, so dividing that by the row size gives the batchSize; for example, with rows of about 1 KB, 2 MB / 1 KB gives a batchSize of roughly 2000.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>datax-all</artifactId>
<groupId>com.alibaba.datax</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>hbase20xsqlwriter</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
<phoenix.version>5.1.0-HBase-2.0.0.2</phoenix.version>
<commons-codec.version>1.8</commons-codec.version>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>${datax-project-version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.aliyun.phoenix</groupId>
<artifactId>ali-phoenix-shaded-thin-client</artifactId>
<version>${phoenix.version}</version>
</dependency>
<!-- for test -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-core</artifactId>
<version>${datax-project-version}</version>
<exclusions>
<exclusion>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-service-face</artifactId>
</exclusion>
</exclusions>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.9.5</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.properties</include>
</includes>
</resource>
</resources>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.6</source>
<target>1.6</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<finalName>datax</finalName>
</configuration>
<executions>
<execution>
<id>dwzip</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id></id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>plugin.json</include>
<include>plugin_job_template.json</include>
</includes>
<outputDirectory>plugin/writer/hbase20xsqlwriter</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>hbase20xsqlwriter-0.0.1-SNAPSHOT.jar</include>
</includes>
<outputDirectory>plugin/writer/hbase20xsqlwriter</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/writer/hbase20xsqlwriter/libs</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>
package com.alibaba.datax.plugin.writer.hbase20xsqlwriter;
public final class Constant {
public static final String DEFAULT_NULL_MODE = "skip";
public static final String DEFAULT_SERIALIZATION = "PROTOBUF";
public static final int DEFAULT_BATCH_ROW_COUNT = 256; // write 256 rows per batch by default
public static final int TYPE_UNSIGNED_TINYINT = 11;
public static final int TYPE_UNSIGNED_SMALLINT = 13;
public static final int TYPE_UNSIGNED_INTEGER = 9;
public static final int TYPE_UNSIGNED_LONG = 10;
public static final int TYPE_UNSIGNED_FLOAT = 14;
public static final int TYPE_UNSIGNED_DOUBLE = 15;
public static final int TYPE_UNSIGNED_DATE = 19;
public static final int TYPE_UNSIGNED_TIME = 18;
public static final int TYPE_UNSIGNED_TIMESTAMP = 20;
}
package com.alibaba.datax.plugin.writer.hbase20xsqlwriter;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
public class HBase20xSQLHelper {
private static final Logger LOG = LoggerFactory.getLogger(HBase20xSQLHelper.class);
/**
 * Phoenix thin-client connection prefix
 */
public static final String CONNECT_STRING_PREFIX = "jdbc:phoenix:thin:";
/**
 * Phoenix driver class name
 */
public static final String CONNECT_DRIVER_STRING = "org.apache.phoenix.queryserver.client.Driver";
/**
 * Query that looks up the configured table's columns from the system catalog
 */
public static final String SELECT_CATALOG_TABLE_STRING = "SELECT COLUMN_NAME FROM SYSTEM.CATALOG WHERE TABLE_NAME='%s' AND COLUMN_NAME IS NOT NULL";
/**
 * Validate that the configured parameters are correct
 */
public static void validateParameter(Configuration originalConfig) {
// the table name and queryserver address are required; an exception is thrown otherwise
String tableName = originalConfig.getNecessaryValue(Key.TABLE, HBase20xSQLWriterErrorCode.REQUIRED_VALUE);
String queryServerAddress = originalConfig.getNecessaryValue(Key.QUERYSERVER_ADDRESS, HBase20xSQLWriterErrorCode.REQUIRED_VALUE);
// serialization format; optional, default PROTOBUF
String serialization = originalConfig.getString(Key.SERIALIZATION_NAME, Constant.DEFAULT_SERIALIZATION);
String connStr = getConnectionUrl(queryServerAddress, serialization);
// verify that the JDBC connection works
Connection conn = getThinClientConnection(connStr);
List<String> columnNames = originalConfig.getList(Key.COLUMN, String.class);
if (columnNames == null || columnNames.isEmpty()) {
throw DataXException.asDataXException(
HBase20xSQLWriterErrorCode.ILLEGAL_VALUE, "HBase的columns配置不能为空,请添加目标表的列名配置.");
}
String schema = originalConfig.getString(Key.SCHEMA);
// check that the table and the configured columns exist
checkTable(conn, schema, tableName, columnNames);
}
/**
 * Get a JDBC connection. It is a lightweight connection and must be closed explicitly after use.
 */
public static Connection getThinClientConnection(String connStr) {
LOG.debug("Connecting to QueryServer [" + connStr + "] ...");
Connection conn;
try {
Class.forName(CONNECT_DRIVER_STRING);
conn = DriverManager.getConnection(connStr);
conn.setAutoCommit(false);
} catch (Throwable e) {
throw DataXException.asDataXException(HBase20xSQLWriterErrorCode.GET_QUERYSERVER_CONNECTION_ERROR,
"无法连接QueryServer,配置不正确或服务未启动,请检查配置和服务状态或者联系HBase管理员.", e);
}
LOG.debug("Connected to QueryServer successfully.");
return conn;
}
public static Connection getJdbcConnection(Configuration conf) {
String queryServerAddress = conf.getNecessaryValue(Key.QUERYSERVER_ADDRESS, HBase20xSQLWriterErrorCode.REQUIRED_VALUE);
// serialization format; optional, default PROTOBUF
String serialization = conf.getString(Key.SERIALIZATION_NAME, "PROTOBUF");
String connStr = getConnectionUrl(queryServerAddress, serialization);
return getThinClientConnection(connStr);
}
public static String getConnectionUrl(String queryServerAddress, String serialization) {
String urlFmt = CONNECT_STRING_PREFIX + "url=%s;serialization=%s";
return String.format(urlFmt, queryServerAddress, serialization);
}
public static void checkTable(Connection conn, String schema, String tableName, List<String> columnNames) throws DataXException {
String selectSystemTable = getSelectSystemSQL(schema, tableName);
Statement st = null;
ResultSet rs = null;
try {
st = conn.createStatement();
rs = st.executeQuery(selectSystemTable);
List<String> allColumns = new ArrayList<String>();
if (rs.next()) {
allColumns.add(rs.getString(1));
} else {
LOG.error(tableName + "表不存在,请检查表名是否正确或是否已创建.");
throw DataXException.asDataXException(HBase20xSQLWriterErrorCode.GET_HBASE_TABLE_ERROR,
tableName + "表不存在,请检查表名是否正确或是否已创建.");
}
while (rs.next()) {
allColumns.add(rs.getString(1));
}
for (String columnName : columnNames) {
if (!allColumns.contains(columnName)) {
// a user-configured column does not exist in the table metadata
throw DataXException.asDataXException(HBase20xSQLWriterErrorCode.ILLEGAL_VALUE,
"您配置的列" + columnName + "在目的表" + tableName + "的元数据中不存在,请检查您的配置或者联系HBase管理员.");
}
}
} catch (SQLException t) {
throw DataXException.asDataXException(HBase20xSQLWriterErrorCode.GET_HBASE_TABLE_ERROR,
"获取表" + tableName + "信息失败,请检查您的集群和表状态或者联系HBase管理员.", t);
} finally {
closeJdbc(conn, st, rs);
}
}
private static String getSelectSystemSQL(String schema, String tableName) {
String sql = String.format(SELECT_CATALOG_TABLE_STRING, tableName);
if (schema != null) {
sql = sql + " AND TABLE_SCHEM = '" + schema + "'";
}
return sql;
}
public static void closeJdbc(Connection connection, Statement statement, ResultSet resultSet) {
try {
if (resultSet != null) {
resultSet.close();
}
if (statement != null) {
statement.close();
}
if (connection != null) {
connection.close();
}
} catch (SQLException e) {
LOG.warn("数据库连接关闭异常.", HBase20xSQLWriterErrorCode.CLOSE_HBASE_CONNECTION_ERROR);
}
}
}
package com.alibaba.datax.plugin.writer.hbase20xsqlwriter;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import java.util.ArrayList;
import java.util.List;
public class HBase20xSQLWriter extends Writer {
public static class Job extends Writer.Job {
private Configuration config = null;
@Override
public void init() {
this.config = this.getPluginJobConf();
HBase20xSQLHelper.validateParameter(this.config);
}
@Override
public List<Configuration> split(int mandatoryNumber) {
List<Configuration> splitResultConfigs = new ArrayList<Configuration>();
for (int j = 0; j < mandatoryNumber; j++) {
splitResultConfigs.add(config.clone());
}
return splitResultConfigs;
}
@Override
public void destroy() {
// do nothing
}
}
public static class Task extends Writer.Task {
private Configuration taskConfig;
private HBase20xSQLWriterTask writerTask;
@Override
public void init() {
this.taskConfig = super.getPluginJobConf();
this.writerTask = new HBase20xSQLWriterTask(this.taskConfig);
}
@Override
public void startWrite(RecordReceiver lineReceiver) {
this.writerTask.startWriter(lineReceiver, super.getTaskPluginCollector());
}
@Override
public void destroy() {
// nothing to close
}
}
}
\ No newline at end of file
package com.alibaba.datax.plugin.writer.hbase20xsqlwriter;
import com.alibaba.datax.common.spi.ErrorCode;
public enum HBase20xSQLWriterErrorCode implements ErrorCode {
REQUIRED_VALUE("Hbasewriter-00", "您缺失了必须填写的参数值."),
ILLEGAL_VALUE("Hbasewriter-01", "您填写的参数值不合法."),
GET_QUERYSERVER_CONNECTION_ERROR("Hbasewriter-02", "获取QueryServer连接时出错."),
GET_HBASE_TABLE_ERROR("Hbasewriter-03", "获取 Hbase table时出错."),
CLOSE_HBASE_CONNECTION_ERROR("Hbasewriter-04", "关闭Hbase连接时出错."),
GET_TABLE_COLUMNTYPE_ERROR("Hbasewriter-05", "获取表列类型时出错."),
PUT_HBASE_ERROR("Hbasewriter-07", "写入hbase时发生IO异常."),
;
private final String code;
private final String description;
private HBase20xSQLWriterErrorCode(String code, String description) {
this.code = code;
this.description = description;
}
@Override
public String getCode() {
return this.code;
}
@Override
public String getDescription() {
return this.description;
}
@Override
public String toString() {
return String.format("Code:[%s], Description:[%s].", this.code, this.description);
}
}
package com.alibaba.datax.plugin.writer.hbase20xsqlwriter;
public class Key {
/**
 * [Required] name of the table the writer writes to
 */
public final static String TABLE = "table";
/**
 * [Required] columns the writer writes
 */
public final static String COLUMN = "column";
/**
 * [Required] Phoenix QueryServer address
 */
public final static String QUERYSERVER_ADDRESS = "queryServerAddress";
/**
 * [Optional] serialization format, default PROTOBUF
 */
public static final String SERIALIZATION_NAME = "serialization";
/**
 * [Optional] maximum number of rows per batch write, default 256
 */
public static final String BATCHSIZE = "batchSize";
/**
 * [Optional] null values are skipped by default
 */
public static final String NULLMODE = "nullMode";
/**
 * [Optional] schema the Phoenix table belongs to, empty by default
 */
public static final String SCHEMA = "schema";
}
package com.alibaba.datax.plugin.writer.hbase20xsqlwriter;
import com.alibaba.datax.common.exception.DataXException;
import java.util.Arrays;
public enum NullModeType {
Skip("skip"),
Empty("empty")
;
private String mode;
NullModeType(String mode) {
this.mode = mode.toLowerCase();
}
public String getMode() {
return mode;
}
public static NullModeType getByTypeName(String modeName) {
for (NullModeType modeType : values()) {
if (modeType.mode.equalsIgnoreCase(modeName)) {
return modeType;
}
}
throw DataXException.asDataXException(HBase20xSQLWriterErrorCode.ILLEGAL_VALUE,
"Hbasewriter 不支持该 nullMode 类型:" + modeName + ", 目前支持的 nullMode 类型是:" + Arrays.asList(values()));
}
}
{
"name": "hbase20xsqlwriter",
"class": "com.alibaba.datax.plugin.writer.hbase20xsqlwriter.HBase20xSQLWriter",
"description": "useScene: prod. mechanism: use hbase sql UPSERT to put data, index tables will be updated too.",
"developer": "bake"
}
{
"name": "hbase20xsqlwriter",
"parameter": {
"queryServerAddress": "",
"table": "",
"serialization": "PROTOBUF",
"column": [
],
"batchSize": "100",
"nullMode": "skip",
"schema": ""
}
}
\ No newline at end of file
# OpenTSDBReader Plugin Documentation
___
## 1 Quick Introduction
The OpenTSDBReader plugin reads data from OpenTSDB, a scalable, distributed time-series database maintained mainly by Yahoo. For how it relates to and differs from Alibaba's in-house TSDB, see the Alibaba Cloud documentation: [Advantages over OpenTSDB](https://help.aliyun.com/document_detail/113368.html)
## 2 Implementation
At the lower level, OpenTSDBReader connects to the OpenTSDB instance over HTTP, obtains the connection info of its underlying HBase storage from the `/api/config` endpoint, then connects to HBase through the AsyncHBase framework and scans the data points out. The whole sync is partitioned by metric and time range: one metric's data within one hour forms one migration task (see the sketch below).
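The metric-by-hour task splitting can be pictured with the sketch below (not the plugin's actual splitting code; the metric list and times are placeholders). It floors the configured begin/end times to whole hours, as the parameter notes in 3.2 describe, and enumerates one (metric, hour) task per combination:

```java
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

public class HourSplitDemo {
    public static void main(String[] args) throws Exception {
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        long hourMs = 3600 * 1000L;
        // Minutes and seconds are dropped, i.e. times are floored to the whole hour
        long begin = fmt.parse("2019-01-01 00:35:00").getTime() / hourMs * hourMs;
        long end = fmt.parse("2019-01-01 03:55:00").getTime() / hourMs * hourMs;
        List<String> tasks = new ArrayList<String>();
        for (String metric : new String[]{"m"}) {
            // One migration task per (metric, hour) combination
            for (long t = begin; t < end; t += hourMs) {
                tasks.add(metric + ": [" + fmt.format(new Date(t))
                        + ", " + fmt.format(new Date(t + hourMs)) + ")");
            }
        }
        for (String task : tasks) {
            System.out.println(task);
        }
    }
}
```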
## 3 Functional Description
### 3.1 Sample Configuration
* Configure a job that extracts data from an OpenTSDB database to a local target:
```json
{
"job": {
"content": [
{
"reader": {
"name": "opentsdbreader",
"parameter": {
"endpoint": "http://localhost:4242",
"column": [
"m"
],
"beginDateTime": "2019-01-01 00:00:00",
"endDateTime": "2019-01-01 03:00:00"
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
```
### 3.2 Parameter Description
* **name**
    * Description: the name of this plugin
    * Required: yes
    * Default: opentsdbreader
* **parameter**
    * **endpoint**
        * Description: HTTP endpoint of the OpenTSDB instance
        * Required: yes
        * Format: http://IP:Port
        * Default: none
    * **column**
        * Description: list of metrics this migration job should migrate
        * Required: yes
        * Default: none
    * **beginDateTime**
        * Description: used together with endDateTime to specify the time range of the data points to migrate
        * Required: yes
        * Format: `yyyy-MM-dd HH:mm:ss`
        * Default: none
        * Note: minutes and seconds of the start and end times are ignored, i.e. truncated to the hour; for example, [3:35, 4:55) on 2019-4-18 becomes [3:00, 4:00)
    * **endDateTime**
        * Description: used together with beginDateTime to specify the time range of the data points to migrate
        * Required: yes
        * Format: `yyyy-MM-dd HH:mm:ss`
        * Default: none
        * Note: minutes and seconds of the start and end times are ignored, i.e. truncated to the hour; for example, [3:35, 4:55) on 2019-4-18 becomes [3:00, 4:00)
### 3.3 Type Conversion
| DataX Internal Type | TSDB Data Type |
| ------------------- | -------------------------------------------------------------------------------------- |
| String | serialized string of a TSDB data point, including its timestamp, metric, tags and value |
For example, a single data point may serialize to `{"timestamp":1546272000000,"metric":"m","tags":{"host":"h1"},"value":42}` (field values illustrative).
## 4 Performance Report
### 4.1 Environment
#### 4.1.1 Data Characteristics
The data set is described along four dimensions: metric, time series, value, and collection interval:
##### metric
A single fixed metric, `m`
##### tagkv
The full cross product of the first four tag keys yields `10 * 20 * 100 * 100 = 2000000` time series; the last tag, ip, increments from 1 across those 2000000 series.
| **tag_k** | **tag_v** |
| --------- | ------------- |
| zone | z1~z10 |
| cluster | c1~c20 |
| group | g1~g100 |
| app | a1~a100 |
| ip | ip1~ip2000000 |
##### value
Values are random within the range [1, 100]
##### interval
Data is collected every 10 seconds and ingested continuously for 3 hours, giving `3 * 60 * 60 / 10 * 2000000 = 2,160,000,000` data points in total.
#### 4.1.2 Machine Specifications
OpenTSDB Reader host: 64C256G
HBase hosts: 8C16G * 5
#### 4.1.3 DataX JVM Parameters
"-Xms4096m -Xmx4096m"
### 4.2 Test Results
| Channels | DataX Speed (Rec/s) | DataX Traffic (MB/s) |
|----------|---------------------|----------------------|
| 1 | 215428 | 25.65 |
| 2 | 424994 | 50.60 |
| 3 | 603132 | 71.81 |
## 5 Constraints
### 5.1 The network to OpenTSDB's underlying storage must be reachable
See the FAQ in Section 6 for the reason.
### 5.2 If a single metric carries a very large volume of data within one hour, the JVM memory may need to be increased via the `-j` option
If the downstream writer cannot write as fast as the OpenTSDB reader queries, data may pile up in memory, so the JVM parameters should be adjusted accordingly. Taking the "extract data from an OpenTSDB database to the local console" job as an example, the launch command is:
```bash
python datax/bin/datax.py opentsdb2stream.json -j "-Xms4096m -Xmx4096m"
```
### 5.3 Start and end times are automatically truncated to the hour
For example, `[3:35, 3:55)` on 2019-4-18 becomes `[3:00, 4:00)`
### 5.4 Only OpenTSDB 2.3.x is currently supported
Compatibility with other versions is not guaranteed
## 6 FAQ
***
**Q: Why connect to OpenTSDB's underlying storage instead of fetching data points directly through the `/api/query` interface?**
A: Internal stress testing showed that reading through OpenTSDB's HTTP interface (`/api/query`) under large data volumes makes OpenTSDB's asynchronous framework report too many callbacks. The reader therefore connects directly to the underlying HBase storage and scans the data points out, which avoids that problem. Moreover, given a metric and a time range, the HBase table can be scanned sequentially, improving query efficiency.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-all</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<artifactId>opentsdbreader</artifactId>
<name>opentsdbreader</name>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!-- common -->
<commons-lang3.version>3.3.2</commons-lang3.version>
<!-- http -->
<httpclient.version>4.4</httpclient.version>
<commons-io.version>2.4</commons-io.version>
<!-- json -->
<fastjson.version>1.2.28</fastjson.version>
<!-- opentsdb -->
<opentsdb.version>2.3.2</opentsdb.version>
<!-- test -->
<junit4.version>4.12</junit4.version>
<!-- time -->
<joda-time.version>2.9.9</joda-time.version>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>${datax-project-version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>fastjson</artifactId>
<groupId>com.alibaba</groupId>
</exclusion>
<exclusion>
<artifactId>commons-math3</artifactId>
<groupId>org.apache.commons</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<!-- common -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3.version}</version>
</dependency>
<!-- http -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>${httpclient.version}</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>${commons-io.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>fluent-hc</artifactId>
<version>${httpclient.version}</version>
</dependency>
<!-- json -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<!-- opentsdb -->
<dependency>
<groupId>net.opentsdb</groupId>
<artifactId>opentsdb</artifactId>
<version>${opentsdb.version}</version>
</dependency>
<!-- time -->
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>${joda-time.version}</version>
</dependency>
<!-- test -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit4.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.6</source>
<target>1.6</target>
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>
<!-- assembly plugin -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<finalName>datax</finalName>
</configuration>
<executions>
<execution>
<id>dwzip</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id></id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>plugin.json</include>
<include>plugin_job_template.json</include>
</includes>
<outputDirectory>plugin/reader/opentsdbreader</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>opentsdbreader-0.0.1-SNAPSHOT.jar</include>
</includes>
<outputDirectory>plugin/reader/opentsdbreader</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/reader/opentsdbreader/libs</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>
package com.alibaba.datax.plugin.reader.conn;
import net.opentsdb.core.*;
import net.opentsdb.utils.DateTime;
import java.util.ArrayList;
import java.util.HashMap;
/**
* Copyright @ 2019 alibaba.com
* All rights reserved.
* Function:CliQuery
*
* @author Benedict Jin
* @since 2019-04-17
*/
final class CliQuery {
/**
* Parses the query from the command lines.
*
* @param args The command line arguments.
* @param tsdb The TSDB to use.
* @param queries The list in which {@link Query}s will be appended.
*/
static void parseCommandLineQuery(final String[] args,
final TSDB tsdb,
final ArrayList<Query> queries) {
long start_ts = DateTime.parseDateTimeString(args[0], null);
if (start_ts >= 0) {
start_ts /= 1000;
}
long end_ts = -1;
if (args.length > 3) {
// see if we can detect an end time
try {
if (args[1].charAt(0) != '+' && (args[1].indexOf(':') >= 0
|| args[1].indexOf('/') >= 0 || args[1].indexOf('-') >= 0
|| Long.parseLong(args[1]) > 0)) {
end_ts = DateTime.parseDateTimeString(args[1], null);
}
} catch (NumberFormatException ignore) {
// ignore it as it means the third parameter is likely the aggregator
}
}
// temp fixup to seconds from ms until the rest of TSDB supports ms
// Note you can't append this to the DateTime.parseDateTimeString() call as
// it clobbers -1 results
if (end_ts >= 0) {
end_ts /= 1000;
}
int i = end_ts < 0 ? 1 : 2;
while (i < args.length && args[i].charAt(0) == '+') {
i++;
}
while (i < args.length) {
final Aggregator agg = Aggregators.get(args[i++]);
final boolean rate = "rate".equals(args[i]);
RateOptions rate_options = new RateOptions(false, Long.MAX_VALUE,
RateOptions.DEFAULT_RESET_VALUE);
if (rate) {
i++;
long counterMax = Long.MAX_VALUE;
long resetValue = RateOptions.DEFAULT_RESET_VALUE;
if (args[i].startsWith("counter")) {
String[] parts = Tags.splitString(args[i], ',');
if (parts.length >= 2 && parts[1].length() > 0) {
counterMax = Long.parseLong(parts[1]);
}
if (parts.length >= 3 && parts[2].length() > 0) {
resetValue = Long.parseLong(parts[2]);
}
rate_options = new RateOptions(true, counterMax, resetValue);
i++;
}
}
final boolean downsample = "downsample".equals(args[i]);
if (downsample) {
i++;
}
final long interval = downsample ? Long.parseLong(args[i++]) : 0;
final Aggregator sampler = downsample ? Aggregators.get(args[i++]) : null;
final String metric = args[i++];
final HashMap<String, String> tags = new HashMap<String, String>();
while (i < args.length && args[i].indexOf(' ', 1) < 0
&& args[i].indexOf('=', 1) > 0) {
Tags.parse(tags, args[i++]);
}
final Query query = tsdb.newQuery();
query.setStartTime(start_ts);
if (end_ts > 0) {
query.setEndTime(end_ts);
}
query.setTimeSeries(metric, tags, agg, rate, rate_options);
if (downsample) {
query.downsample(interval, sampler);
}
queries.add(query);
}
}
}
package com.alibaba.datax.plugin.reader.conn;
import com.alibaba.datax.common.plugin.RecordSender;
import java.util.List;
/**
* Copyright @ 2019 alibaba.com
* All rights reserved.
* Function:Connection for TSDB-like databases
*
* @author Benedict Jin
* @since 2019-03-29
*/
public interface Connection4TSDB {
/**
* Get the address of Database.
*
* @return host and port
*/
String address();
/**
* Get the version of Database.
*
* @return version
*/
String version();
/**
* Get the configuration.
*
* @return configs
*/
String config();
/**
* Get the list of supported version prefixes.
*
* @return version list
*/
String[] getSupportVersionPrefix();
/**
* Send data points by metric & start time & end time.
*
* @param metric metric
* @param start startTime
* @param end endTime
* @param recordSender sender
*/
void sendDPs(String metric, Long start, Long end, RecordSender recordSender) throws Exception;
/**
* Put data point.
*
* @param dp data point
* @return whether the data point is written successfully
*/
boolean put(DataPoint4TSDB dp);
/**
* Put data points.
*
* @param dps data points
* @return whether the data points are written successfully
*/
boolean put(List<DataPoint4TSDB> dps);
/**
* Whether the current version is supported.
*
* @return true: supported; false: not yet!
*/
boolean isSupported();
}
package com.alibaba.datax.plugin.reader.conn;
import com.alibaba.fastjson.JSON;
import java.util.Map;
/**
* Copyright @ 2019 alibaba.com
* All rights reserved.
* Function:DataPoint for TSDB
*
* @author Benedict Jin
* @since 2019-04-10
*/
public class DataPoint4TSDB {
private long timestamp;
private String metric;
private Map<String, String> tags;
private Object value;
public DataPoint4TSDB() {
}
public DataPoint4TSDB(long timestamp, String metric, Map<String, String> tags, Object value) {
this.timestamp = timestamp;
this.metric = metric;
this.tags = tags;
this.value = value;
}
public long getTimestamp() {
return timestamp;
}
public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}
public String getMetric() {
return metric;
}
public void setMetric(String metric) {
this.metric = metric;
}
public Map<String, String> getTags() {
return tags;
}
public void setTags(Map<String, String> tags) {
this.tags = tags;
}
public Object getValue() {
return value;
}
public void setValue(Object value) {
this.value = value;
}
@Override
public String toString() {
return JSON.toJSONString(this);
}
}
package com.alibaba.datax.plugin.reader.conn;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.plugin.RecordSender;
import net.opentsdb.core.*;
import net.opentsdb.core.Internal.Cell;
import org.hbase.async.KeyValue;
import org.hbase.async.Scanner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
/**
* Copyright @ 2019 alibaba.com
* All rights reserved.
* Function:Tool to dump the data straight from HBase
*
* @author Benedict Jin
* @since 2019-04-17
*/
final class DumpSeries {
private static final Logger LOG = LoggerFactory.getLogger(DumpSeries.class);
/**
* Dump all data points for the specified metric and time range, then send them all via {@link RecordSender}.
*/
static void doDump(TSDB tsdb, String[] args, RecordSender sender) throws Exception {
final ArrayList<Query> queries = new ArrayList<Query>();
CliQuery.parseCommandLineQuery(args, tsdb, queries);
List<DataPoint4TSDB> dps = new LinkedList<DataPoint4TSDB>();
for (final Query query : queries) {
final List<Scanner> scanners = Internal.getScanners(query);
for (Scanner scanner : scanners) {
ArrayList<ArrayList<KeyValue>> rows;
while ((rows = scanner.nextRows().join()) != null) {
for (final ArrayList<KeyValue> row : rows) {
final byte[] key = row.get(0).key();
final long baseTime = Internal.baseTime(tsdb, key);
final String metric = Internal.metricName(tsdb, key);
for (final KeyValue kv : row) {
formatKeyValue(dps, tsdb, kv, baseTime, metric);
for (DataPoint4TSDB dp : dps) {
StringColumn tsdbColumn = new StringColumn(dp.toString());
Record record = sender.createRecord();
record.addColumn(tsdbColumn);
sender.sendToWriter(record);
}
dps.clear();
}
}
}
}
}
}
/**
* Parse KeyValue into data points.
*/
private static void formatKeyValue(final List<DataPoint4TSDB> dps, final TSDB tsdb,
final KeyValue kv, final long baseTime, final String metric) {
Map<String, String> tagKVs = Internal.getTags(tsdb, kv.key());
final byte[] qualifier = kv.qualifier();
final int q_len = qualifier.length;
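// In OpenTSDB's storage format, a 2-byte qualifier encodes a second-precision
// offset and a 4-byte qualifier with the ms flag a millisecond-precision offset;
// both denote a single data point. A 3-byte qualifier marks an appended column.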
if (!AppendDataPoints.isAppendDataPoints(qualifier) && q_len % 2 != 0) {
// custom data object, not a data point
if (LOG.isDebugEnabled()) {
LOG.debug("Not a data point");
}
} else if (q_len == 2 || q_len == 4 && Internal.inMilliseconds(qualifier)) {
// regular data point
final Cell cell = Internal.parseSingleValue(kv);
if (cell == null) {
throw new IllegalDataException("Unable to parse row: " + kv);
}
dps.add(new DataPoint4TSDB(cell.absoluteTimestamp(baseTime), metric, tagKVs, cell.parseValue()));
} else {
final Collection<Cell> cells;
if (q_len == 3) {
// append data points
cells = new AppendDataPoints().parseKeyValue(tsdb, kv);
} else {
// compacted column
cells = Internal.extractDataPoints(kv);
}
for (Cell cell : cells) {
dps.add(new DataPoint4TSDB(cell.absoluteTimestamp(baseTime), metric, tagKVs, cell.parseValue()));
}
}
}
}
package com.alibaba.datax.plugin.reader.conn;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.plugin.reader.util.TSDBUtils;
import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils;
import java.util.List;
/**
* Copyright @ 2019 alibaba.com
* All rights reserved.
* Function:OpenTSDB Connection
*
* @author Benedict Jin
* @since 2019-03-29
*/
public class OpenTSDBConnection implements Connection4TSDB {
private String address;
public OpenTSDBConnection(String address) {
this.address = address;
}
@Override
public String address() {
return address;
}
@Override
public String version() {
return TSDBUtils.version(address);
}
@Override
public String config() {
return TSDBUtils.config(address);
}
@Override
public String[] getSupportVersionPrefix() {
return new String[]{"2.3"};
}
@Override
public void sendDPs(String metric, Long start, Long end, RecordSender recordSender) throws Exception {
OpenTSDBDump.dump(this, metric, start, end, recordSender);
}
@Override
public boolean put(DataPoint4TSDB dp) {
return false;
}
@Override
public boolean put(List<DataPoint4TSDB> dps) {
return false;
}
@Override
public boolean isSupported() {
String versionJson = version();
if (StringUtils.isBlank(versionJson)) {
throw new RuntimeException("Cannot get the version!");
}
String version = JSON.parseObject(versionJson).getString("version");
if (StringUtils.isBlank(version)) {
return false;
}
for (String prefix : getSupportVersionPrefix()) {
if (version.startsWith(prefix)) {
return true;
}
}
return false;
}
}
package com.alibaba.datax.plugin.reader.conn;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.fastjson.JSON;
import net.opentsdb.core.TSDB;
import net.opentsdb.utils.Config;
import java.util.Map;
/**
* Copyright @ 2019 alibaba.com
* All rights reserved.
* Function:OpenTSDB Dump
*
* @author Benedict Jin
* @since 2019-04-15
*/
final class OpenTSDBDump {
// volatile is required for safe double-checked locking in getTSDB() below
private static volatile TSDB TSDB_INSTANCE;
private OpenTSDBDump() {
}
static void dump(OpenTSDBConnection conn, String metric, Long start, Long end, RecordSender sender) throws Exception {
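// Args layout expected by CliQuery.parseCommandLineQuery: start time, end time,
// aggregator ("none" disables aggregation), then the metric name.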
DumpSeries.doDump(getTSDB(conn), new String[]{start + "", end + "", "none", metric}, sender);
}
private static TSDB getTSDB(OpenTSDBConnection conn) {
if (TSDB_INSTANCE == null) {
synchronized (TSDB.class) {
if (TSDB_INSTANCE == null) {
try {
Config config = new Config(false);
Map configurations = JSON.parseObject(conn.config(), Map.class);
for (Object key : configurations.keySet()) {
config.overrideConfig(key.toString(), configurations.get(key.toString()).toString());
}
TSDB_INSTANCE = new TSDB(config);
} catch (Exception e) {
throw new RuntimeException("Cannot init OpenTSDB connection!");
}
}
}
}
return TSDB_INSTANCE;
}
}
package com.alibaba.datax.plugin.reader.opentsdbreader;
/**
* Copyright @ 2019 alibaba.com
* All rights reserved.
* Function:Constant
*
* @author Benedict Jin
* @since 2019-04-18
*/
public final class Constant {
static final String DEFAULT_DATA_FORMAT = "yyyy-MM-dd HH:mm:ss";
}
package com.alibaba.datax.plugin.reader.opentsdbreader;
/**
* Copyright @ 2019 alibaba.com
* All rights reserved.
* Function:Key
*
* @author Benedict Jin
* @since 2019-04-18
*/
public class Key {
static final String ENDPOINT = "endpoint";
static final String COLUMN = "column";
static final String BEGIN_DATE_TIME = "beginDateTime";
static final String END_DATE_TIME = "endDateTime";
}
package com.alibaba.datax.plugin.reader.opentsdbreader;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.reader.conn.OpenTSDBConnection;
import com.alibaba.datax.plugin.reader.util.TimeUtils;
import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
* Copyright @ 2019 alibaba.com
* All rights reserved.
* Function:OpenTSDB Reader
*
* @author Benedict Jin
* @since 2019-04-18
*/
@SuppressWarnings("unused")
public class OpenTSDBReader extends Reader {
public static class Job extends Reader.Job {
private static final Logger LOG = LoggerFactory.getLogger(Job.class);
private Configuration originalConfig;
@Override
public void init() {
this.originalConfig = super.getPluginJobConf();
String address = originalConfig.getString(Key.ENDPOINT);
if (StringUtils.isBlank(address)) {
throw DataXException.asDataXException(
OpenTSDBReaderErrorCode.REQUIRED_VALUE,
"The parameter [" + Key.ENDPOINT + "] is not set.");
}
List<String> columns = originalConfig.getList(Key.COLUMN, String.class);
if (columns == null || columns.isEmpty()) {
throw DataXException.asDataXException(
OpenTSDBReaderErrorCode.REQUIRED_VALUE,
"The parameter [" + Key.COLUMN + "] is not set.");
}
SimpleDateFormat format = new SimpleDateFormat(Constant.DEFAULT_DATA_FORMAT);
String startTime = originalConfig.getString(Key.BEGIN_DATE_TIME);
Long startDate;
if (startTime == null || startTime.trim().length() == 0) {
throw DataXException.asDataXException(
OpenTSDBReaderErrorCode.REQUIRED_VALUE,
"The parameter [" + Key.BEGIN_DATE_TIME + "] is not set.");
} else {
try {
startDate = format.parse(startTime).getTime();
} catch (ParseException e) {
throw DataXException.asDataXException(OpenTSDBReaderErrorCode.ILLEGAL_VALUE,
"The parameter [" + Key.BEGIN_DATE_TIME +
"] needs to conform to the [" + Constant.DEFAULT_DATA_FORMAT + "] format.");
}
}
String endTime = originalConfig.getString(Key.END_DATE_TIME);
Long endDate;
if (endTime == null || endTime.trim().length() == 0) {
throw DataXException.asDataXException(
OpenTSDBReaderErrorCode.REQUIRED_VALUE,
"The parameter [" + Key.END_DATE_TIME + "] is not set.");
} else {
try {
endDate = format.parse(endTime).getTime();
} catch (ParseException e) {
throw DataXException.asDataXException(OpenTSDBReaderErrorCode.ILLEGAL_VALUE,
"The parameter [" + Key.END_DATE_TIME +
"] needs to conform to the [" + Constant.DEFAULT_DATA_FORMAT + "] format.");
}
}
if (startDate >= endDate) {
throw DataXException.asDataXException(OpenTSDBReaderErrorCode.ILLEGAL_VALUE,
"The parameter [" + Key.BEGIN_DATE_TIME +
"] should be less than the parameter [" + Key.END_DATE_TIME + "].");
}
}
@Override
public void prepare() {
}
@Override
public List<Configuration> split(int adviceNumber) {
List<Configuration> configurations = new ArrayList<Configuration>();
// get metrics
List<String> columns = originalConfig.getList(Key.COLUMN, String.class);
// get time range
SimpleDateFormat format = new SimpleDateFormat(Constant.DEFAULT_DATA_FORMAT);
long startTime;
try {
startTime = format.parse(originalConfig.getString(Key.BEGIN_DATE_TIME)).getTime();
} catch (ParseException e) {
throw DataXException.asDataXException(
OpenTSDBReaderErrorCode.ILLEGAL_VALUE, "解析[" + Key.BEGIN_DATE_TIME + "]失败.", e);
}
long endTime;
try {
endTime = format.parse(originalConfig.getString(Key.END_DATE_TIME)).getTime();
} catch (ParseException e) {
throw DataXException.asDataXException(
OpenTSDBReaderErrorCode.ILLEGAL_VALUE, "解析[" + Key.END_DATE_TIME + "]失败.", e);
}
if (TimeUtils.isSecond(startTime)) {
startTime *= 1000;
}
if (TimeUtils.isSecond(endTime)) {
endTime *= 1000;
}
DateTime startDateTime = new DateTime(TimeUtils.getTimeInHour(startTime));
DateTime endDateTime = new DateTime(TimeUtils.getTimeInHour(endTime));
// split by metric
for (String column : columns) {
// split by time in hour
while (startDateTime.isBefore(endDateTime)) {
Configuration clone = this.originalConfig.clone();
clone.set(Key.COLUMN, Collections.singletonList(column));
clone.set(Key.BEGIN_DATE_TIME, startDateTime.getMillis());
startDateTime = startDateTime.plusHours(1);
// Make sure the time interval is [start, end).
// Because net.opentsdb.core.Query.setEndTime means less than or equal to the end time.
clone.set(Key.END_DATE_TIME, startDateTime.getMillis() - 1);
configurations.add(clone);
LOG.info("Configuration: {}", JSON.toJSONString(clone));
}
}
return configurations;
}
@Override
public void post() {
}
@Override
public void destroy() {
}
}
public static class Task extends Reader.Task {
private static final Logger LOG = LoggerFactory.getLogger(Task.class);
private List<String> columns;
private OpenTSDBConnection conn;
private Long startTime;
private Long endTime;
@Override
public void init() {
Configuration readerSliceConfig = super.getPluginJobConf();
LOG.info("getPluginJobConf: {}", JSON.toJSONString(readerSliceConfig));
this.columns = readerSliceConfig.getList(Key.COLUMN, String.class);
String address = readerSliceConfig.getString(Key.ENDPOINT);
conn = new OpenTSDBConnection(address);
this.startTime = readerSliceConfig.getLong(Key.BEGIN_DATE_TIME);
this.endTime = readerSliceConfig.getLong(Key.END_DATE_TIME);
}
@Override
public void prepare() {
}
@Override
public void startRead(RecordSender recordSender) {
try {
for (String column : columns) {
conn.sendDPs(column, this.startTime, this.endTime, recordSender);
}
} catch (Exception e) {
throw DataXException.asDataXException(
OpenTSDBReaderErrorCode.ILLEGAL_VALUE, "获取或发送数据点的过程中出错!", e);
}
}
@Override
public void post() {
}
@Override
public void destroy() {
}
}
}
package com.alibaba.datax.plugin.reader.opentsdbreader;
import com.alibaba.datax.common.spi.ErrorCode;
/**
* Copyright @ 2019 alibaba.com
* All rights reserved.
* Function:OpenTSDB Reader Error Code
*
* @author Benedict Jin
* @since 2019-04-18
*/
public enum OpenTSDBReaderErrorCode implements ErrorCode {
REQUIRED_VALUE("OpenTSDBReader-00", "缺失必要的值"),
ILLEGAL_VALUE("OpenTSDBReader-01", "值非法");
private final String code;
private final String description;
OpenTSDBReaderErrorCode(String code, String description) {
this.code = code;
this.description = description;
}
@Override
public String getCode() {
return this.code;
}
@Override
public String getDescription() {
return this.description;
}
@Override
public String toString() {
return String.format("Code:[%s], Description:[%s]. ", this.code, this.description);
}
}
package com.alibaba.datax.plugin.reader.util;
import com.alibaba.fastjson.JSON;
import org.apache.http.client.fluent.Content;
import org.apache.http.client.fluent.Request;
import org.apache.http.entity.ContentType;
import java.nio.charset.Charset;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* Copyright @ 2019 alibaba.com
* All rights reserved.
* Function:HttpUtils
*
* @author Benedict Jin
* @since 2019-03-29
*/
public final class HttpUtils {
public final static Charset UTF_8 = Charset.forName("UTF-8");
public final static int CONNECT_TIMEOUT_DEFAULT_IN_MILL = (int) TimeUnit.SECONDS.toMillis(60);
public final static int SOCKET_TIMEOUT_DEFAULT_IN_MILL = (int) TimeUnit.SECONDS.toMillis(60);
private HttpUtils() {
}
public static String get(String url) throws Exception {
Content content = Request.Get(url)
.connectTimeout(CONNECT_TIMEOUT_DEFAULT_IN_MILL)
.socketTimeout(SOCKET_TIMEOUT_DEFAULT_IN_MILL)
.execute()
.returnContent();
if (content == null) {
return null;
}
return content.asString(UTF_8);
}
public static String post(String url, Map<String, Object> params) throws Exception {
return post(url, JSON.toJSONString(params), CONNECT_TIMEOUT_DEFAULT_IN_MILL, SOCKET_TIMEOUT_DEFAULT_IN_MILL);
}
public static String post(String url, String params) throws Exception {
return post(url, params, CONNECT_TIMEOUT_DEFAULT_IN_MILL, SOCKET_TIMEOUT_DEFAULT_IN_MILL);
}
public static String post(String url, Map<String, Object> params,
int connectTimeoutInMill, int socketTimeoutInMill) throws Exception {
return post(url, JSON.toJSONString(params), connectTimeoutInMill, socketTimeoutInMill);
}
public static String post(String url, String params,
int connectTimeoutInMill, int socketTimeoutInMill) throws Exception {
Content content = Request.Post(url)
.connectTimeout(connectTimeoutInMill)
.socketTimeout(socketTimeoutInMill)
.addHeader("Content-Type", "application/json")
.bodyString(params, ContentType.APPLICATION_JSON)
.execute()
.returnContent();
if (content == null) {
return null;
}
return content.asString(UTF_8);
}
}
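// Usage sketch (added for illustration; the endpoint is hypothetical):
//   String version = HttpUtils.get("http://localhost:4242/api/version");
//   String rsp = HttpUtils.post("http://localhost:4242/api/put", "[...]");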
package com.alibaba.datax.plugin.reader.util;
import com.alibaba.datax.plugin.reader.conn.DataPoint4TSDB;
import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
* Copyright @ 2019 alibaba.com
* All rights reserved.
* Function:TSDB Utils
*
* @author Benedict Jin
* @since 2019-03-29
*/
public final class TSDBUtils {
private static final Logger LOG = LoggerFactory.getLogger(TSDBUtils.class);
private TSDBUtils() {
}
public static String version(String address) {
String url = String.format("%s/api/version", address);
String rsp;
try {
rsp = HttpUtils.get(url);
} catch (Exception e) {
throw new RuntimeException(e);
}
return rsp;
}
public static String config(String address) {
String url = String.format("%s/api/config", address);
String rsp;
try {
rsp = HttpUtils.get(url);
} catch (Exception e) {
throw new RuntimeException(e);
}
return rsp;
}
public static boolean put(String address, List<DataPoint4TSDB> dps) {
return put(address, JSON.toJSON(dps));
}
public static boolean put(String address, DataPoint4TSDB dp) {
return put(address, JSON.toJSON(dp));
}
private static boolean put(String address, Object o) {
String url = String.format("%s/api/put", address);
String rsp;
try {
rsp = HttpUtils.post(url, o.toString());
// If successful, the returned content should be null.
assert rsp == null;
} catch (Exception e) {
LOG.error("Address: {}, DataPoints: {}", url, o);
throw new RuntimeException(e);
}
return true;
}
}
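// Usage sketch (added for illustration; the address is hypothetical):
//   TSDBUtils.version("http://localhost:4242"); // JSON with a "version" field
//   TSDBUtils.config("http://localhost:4242");  // underlying storage settings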
package com.alibaba.datax.plugin.reader.util;
import java.util.concurrent.TimeUnit;
/**
* Copyright @ 2019 alibaba.com
* All rights reserved.
* Function:TimeUtils
*
* @author Benedict Jin
* @since 2019-04-22
*/
public final class TimeUtils {
private TimeUtils() {
}
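// Millisecond timestamps exceed 2^32, so a value whose upper 32 bits are all
// zero is treated as a timestamp in seconds.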
private static final long SECOND_MASK = 0xFFFFFFFF00000000L;
private static final long HOUR_IN_MILL = TimeUnit.HOURS.toMillis(1);
/**
* Whether the timestamp is in seconds.
*
* @param ts timestamp
*/
public static boolean isSecond(long ts) {
return (ts & SECOND_MASK) == 0;
}
/**
* Truncate the given time down to the start of its hour.
*
* @param ms time in milliseconds
*/
public static long getTimeInHour(long ms) {
return ms - ms % HOUR_IN_MILL;
}
}
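// Usage sketch (added for illustration):
//   TimeUtils.isSecond(1555000000L);         // true: the upper 32 bits are clear
//   TimeUtils.isSecond(1555000000000L);      // false: a millisecond timestamp
//   TimeUtils.getTimeInHour(1555000000000L); // rounded down to the hour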
{
"name": "opentsdbreader",
"class": "com.alibaba.datax.plugin.reader.opentsdbreader.OpenTSDBReader",
"description": {
"useScene": "从 OpenTSDB 中摄取数据点",
"mechanism": "根据时间和 metric 直连底层 HBase 存储,从而 Scan 出符合条件的数据点",
"warn": "指定起止时间会自动忽略分钟和秒,转为整点时刻,例如 2019-4-18 的 [3:35, 4:55) 会被转为 [3:00, 4:00)"
},
"developer": "Benedict Jin"
}
{
"name": "opentsdbreader",
"parameter": {
"endpoint": "http://localhost:8242",
"column": [
"m"
],
"startTime": "2019-01-01 00:00:00",
"endTime": "2019-01-01 01:00:00"
}
}
package com.alibaba.datax.plugin.reader.conn;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
/**
* Copyright @ 2019 alibaba.com
* All rights reserved.
* Function:OpenTSDB Connection4TSDB Test
*
* @author Benedict Jin
* @since 2019-03-29
*/
@Ignore
public class OpenTSDBConnectionTest {
private static final String OPENTSDB_ADDRESS = "http://localhost:8242";
@Test
public void testVersion() {
String version = new OpenTSDBConnection(OPENTSDB_ADDRESS).version();
Assert.assertNotNull(version);
}
@Test
public void testIsSupported() {
Assert.assertTrue(new OpenTSDBConnection(OPENTSDB_ADDRESS).isSupported());
}
}
package com.alibaba.datax.plugin.reader.util;
/**
* Copyright @ 2019 alibaba.com
* All rights reserved.
* Function:Const
*
* @author Benedict Jin
* @since 2019-03-29
*/
final class Const {
private Const() {
}
static final String OPENTSDB_ADDRESS = "http://localhost:8242";
static final String TSDB_ADDRESS = "http://localhost:8240";
}
package com.alibaba.datax.plugin.reader.util;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
/**
* Copyright @ 2019 alibaba.com
* All rights reserved.
* Function:HttpUtils Test
*
* @author Benedict Jin
* @since 2019-03-29
*/
@Ignore
public class HttpUtilsTest {
@Test
public void testSimpleCase() throws Exception {
String url = "https://httpbin.org/post";
Map<String, Object> params = new HashMap<String, Object>();
params.put("foo", "bar");
String rsp = HttpUtils.post(url, params);
System.out.println(rsp);
Assert.assertNotNull(rsp);
}
@Test
public void testGet() throws Exception {
String url = String.format("%s/api/version", Const.OPENTSDB_ADDRESS);
String rsp = HttpUtils.get(url);
System.out.println(rsp);
Assert.assertNotNull(rsp);
}
}
package com.alibaba.datax.plugin.reader.util;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
/**
* Copyright @ 2019 alibaba.com
* All rights reserved.
* Function:TSDB Test
*
* @author Benedict Jin
* @since 2019-04-11
*/
@Ignore
public class TSDBTest {
@Test
public void testVersion() {
String version = TSDBUtils.version(Const.TSDB_ADDRESS);
Assert.assertNotNull(version);
System.out.println(version);
version = TSDBUtils.version(Const.OPENTSDB_ADDRESS);
Assert.assertNotNull(version);
System.out.println(version);
}
}
package com.alibaba.datax.plugin.reader.util;
import org.junit.Assert;
import org.junit.Test;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* Copyright @ 2019 alibaba.com
* All rights reserved.
* Function:TimeUtils Test
*
* @author Benedict Jin
* @since 2019-04-22
*/
public class TimeUtilsTest {
@Test
public void testIsSecond() {
Assert.assertFalse(TimeUtils.isSecond(System.currentTimeMillis()));
Assert.assertTrue(TimeUtils.isSecond(System.currentTimeMillis() / 1000));
}
@Test
public void testGetTimeInHour() throws ParseException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date date = sdf.parse("2019-04-18 15:32:33");
long timeInHour = TimeUtils.getTimeInHour(date.getTime());
Assert.assertEquals("2019-04-18 15:00:00", sdf.format(timeInHour));
}
}
...@@ -159,6 +159,13 @@
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
<fileSet>
<directory>opentsdbreader/target/datax/</directory>
<includes>
<include>**/*.*</include>
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
<!-- writer -->
<fileSet>
...@@ -308,5 +315,26 @@
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
<fileSet>
<directory>hbase20xsqlreader/target/datax/</directory>
<includes>
<include>**/*.*</include>
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
<fileSet>
<directory>hbase20xsqlwriter/target/datax/</directory>
<includes>
<include>**/*.*</include>
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
<fileSet>
<directory>tsdbwriter/target/datax/</directory>
<includes>
<include>**/*.*</include>
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
</fileSets>
</assembly>
package com.alibaba.datax.plugin.rdbms.util;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Set;
import java.util.StringTokenizer;
// TODO delete it
public class SqlFormatUtil {
private static final Set<String> BEGIN_CLAUSES = new HashSet<String>();
private static final Set<String> END_CLAUSES = new HashSet<String>();
private static final Set<String> LOGICAL = new HashSet<String>();
private static final Set<String> QUANTIFIERS = new HashSet<String>();
private static final Set<String> DML = new HashSet<String>();
private static final Set<String> MISC = new HashSet<String>();
private static final String WHITESPACE = " \n\r\f\t";
static {
BEGIN_CLAUSES.add("left");
BEGIN_CLAUSES.add("right");
BEGIN_CLAUSES.add("inner");
BEGIN_CLAUSES.add("outer");
BEGIN_CLAUSES.add("group");
BEGIN_CLAUSES.add("order");
END_CLAUSES.add("where");
END_CLAUSES.add("set");
END_CLAUSES.add("having");
END_CLAUSES.add("join");
END_CLAUSES.add("from");
END_CLAUSES.add("by");
END_CLAUSES.add("join");
END_CLAUSES.add("into");
END_CLAUSES.add("union");
LOGICAL.add("and");
LOGICAL.add("or");
LOGICAL.add("when");
LOGICAL.add("else");
LOGICAL.add("end");
QUANTIFIERS.add("in");
QUANTIFIERS.add("all");
QUANTIFIERS.add("exists");
QUANTIFIERS.add("some");
QUANTIFIERS.add("any");
DML.add("insert");
DML.add("update");
DML.add("delete");
MISC.add("select");
MISC.add("on");
}
static final String indentString = " ";
static final String initial = "\n ";
public static String format(String source) {
return new FormatProcess(source).perform();
}
private static class FormatProcess {
boolean beginLine = true;
boolean afterBeginBeforeEnd = false;
boolean afterByOrSetOrFromOrSelect = false;
boolean afterValues = false;
boolean afterOn = false;
boolean afterBetween = false;
boolean afterInsert = false;
int inFunction = 0;
int parensSinceSelect = 0;
private LinkedList<Integer> parenCounts = new LinkedList<Integer>();
private LinkedList<Boolean> afterByOrFromOrSelects = new LinkedList<Boolean>();
int indent = 1;
StringBuilder result = new StringBuilder();
StringTokenizer tokens;
String lastToken;
String token;
String lcToken;
public FormatProcess(String sql) {
tokens = new StringTokenizer(sql, "()+*/-=<>'`\"[]," + WHITESPACE,
true);
}
public String perform() {
result.append(initial);
while (tokens.hasMoreTokens()) {
token = tokens.nextToken();
lcToken = token.toLowerCase();
if ("'".equals(token)) {
String t;
do {
t = tokens.nextToken();
token += t;
} while (!"'".equals(t) && tokens.hasMoreTokens()); // cannot
// handle
// single
// quotes
} else if ("\"".equals(token)) {
String t;
do {
t = tokens.nextToken();
token += t;
} while (!"\"".equals(t));
}
if (afterByOrSetOrFromOrSelect && ",".equals(token)) {
commaAfterByOrFromOrSelect();
} else if (afterOn && ",".equals(token)) {
commaAfterOn();
}
else if ("(".equals(token)) {
openParen();
} else if (")".equals(token)) {
closeParen();
}
else if (BEGIN_CLAUSES.contains(lcToken)) {
beginNewClause();
}
else if (END_CLAUSES.contains(lcToken)) {
endNewClause();
}
else if ("select".equals(lcToken)) {
select();
}
else if (DML.contains(lcToken)) {
updateOrInsertOrDelete();
}
else if ("values".equals(lcToken)) {
values();
}
else if ("on".equals(lcToken)) {
on();
}
else if (afterBetween && lcToken.equals("and")) {
misc();
afterBetween = false;
}
else if (LOGICAL.contains(lcToken)) {
logical();
}
else if (isWhitespace(token)) {
white();
}
else {
misc();
}
if (!isWhitespace(token)) {
lastToken = lcToken;
}
}
return result.toString();
}
private void commaAfterOn() {
out();
indent--;
newline();
afterOn = false;
afterByOrSetOrFromOrSelect = true;
}
private void commaAfterByOrFromOrSelect() {
out();
newline();
}
private void logical() {
if ("end".equals(lcToken)) {
indent--;
}
newline();
out();
beginLine = false;
}
private void on() {
indent++;
afterOn = true;
newline();
out();
beginLine = false;
}
private void misc() {
out();
if ("between".equals(lcToken)) {
afterBetween = true;
}
if (afterInsert) {
newline();
afterInsert = false;
} else {
beginLine = false;
if ("case".equals(lcToken)) {
indent++;
}
}
}
private void white() {
if (!beginLine) {
result.append(" ");
}
}
private void updateOrInsertOrDelete() {
out();
indent++;
beginLine = false;
if ("update".equals(lcToken)) {
newline();
}
if ("insert".equals(lcToken)) {
afterInsert = true;
}
}
private void select() {
out();
indent++;
newline();
parenCounts.addLast(Integer.valueOf(parensSinceSelect));
afterByOrFromOrSelects.addLast(Boolean
.valueOf(afterByOrSetOrFromOrSelect));
parensSinceSelect = 0;
afterByOrSetOrFromOrSelect = true;
}
private void out() {
result.append(token);
}
private void endNewClause() {
if (!afterBeginBeforeEnd) {
indent--;
if (afterOn) {
indent--;
afterOn = false;
}
newline();
}
out();
if (!"union".equals(lcToken)) {
indent++;
}
newline();
afterBeginBeforeEnd = false;
afterByOrSetOrFromOrSelect = "by".equals(lcToken)
|| "set".equals(lcToken) || "from".equals(lcToken);
}
private void beginNewClause() {
if (!afterBeginBeforeEnd) {
if (afterOn) {
indent--;
afterOn = false;
}
indent--;
newline();
}
out();
beginLine = false;
afterBeginBeforeEnd = true;
}
private void values() {
indent--;
newline();
out();
indent++;
newline();
afterValues = true;
}
private void closeParen() {
parensSinceSelect--;
if (parensSinceSelect < 0) {
indent--;
parensSinceSelect = parenCounts.removeLast().intValue();
afterByOrSetOrFromOrSelect = afterByOrFromOrSelects
.removeLast().booleanValue();
}
if (inFunction > 0) {
inFunction--;
out();
} else {
if (!afterByOrSetOrFromOrSelect) {
indent--;
newline();
}
out();
}
beginLine = false;
}
private void openParen() {
if (isFunctionName(lastToken) || inFunction > 0) {
inFunction++;
}
beginLine = false;
if (inFunction > 0) {
out();
} else {
out();
if (!afterByOrSetOrFromOrSelect) {
indent++;
newline();
beginLine = true;
}
}
parensSinceSelect++;
}
private static boolean isFunctionName(String tok) {
final char begin = tok.charAt(0);
final boolean isIdentifier = Character.isJavaIdentifierStart(begin)
|| '"' == begin;
return isIdentifier && !LOGICAL.contains(tok)
&& !END_CLAUSES.contains(tok) && !QUANTIFIERS.contains(tok)
&& !DML.contains(tok) && !MISC.contains(tok);
}
private static boolean isWhitespace(String token) {
return WHITESPACE.indexOf(token) >= 0;
}
private void newline() {
result.append("\n");
for (int i = 0; i < indent; i++) {
result.append(indentString);
}
beginLine = true;
}
}
}
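// Usage sketch (added for illustration, not part of the original source):
//   String pretty = SqlFormatUtil.format("select id, name from user where id = 1");
//   // -> the statement re-indented, roughly one select-list item per line and
//   //    each clause starting on its own line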
...@@ -62,6 +62,7 @@
<module>rdbmsreader</module>
<module>hbase11xreader</module>
<module>hbase094xreader</module>
<module>opentsdbreader</module>
<!-- writer -->
<module>mysqlwriter</module>
...@@ -85,10 +86,13 @@
<module>hbase11xsqlwriter</module>
<module>hbase11xsqlreader</module>
<module>elasticsearchwriter</module>
<module>tsdbwriter</module>
<!-- common support module -->
<module>plugin-rdbms-util</module>
<module>plugin-unstructured-storage-util</module>
<module>hbase20xsqlreader</module>
<module>hbase20xsqlwriter</module>
</modules>
<dependencyManagement>
{
"name": "tsdbwriter",
"class": "com.alibaba.datax.plugin.writer.tsdbwriter.TSDBWriter",
"description": {
"useScene": "往 TSDB 中摄入数据点",
"mechanism": "调用 TSDB 的 /api/put 接口,实现数据点的写入",
"warn": ""
},
"developer": "Benedict Jin"
}