risk-feature / DataX · Commits

Commit b901b62d, authored Nov 07, 2018 by muyuan

    add hbase11xsqlreader

Parent: 94a94f2a

Showing 21 changed files with 1326 additions and 0 deletions (+1326 −0)
- README.md (+1 −0)
- hbase11xsqlreader/doc/hbase11xsqlreader.md (+252 −0)
- hbase11xsqlreader/pom.xml (+109 −0)
- hbase11xsqlreader/src/main/assembly/package.xml (+35 −0)
- ...gin/reader/hbase11xsqlreader/HadoopSerializationUtil.java (+31 −0)
- ...datax/plugin/reader/hbase11xsqlreader/HbaseSQLHelper.java (+125 −0)
- ...datax/plugin/reader/hbase11xsqlreader/HbaseSQLReader.java (+89 −0)
- ...plugin/reader/hbase11xsqlreader/HbaseSQLReaderConfig.java (+164 −0)
- ...gin/reader/hbase11xsqlreader/HbaseSQLReaderErrorCode.java (+40 −0)
- ...x/plugin/reader/hbase11xsqlreader/HbaseSQLReaderTask.java (+178 −0)
- ...om/alibaba/datax/plugin/reader/hbase11xsqlreader/Key.java (+28 −0)
- ...x/plugin/reader/hbase11xsqlreader/LocalStrings.properties (+32 −0)
- ...in/reader/hbase11xsqlreader/LocalStrings_en_US.properties (+32 −0)
- ...in/reader/hbase11xsqlreader/LocalStrings_ja_JP.properties (+32 −0)
- ...in/reader/hbase11xsqlreader/LocalStrings_zh_CN.properties (+32 −0)
- hbase11xsqlreader/src/main/resources/plugin.json (+7 −0)
- ...e11xsqlreader/src/main/resources/plugin_job_template.json (+13 −0)
- ...x/plugin/reader/hbase11xsqlreader/HbaseSQLHelperTest.java (+40 −0)
- ...ugin/reader/hbase11xsqlreader/HbaseSQLReaderTaskTest.java (+78 −0)
- package.xml (+7 −0)
- pom.xml (+1 −0)
README.md

```
@@ -47,6 +47,7 @@ DataX already has a fairly comprehensive plugin ecosystem; the mainstream RDBMS databases, N
...
| NoSQL data stores | OTS | √ | √ | [read](https://github.com/alibaba/DataX/blob/master/otsreader/doc/otsreader.md), [write](https://github.com/alibaba/DataX/blob/master/otswriter/doc/otswriter.md) |
| | Hbase0.94 | √ | √ | [read](https://github.com/alibaba/DataX/blob/master/hbase094xreader/doc/hbase094xreader.md), [write](https://github.com/alibaba/DataX/blob/master/hbase094xwriter/doc/hbase094xwriter.md) |
| | Hbase1.1 | √ | √ | [read](https://github.com/alibaba/DataX/blob/master/hbase11xreader/doc/hbase11xreader.md), [write](https://github.com/alibaba/DataX/blob/master/hbase11xwriter/doc/hbase11xwriter.md) |
+ | | Phoenix4.x | √ | √ | [read](https://github.com/alibaba/DataX/blob/master/hbase11xsqlreader/doc/hbase11xsqlreader.md), [write](https://github.com/alibaba/DataX/blob/master/hbase11xsqlwriter/doc/hbase11xsqlwriter.md) |
| | MongoDB | √ | √ | [read](https://github.com/alibaba/DataX/blob/master/mongoreader/doc/mongoreader.md), [write](https://github.com/alibaba/DataX/blob/master/mongowriter/doc/mongowriter.md) |
| | Hive | √ | √ | [read](https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md), [write](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) |
| Unstructured data stores | TxtFile | √ | √ | [read](https://github.com/alibaba/DataX/blob/master/txtfilereader/doc/txtfilereader.md), [write](https://github.com/alibaba/DataX/blob/master/txtfilewriter/doc/txtfilewriter.md) |
...
```
hbase11xsqlreader/doc/hbase11xsqlreader.md
0 → 100644

# hbase11xsqlreader plugin documentation

___

## 1 Quick introduction

The hbase11xsqlreader plugin reads data from Phoenix (HBase SQL). Under the hood, hbase11xsqlreader connects to the remote HBase cluster through the Phoenix client and executes the corresponding SQL statements to SELECT the data out of the Phoenix database.

## 2 How it works

In short, hbase11xsqlreader connects to the remote HBase cluster through the Phoenix client, generates a SELECT query from the user-supplied configuration, sends it to the HBase cluster, assembles the returned results into an abstract dataset using DataX's own data types, and passes that dataset to the downstream Writer.
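The query generation described above can be sketched as follows. `buildSelect` is a hypothetical helper (not part of the plugin's code) showing how the configured `table` and `column` values map to the statement sent through the Phoenix client, under the assumption that an empty column list means "read all columns":

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SelectSketch {
    // Hypothetical: turn the job config's table/column values into the SELECT
    // statement issued against Phoenix (empty column list -> all columns).
    static String buildSelect(String table, List<String> columns) {
        String cols = columns.isEmpty() ? "*" : String.join(", ", columns);
        return "SELECT " + cols + " FROM " + table;
    }

    public static void main(String[] args) {
        System.out.println(buildSelect("US_POPULATION", Arrays.asList("STATE", "CITY")));
        System.out.println(buildSelect("US_POPULATION", Collections.<String>emptyList()));
    }
}
```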
## 3 Features

### 3.1 Sample configuration

* Configure a job that pulls data from Phoenix to the local console:

```
{
    "job": {
        "setting": {
            "speed": {
                // transfer speed limit in byte/s; DataX tries to reach, but not exceed, this rate.
                "byte": 10485760
            },
            // error limits
            "errorLimit": {
                // maximum number of dirty records; exceeding it fails the job.
                "record": 0,
                // maximum ratio of dirty records; 1.0 means 100%, 0.02 means 2%.
                "percentage": 0.02
            }
        },
        "content": [ {
            "reader": {
                // use the hbase11xsqlreader plugin
                "name": "hbase11xsqlreader",
                "parameter": {
                    // the ZK address of the HBase cluster that Phoenix connects to
                    "hbaseConfig": {
                        "hbase.zookeeper.quorum": "hb-proxy-xxx-002.hbase.rds.aliyuncs.com,hb-proxy-xxx-001.hbase.rds.aliyuncs.com,hb-proxy-xxx-003.hbase.rds.aliyuncs.com"
                    },
                    // the Phoenix table to read
                    "table": "US_POPULATION",
                    // the columns to read; leave empty to read all columns
                    "column": [
                    ]
                }
            },
            "writer": {
                // writer type
                "name": "streamwriter",
                // whether to print the content
                "parameter": {
                    "print": true,
                    "encoding": "UTF-8"
                }
            }
        } ]
    }
}
```
### 3.2 Parameters

* **hbaseConfig**
	* Description: hbase11xsqlreader connects to the HBase cluster through the Phoenix client, so fill in the ZK URL of the corresponding HBase cluster here. Note: do not append 2181.
	* Required: yes <br />
	* Default: none <br />
* **table**
	* Description: the name of the table in Phoenix; if the table has a namespace, set this value to 'namespace.tablename'.
	* Required: yes <br />
	* Default: none <br />
* **column**
	* Description: the set of column names to read from the Phoenix table, given as a JSON array; an empty array means all columns are read.
	* Required: yes <br />
	* Default: none <br />
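Regarding the note about not appending 2181: the reader derives both its Phoenix JDBC URL and its ZK URL from the configured `hbase.zookeeper.quorum` value and appends the `:2181` port on its own. `ZkUrlSketch` below is a hypothetical illustration of that derivation, not the plugin's actual code:

```java
public class ZkUrlSketch {
    // Hypothetical: how the reader turns the configured quorum into a
    // Phoenix JDBC connection string and a ZK URL (port appended by the reader).
    static String jdbcUrl(String zkQuorum) {
        return "jdbc:phoenix:" + zkQuorum;
    }

    static String zkUrl(String zkQuorum) {
        return zkQuorum + ":2181";
    }

    public static void main(String[] args) {
        String quorum = "hb-proxy-xxx-001.hbase.rds.aliyuncs.com";
        System.out.println(jdbcUrl(quorum));
        System.out.println(zkUrl(quorum));
    }
}
```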
### 3.3 Type conversion

hbase11xsqlreader currently supports most Phoenix types, but a few individual types are not supported, so please check your column types.

The type conversion list hbase11xsqlreader applies to Phoenix types is given below:

| DataX internal type | Phoenix data type |
| -------- | ----- |
| String | CHAR, VARCHAR |
| Bytes | BINARY, VARBINARY |
| Bool | BOOLEAN |
| Long | INTEGER, TINYINT, SMALLINT, BIGINT |
| Double | FLOAT, DECIMAL, DOUBLE |
| Date | DATE, TIME, TIMESTAMP |

## 4 Performance report

Omitted.

## 5 Constraints and limitations

Omitted.

## 6 FAQ

***
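The mapping in the table above can be expressed as a small sketch. `dataxType` is a hypothetical helper mirroring the table (and the reader's own `java.sql.Types` switch), not part of the plugin:

```java
import java.sql.Types;

public class TypeMapSketch {
    // Hypothetical: DataX internal type name for each supported Phoenix SQL type.
    static String dataxType(int sqlType) {
        switch (sqlType) {
            case Types.CHAR: case Types.VARCHAR:
                return "String";
            case Types.BINARY: case Types.VARBINARY:
                return "Bytes";
            case Types.BOOLEAN:
                return "Bool";
            case Types.INTEGER: case Types.TINYINT: case Types.SMALLINT: case Types.BIGINT:
                return "Long";
            case Types.FLOAT: case Types.DECIMAL: case Types.DOUBLE:
                return "Double";
            case Types.DATE: case Types.TIME: case Types.TIMESTAMP:
                return "Date";
            default:
                // unsupported Phoenix type, as warned above
                throw new IllegalArgumentException("unsupported sqlType: " + sqlType);
        }
    }

    public static void main(String[] args) {
        System.out.println(dataxType(Types.VARCHAR)); // String
        System.out.println(dataxType(Types.BIGINT));  // Long
    }
}
```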
hbase11xsqlreader/pom.xml
0 → 100644

```xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.alibaba.datax</groupId>
        <artifactId>datax-all</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <artifactId>hbase11xsqlreader</artifactId>
    <name>hbase11xsqlreader</name>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <properties>
        <phoenix.version>4.12.0-AliHBase-1.1-0.5</phoenix.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>datax-common</artifactId>
            <version>${datax-project-version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.aliyun.phoenix</groupId>
            <artifactId>ali-phoenix-core</artifactId>
            <version>${phoenix.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>servlet-api</artifactId>
                    <groupId>javax.servlet</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-core</artifactId>
            <version>2.0.44-beta</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>datax-core</artifactId>
            <version>${datax-project-version}</version>
            <exclusions>
                <exclusion>
                    <groupId>com.alibaba.datax</groupId>
                    <artifactId>datax-service-face</artifactId>
                </exclusion>
            </exclusions>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <resources>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.properties</include>
                </includes>
            </resource>
        </resources>
        <plugins>
            <!-- compiler plugin -->
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                    <encoding>${project-sourceEncoding}</encoding>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptors>
                        <descriptor>src/main/assembly/package.xml</descriptor>
                    </descriptors>
                    <finalName>datax</finalName>
                </configuration>
                <executions>
                    <execution>
                        <id>dwzip</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```
hbase11xsqlreader/src/main/assembly/package.xml
0 → 100644

```xml
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
    <id></id>
    <formats>
        <format>dir</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>
    <fileSets>
        <fileSet>
            <directory>src/main/resources</directory>
            <includes>
                <include>plugin.json</include>
                <include>plugin_job_template.json</include>
            </includes>
            <outputDirectory>plugin/reader/hbase11xsqlreader</outputDirectory>
        </fileSet>
        <fileSet>
            <directory>target/</directory>
            <includes>
                <include>hbase11xsqlreader-0.0.1-SNAPSHOT.jar</include>
            </includes>
            <outputDirectory>plugin/reader/hbase11xsqlreader</outputDirectory>
        </fileSet>
    </fileSets>
    <dependencySets>
        <dependencySet>
            <useProjectArtifact>false</useProjectArtifact>
            <outputDirectory>plugin/reader/hbase11xsqlreader/libs</outputDirectory>
            <scope>runtime</scope>
        </dependencySet>
    </dependencySets>
</assembly>
```
hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HadoopSerializationUtil.java
0 → 100644

```java
package com.alibaba.datax.plugin.reader.hbase11xsqlreader;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class HadoopSerializationUtil {
    public static byte[] serialize(Writable writable) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DataOutputStream dataout = new DataOutputStream(out);
        writable.write(dataout);
        dataout.close();
        return out.toByteArray();
    }

    public static void deserialize(Writable writable, byte[] bytes) throws Exception {
        ByteArrayInputStream in = new ByteArrayInputStream(bytes);
        DataInputStream datain = new DataInputStream(in);
        writable.readFields(datain);
        datain.close();
    }
}
```
hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLHelper.java
0 → 100644

```java
package com.alibaba.datax.plugin.reader.hbase11xsqlreader;

import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver;
import org.apache.phoenix.mapreduce.PhoenixInputFormat;
import org.apache.phoenix.mapreduce.PhoenixInputSplit;
import org.apache.phoenix.mapreduce.PhoenixRecordWritable;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.schema.MetaDataClient;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.SaltingUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class HbaseSQLHelper {
    private static final Logger LOG = LoggerFactory.getLogger(HbaseSQLHelper.class);

    public static org.apache.hadoop.conf.Configuration generatePhoenixConf(HbaseSQLReaderConfig readerConfig) {
        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
        String table = readerConfig.getTableName();
        List<String> columns = readerConfig.getColumns();
        String zkUrl = readerConfig.getZkUrl();
        PhoenixConfigurationUtil.setInputClass(conf, PhoenixRecordWritable.class);
        PhoenixConfigurationUtil.setInputTableName(conf, table);
        if (!columns.isEmpty()) {
            PhoenixConfigurationUtil.setSelectColumnNames(conf, columns.toArray(new String[columns.size()]));
        }
        PhoenixEmbeddedDriver.ConnectionInfo info = null;
        try {
            info = PhoenixEmbeddedDriver.ConnectionInfo.create(zkUrl);
        } catch (SQLException e) {
            throw DataXException.asDataXException(
                    HbaseSQLReaderErrorCode.GET_PHOENIX_CONNECTIONINFO_ERROR,
                    "通过zkURL获取phoenix的connectioninfo出错,请检查hbase集群服务是否正常", e);
        }
        conf.set(HConstants.ZOOKEEPER_QUORUM, info.getZookeeperQuorum());
        if (info.getPort() != null)
            conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, info.getPort());
        if (info.getRootNode() != null)
            conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, info.getRootNode());
        return conf;
    }

    public static List<String> getPColumnNames(String connectionString, String tableName) throws SQLException {
        Connection con = DriverManager.getConnection(connectionString);
        PhoenixConnection phoenixConnection = con.unwrap(PhoenixConnection.class);
        MetaDataClient metaDataClient = new MetaDataClient(phoenixConnection);
        PTable table = metaDataClient.updateCache("", tableName).getTable();
        List<String> columnNames = new ArrayList<String>();
        for (PColumn pColumn : table.getColumns()) {
            if (!pColumn.getName().getString().equals(SaltingUtil.SALTING_COLUMN_NAME))
                columnNames.add(pColumn.getName().getString());
            else
                LOG.info(tableName + " is salt table");
        }
        return columnNames;
    }

    public static List<Configuration> split(HbaseSQLReaderConfig readerConfig) {
        PhoenixInputFormat inputFormat = new PhoenixInputFormat<PhoenixRecordWritable>();
        org.apache.hadoop.conf.Configuration conf = generatePhoenixConf(readerConfig);
        JobID jobId = new JobID(Key.MOCK_JOBID_IDENTIFIER, Key.MOCK_JOBID);
        JobContextImpl jobContext = new JobContextImpl(conf, jobId);
        List<Configuration> resultConfigurations = new ArrayList<Configuration>();
        List<InputSplit> rawSplits = null;
        try {
            rawSplits = inputFormat.getSplits(jobContext);
            LOG.info("split size is " + rawSplits.size());
            for (InputSplit split : rawSplits) {
                Configuration cfg = readerConfig.getOriginalConfig().clone();
                byte[] splitSer = HadoopSerializationUtil.serialize((PhoenixInputSplit) split);
                String splitBase64Str = org.apache.commons.codec.binary.Base64.encodeBase64String(splitSer);
                cfg.set(Key.SPLIT_KEY, splitBase64Str);
                resultConfigurations.add(cfg);
            }
        } catch (IOException e) {
            throw DataXException.asDataXException(
                    HbaseSQLReaderErrorCode.GET_PHOENIX_SPLITS_ERROR,
                    "获取表的split信息时出现了异常,请检查hbase集群服务是否正常," + e.getMessage(), e);
        } catch (InterruptedException e) {
            throw DataXException.asDataXException(
                    HbaseSQLReaderErrorCode.GET_PHOENIX_SPLITS_ERROR,
                    "获取表的split信息时被中断,请重试,若还有问题请联系datax管理员," + e.getMessage(), e);
        }
        return resultConfigurations;
    }

    public static HbaseSQLReaderConfig parseConfig(Configuration cfg) {
        return HbaseSQLReaderConfig.parse(cfg);
    }

    public static Pair<String, String> getHbaseConfig(String hbaseCfgString) {
        assert hbaseCfgString != null;
        Map<String, String> hbaseConfigMap =
                JSON.parseObject(hbaseCfgString, new TypeReference<Map<String, String>>() {});
        String zkQuorum = hbaseConfigMap.get(Key.HBASE_ZK_QUORUM);
        String znode = hbaseConfigMap.get(Key.HBASE_ZNODE_PARENT);
        if (znode == null)
            znode = "";
        return new Pair<String, String>(zkQuorum, znode);
    }
}
```
hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReader.java
0 → 100644

```java
package com.alibaba.datax.plugin.reader.hbase11xsqlreader;

import com.alibaba.datax.common.element.*;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

public class HbaseSQLReader extends Reader {
    public static class Job extends Reader.Job {
        private HbaseSQLReaderConfig readerConfig;

        @Override
        public void init() {
            readerConfig = HbaseSQLHelper.parseConfig(this.getPluginJobConf());
        }

        @Override
        public List<Configuration> split(int adviceNumber) {
            return HbaseSQLHelper.split(readerConfig);
        }

        @Override
        public void destroy() {
        }
    }

    public static class Task extends Reader.Task {
        private static Logger LOG = LoggerFactory.getLogger(Task.class);
        private HbaseSQLReaderTask hbase11SQLReaderTask;

        @Override
        public void init() {
            hbase11SQLReaderTask = new HbaseSQLReaderTask(this.getPluginJobConf());
            this.hbase11SQLReaderTask.init();
        }

        @Override
        public void prepare() {
            hbase11SQLReaderTask.prepare();
        }

        @Override
        public void startRead(RecordSender recordSender) {
            Long recordNum = 0L;
            Record record = recordSender.createRecord();
            boolean fetchOK;
            while (true) {
                try {
                    fetchOK = this.hbase11SQLReaderTask.readRecord(record);
                } catch (Exception e) {
                    LOG.info("Read record exception", e);
                    e.printStackTrace();
                    super.getTaskPluginCollector().collectDirtyRecord(record, e);
                    record = recordSender.createRecord();
                    continue;
                }
                if (fetchOK) {
                    recordSender.sendToWriter(record);
                    recordNum++;
                    if (recordNum % 10000 == 0)
                        LOG.info("already read record num is " + recordNum);
                    record = recordSender.createRecord();
                } else {
                    break;
                }
            }
            recordSender.flush();
        }

        @Override
        public void post() {
            super.post();
        }

        @Override
        public void destroy() {
            this.hbase11SQLReaderTask.destroy();
        }
    }
}
```
hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReaderConfig.java
0 → 100644

```java
package com.alibaba.datax.plugin.reader.hbase11xsqlreader;

import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.SQLException;
import java.util.List;

public class HbaseSQLReaderConfig {
    private final static Logger LOG = LoggerFactory.getLogger(HbaseSQLReaderConfig.class);
    private Configuration originalConfig;  // the raw DataX configuration

    // cluster settings
    private String connectionString;

    public String getZkUrl() {
        return zkUrl;
    }

    private String zkUrl;

    // table settings
    private String tableName;
    // all column names of the target table, primary-key and non-primary-key alike, excluding the time column
    private List<String> columns;

    /**
     * @return the original DataX configuration
     */
    public Configuration getOriginalConfig() {
        return originalConfig;
    }

    /**
     * @return the connection string (ZK mode)
     */
    public String getConnectionString() {
        return connectionString;
    }

    /**
     * @return the table name
     */
    public String getTableName() {
        return tableName;
    }

    /**
     * @return all columns, both primary-key and non-primary-key, excluding the version column
     */
    public List<String> getColumns() {
        return columns;
    }

    /**
     * @param dataxCfg
     * @return
     */
    public static HbaseSQLReaderConfig parse(Configuration dataxCfg) {
        assert dataxCfg != null;
        HbaseSQLReaderConfig cfg = new HbaseSQLReaderConfig();
        cfg.originalConfig = dataxCfg;

        // 1. parse the cluster config
        parseClusterConfig(cfg, dataxCfg);

        // 2. parse the table/column config
        parseTableConfig(cfg, dataxCfg);

        // 4. log the parsed config
        LOG.info("HBase SQL reader config parsed:" + cfg.toString());
        return cfg;
    }

    private static void parseClusterConfig(HbaseSQLReaderConfig cfg, Configuration dataxCfg) {
        // fetch the hbase cluster connection info string
        String hbaseCfg = dataxCfg.getString(Key.HBASE_CONFIG);
        if (StringUtils.isBlank(hbaseCfg)) {
            // the cluster config must exist and be non-empty
            throw DataXException.asDataXException(
                    HbaseSQLReaderErrorCode.REQUIRED_VALUE,
                    "读 Hbase 时需要配置hbaseConfig,其内容为 Hbase 连接信息,请查看 Hbase 集群信息.");
        }

        // parse the zk quorum and znode
        Pair<String, String> zkCfg;
        try {
            zkCfg = HbaseSQLHelper.getHbaseConfig(hbaseCfg);
        } catch (Throwable t) {
            // failed to parse the hbase config
            throw DataXException.asDataXException(
                    HbaseSQLReaderErrorCode.REQUIRED_VALUE,
                    "解析hbaseConfig出错,请确认您配置的hbaseConfig为合法的json数据格式,内容正确.");
        }
        String zkQuorum = zkCfg.getFirst();
        String znode = zkCfg.getSecond();
        if (zkQuorum == null || zkQuorum.isEmpty()) {
            throw DataXException.asDataXException(
                    HbaseSQLReaderErrorCode.ILLEGAL_VALUE,
                    "HBase的hbase.zookeeper.quorum配置不能为空");
        }
        // build the jdbc connection string, format: jdbc:hbase:zk_quorum:2181:/znode_parent
        cfg.connectionString = "jdbc:phoenix:" + zkQuorum;
        cfg.zkUrl = zkQuorum + ":2181";
        if (!znode.isEmpty()) {
            cfg.connectionString += cfg.connectionString + ":" + znode;
            cfg.zkUrl += cfg.zkUrl + ":" + znode;
        }
    }

    private static void parseTableConfig(HbaseSQLReaderConfig cfg, Configuration dataxCfg) {
        // parse and check the table name
        cfg.tableName = dataxCfg.getString(Key.TABLE);
        if (cfg.tableName == null || cfg.tableName.isEmpty()) {
            throw DataXException.asDataXException(
                    HbaseSQLReaderErrorCode.ILLEGAL_VALUE,
                    "HBase的tableName配置不能为空,请检查并修改配置.");
        }
        // parse the column config; when it is empty, fill in all of the table's columns
        cfg.columns = dataxCfg.getList(Key.COLUMN, String.class);
        if (cfg.columns == null) {
            throw DataXException.asDataXException(
                    HbaseSQLReaderErrorCode.ILLEGAL_VALUE,
                    "您配置的tableName含有非法字符{0},请检查您的配置.");
        } else if (cfg.columns.isEmpty()) {
            try {
                cfg.columns = HbaseSQLHelper.getPColumnNames(cfg.connectionString, cfg.tableName);
                dataxCfg.set(Key.COLUMN, cfg.columns);
            } catch (SQLException e) {
                throw DataXException.asDataXException(
                        HbaseSQLReaderErrorCode.GET_PHOENIX_COLUMN_ERROR,
                        "HBase的columns配置不能为空,请添加目标表的列名配置." + e.getMessage(), e);
            }
        }
    }

    @Override
    public String toString() {
        StringBuilder ret = new StringBuilder();
        // cluster settings
        ret.append("\n[jdbc]");
        ret.append(connectionString);
        ret.append("\n");
        // table settings
        ret.append("[tableName]");
        ret.append(tableName);
        ret.append("\n");
        ret.append("[column]");
        for (String col : columns) {
            ret.append(col);
            ret.append(",");
        }
        ret.setLength(ret.length() - 1);
        ret.append("\n");
        return ret.toString();
    }

    /**
     * Direct instantiation is forbidden; an instance must be created through {@link #parse}.
     */
    private HbaseSQLReaderConfig() {
    }
}
```
hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReaderErrorCode.java
0 → 100644

```java
package com.alibaba.datax.plugin.reader.hbase11xsqlreader;

import com.alibaba.datax.common.spi.ErrorCode;

public enum HbaseSQLReaderErrorCode implements ErrorCode {
    REQUIRED_VALUE("Hbasewriter-00", "您缺失了必须填写的参数值."),
    ILLEGAL_VALUE("Hbasewriter-01", "您填写的参数值不合法."),
    GET_PHOENIX_COLUMN_ERROR("Hbasewriter-02", "获取phoenix表的列值错误"),
    GET_PHOENIX_CONNECTIONINFO_ERROR("Hbasewriter-03", "获取phoenix服务的zkurl错误"),
    GET_PHOENIX_SPLITS_ERROR("Hbasewriter-04", "获取phoenix的split信息错误"),
    PHOENIX_CREATEREADER_ERROR("Hbasewriter-05", "获取phoenix的reader错误"),
    PHOENIX_READERINIT_ERROR("Hbasewriter-06", "phoenix reader的初始化错误"),
    PHOENIX_COLUMN_TYPE_CONVERT_ERROR("Hbasewriter-07", "phoenix的列类型转换错误"),
    PHOENIX_RECORD_READ_ERROR("Hbasewriter-08", "phoenix record 读取错误"),
    PHOENIX_READER_CLOSE_ERROR("Hbasewriter-09", "phoenix reader 的close错误");

    private final String code;
    private final String description;

    private HbaseSQLReaderErrorCode(String code, String description) {
        this.code = code;
        this.description = description;
    }

    @Override
    public String getCode() {
        return this.code;
    }

    @Override
    public String getDescription() {
        return this.description;
    }

    @Override
    public String toString() {
        return String.format("Code:[%s], Description:[%s].", this.code, this.description);
    }
}
```
hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReaderTask.java
0 → 100644

```java
package com.alibaba.datax.plugin.reader.hbase11xsqlreader;

import com.alibaba.datax.common.element.*;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.PhoenixInputFormat;
import org.apache.phoenix.mapreduce.PhoenixInputSplit;
import org.apache.phoenix.mapreduce.PhoenixRecordReader;
import org.apache.phoenix.mapreduce.PhoenixRecordWritable;
import org.apache.phoenix.schema.MetaDataClient;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.math.BigDecimal;
import java.sql.*;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Created by admin on 1/3/18.
 */
public class HbaseSQLReaderTask {
    private static Logger LOG = LoggerFactory.getLogger(HbaseSQLReaderTask.class);
    private PhoenixInputFormat phoenixInputFormat;
    PhoenixInputSplit phoenixInputSplit;
    private PhoenixRecordReader phoenixRecordReader;
    private Map<String, PColumn> pColumns;
    private HbaseSQLReaderConfig readerConfig;
    private TaskAttemptContextImpl hadoopAttemptContext;

    public HbaseSQLReaderTask(Configuration config) {
        this.readerConfig = HbaseSQLHelper.parseConfig(config);
        pColumns = new LinkedHashMap<String, PColumn>();
    }

    private void getPColumns() throws SQLException {
        Connection con = DriverManager.getConnection(this.readerConfig.getConnectionString());
        PhoenixConnection phoenixConnection = con.unwrap(PhoenixConnection.class);
        MetaDataClient metaDataClient = new MetaDataClient(phoenixConnection);
        PTable table = metaDataClient.updateCache("", this.readerConfig.getTableName()).getTable();
        List<String> columnNames = this.readerConfig.getColumns();
        for (PColumn pColumn : table.getColumns()) {
            if (columnNames.contains(pColumn.getName().getString())) {
                pColumns.put(pColumn.getName().getString(), pColumn);
            }
        }
    }

    public void init() {
        LOG.info("reader table info: " + this.readerConfig.toString());
        try {
            this.getPColumns();
        } catch (SQLException e) {
            throw DataXException.asDataXException(
                    HbaseSQLReaderErrorCode.GET_PHOENIX_CONNECTIONINFO_ERROR,
                    "获取表的列出问题,重试,若还有问题请检查hbase集群状态," + e.getMessage());
        }
        this.phoenixInputFormat = new PhoenixInputFormat<PhoenixRecordWritable>();
        String splitBase64Str = this.readerConfig.getOriginalConfig().getString(Key.SPLIT_KEY);
        byte[] splitBytes = org.apache.commons.codec.binary.Base64.decodeBase64(splitBase64Str);
        TaskAttemptID attemptId = new TaskAttemptID();
        org.apache.hadoop.conf.Configuration conf = HbaseSQLHelper.generatePhoenixConf(this.readerConfig);
        this.hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId);
        this.phoenixInputSplit = new PhoenixInputSplit();
        try {
            HadoopSerializationUtil.deserialize(phoenixInputSplit, splitBytes);
            this.phoenixRecordReader =
                    (PhoenixRecordReader) phoenixInputFormat.createRecordReader(phoenixInputSplit, hadoopAttemptContext);
        } catch (Exception e) {
            throw DataXException.asDataXException(
                    HbaseSQLReaderErrorCode.PHOENIX_CREATEREADER_ERROR,
                    "创建phoenix的reader出现问题,请重试,若还有问题请检查hbase集群状态," + e.getMessage());
        }
    }

    public void prepare() {
        try {
            this.phoenixRecordReader.initialize(this.phoenixInputSplit, hadoopAttemptContext);
        } catch (IOException e) {
            throw DataXException.asDataXException(
                    HbaseSQLReaderErrorCode.PHOENIX_READERINIT_ERROR,
                    "phoenix的reader初始化出现问题,请重试,若还有问题请检查hbase集群状态" + e.getMessage());
        } catch (InterruptedException e) {
            throw DataXException.asDataXException(
                    HbaseSQLReaderErrorCode.PHOENIX_READERINIT_ERROR,
                    "phoenix的reader初始化被中断,请重试," + e.getMessage());
        }
    }

    private Column convertPhoenixValueToDataxColumn(int sqlType, Object value) throws IOException {
        Column column;
        switch (sqlType) {
            case Types.CHAR:
            case Types.VARCHAR:
                column = new StringColumn((String) value);
                break;
            case Types.BINARY:
            case Types.VARBINARY:
                column = new BytesColumn((byte[]) value);
                break;
            case Types.BOOLEAN:
                column = new BoolColumn((Boolean) value);
                break;
            case Types.INTEGER:
                column = new LongColumn((Integer) value);
                break;
            case Types.TINYINT:
                column = new LongColumn(((Byte) value).longValue());
                break;
            case Types.SMALLINT:
                column = new LongColumn(((Short) value).longValue());
                break;
            case Types.BIGINT:
                column = new LongColumn((Long) value);
                break;
            case Types.FLOAT:
                column = new DoubleColumn(((Float) value).doubleValue());
                break;
            case Types.DECIMAL:
                column = new DoubleColumn(((BigDecimal) value));
                break;
            case Types.DOUBLE:
                column = new DoubleColumn((Double) value);
                break;
            case Types.DATE:
                column = new DateColumn((Date) value);
                break;
            case Types.TIME:
                column = new DateColumn((Time) value);
                break;
            case Types.TIMESTAMP:
                column = new DateColumn((Timestamp) value);
                break;
            default:
                throw DataXException.asDataXException(
                        HbaseSQLReaderErrorCode.PHOENIX_COLUMN_TYPE_CONVERT_ERROR,
                        "遇到不可识别的phoenix类型," + "sqlType :" + sqlType);
        }
        return column;
    }

    private void constructRecordFromPhoenix(Record record, Map<String, Object> phoenixRecord) throws IOException {
        for (Map.Entry<String, PColumn> pColumnItem : this.pColumns.entrySet()) {
            Column column = this.convertPhoenixValueToDataxColumn(
                    pColumnItem.getValue().getDataType().getSqlType(),
                    phoenixRecord.get(pColumnItem.getKey()));
            record.addColumn(column);
        }
    }

    public boolean readRecord(Record record) throws IOException, InterruptedException {
        boolean hasNext = false;
        hasNext = this.phoenixRecordReader.nextKeyValue();
        if (!hasNext)
            return hasNext;
        PhoenixRecordWritable phoenixRecordWritable =
                (PhoenixRecordWritable) this.phoenixRecordReader.getCurrentValue();
        Map<String, Object> phoenixRecord = phoenixRecordWritable.getResultMap();
        this.constructRecordFromPhoenix(record, phoenixRecord);
        return hasNext;
    }

    public void destroy() {
        if (this.phoenixRecordReader != null
```
)
{
try
{
this
.
phoenixRecordReader
.
close
();
}
catch
(
IOException
e
)
{
throw
DataXException
.
asDataXException
(
HbaseSQLReaderErrorCode
.
PHOENIX_READER_CLOSE_ERROR
,
"phoenix的reader close失败,请重试,若还有问题请检查hbase集群状态"
+
e
.
getMessage
());
}
}
}
}
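The switch in convertPhoenixValueToDataxColumn dispatches on JDBC `java.sql.Types` codes. A minimal standalone sketch of the same dispatch, with the class names of the DataX `Column` wrappers standing in for the wrappers themselves (the method name `dataxColumnFor` is an illustrative stand-in, not part of the plugin):

```java
import java.sql.Types;

public class TypeMappingSketch {
    // Returns the name of the DataX Column wrapper the reader would pick
    // for a given JDBC type code (mirrors convertPhoenixValueToDataxColumn).
    static String dataxColumnFor(int sqlType) {
        switch (sqlType) {
            case Types.CHAR:
            case Types.VARCHAR:
                return "StringColumn";
            case Types.BINARY:
            case Types.VARBINARY:
                return "BytesColumn";
            case Types.BOOLEAN:
                return "BoolColumn";
            case Types.INTEGER:
            case Types.TINYINT:
            case Types.SMALLINT:
            case Types.BIGINT:
                return "LongColumn";
            case Types.FLOAT:
            case Types.DECIMAL:
            case Types.DOUBLE:
                return "DoubleColumn";
            case Types.DATE:
            case Types.TIME:
            case Types.TIMESTAMP:
                return "DateColumn";
            default:
                throw new IllegalArgumentException("unrecognized sqlType: " + sqlType);
        }
    }

    public static void main(String[] args) {
        if (!dataxColumnFor(Types.VARCHAR).equals("StringColumn")
                || !dataxColumnFor(Types.BIGINT).equals("LongColumn")
                || !dataxColumnFor(Types.TIMESTAMP).equals("DateColumn")) {
            throw new AssertionError("unexpected mapping");
        }
        System.out.println("ok");
    }
}
```

Note that the integral types (TINYINT through BIGINT) all collapse into LongColumn and the floating/decimal types into DoubleColumn, so type width is not preserved on the DataX side.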
hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/Key.java
package com.alibaba.datax.plugin.reader.hbase11xsqlreader;

import org.apache.hadoop.hbase.HConstants;

public final class Key {
    public final static String MOCK_JOBID_IDENTIFIER = "phoenixreader";
    public final static int MOCK_JOBID = 1;
    public final static String SPLIT_KEY = "phoenixsplit";

    /**
     * [Required] HBase cluster configuration. The minimal configuration needed to
     * connect to an HBase cluster is the ZK quorum and the znode parent.
     */
    public final static String HBASE_CONFIG = "hbaseConfig";
    public final static String HBASE_ZK_QUORUM = HConstants.ZOOKEEPER_QUORUM;
    public final static String HBASE_ZNODE_PARENT = HConstants.ZOOKEEPER_ZNODE_PARENT;

    /**
     * [Required] Name of the table to read.
     */
    public final static String TABLE = "table";

    /**
     * [Required] Column configuration.
     */
    public final static String COLUMN = "column";
}
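Key.SPLIT_KEY carries each task's serialized PhoenixInputSplit through the JSON task configuration, which is why the task Base64-decodes the string before deserializing it. A minimal sketch of that round trip, assuming a plain byte payload in place of a real Writable and using java.util.Base64 where the plugin uses commons-codec:

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class SplitKeySketch {
    // Job side: encode the split bytes into a string the JSON config can hold
    // under Key.SPLIT_KEY.
    static String toConfigString(byte[] splitBytes) {
        return Base64.getEncoder().encodeToString(splitBytes);
    }

    // Task side: decode the config string back to bytes, as done before
    // handing them to HadoopSerializationUtil.deserialize().
    static byte[] fromConfigString(String splitBase64Str) {
        return Base64.getDecoder().decode(splitBase64Str);
    }

    public static void main(String[] args) {
        byte[] payload = "region-boundary-payload".getBytes(StandardCharsets.UTF_8);
        String encoded = toConfigString(payload);
        byte[] decoded = fromConfigString(encoded);
        if (!new String(decoded, StandardCharsets.UTF_8).equals("region-boundary-payload")) {
            throw new AssertionError("round trip failed");
        }
        System.out.println("round trip ok");
    }
}
```

Base64 keeps the arbitrary split bytes safe inside a JSON string value, at the cost of roughly a 4/3 size overhead.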
hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/LocalStrings.properties
errorcode.required_value=You are missing a required parameter value.
errorcode.illegal_value=The parameter value you provided is invalid.
errorcode.get_phoenix_table_columns_error=Failed to get the table's columns.
errorcode.get_phoenix_connectioninfo_error=Failed to get Phoenix connection info.
errorcode.get_phoenix_splits_error=Failed to get Phoenix split information.
errorcode.get_phoenix_createreader_error=Failed to create the reader for a Phoenix split.
errorcode.get_phoenix_readerinit_error=Failed to initialize the reader for a Phoenix split.
errorcode.get_phoenix_column_typeconvert_error=Failed to convert a Phoenix column type to a DataX type.
errorcode.get_phoenix_record_read_error=Failed to read a single row from Phoenix.
errorcode.get_phoenix_reader_close_error=Failed to close the Phoenix reader.
sqlhelper.1=Failed to get Phoenix connection info via the ZK URL; please check that the HBase cluster is healthy.
sqlhelper.2=An exception occurred while getting the table's split information; please check that the HBase cluster is healthy.
sqlhelper.3=Interrupted while getting the table's split information; please retry, and contact the DataX administrator if the problem persists.
sqlreadertask.1=Failed to get the table's columns; please retry, and check the HBase cluster status if the problem persists.
sqlreadertask.2=Failed to create the Phoenix reader; please retry, and check the HBase cluster status if the problem persists.
sqlreadertask.3=Failed to initialize the Phoenix reader; please retry, and check the HBase cluster status if the problem persists.
sqlreadertask.4=Phoenix reader initialization was interrupted; please retry.
sqlreadertask.5=Encountered an unrecognized Phoenix type; please contact the HBase administrator.
sqlreadertask.6=Failed to read a Phoenix record; please retry, and check the HBase cluster status if the problem persists.
sqlreadertask.7=Failed to read a Phoenix record; please retry, and check the HBase cluster status if the problem persists.
sqlreadertask.8=Failed to close the Phoenix reader; please retry, and check the HBase cluster status if the problem persists.
hbaseconfig.1=The HBase configuration must not be empty.
hbaseconfig.2=There is a problem with the HBase configuration; please check it against the documentation.
hbaseconfig.3=zkquorum must not be empty.
hbaseconfig.5=The table name must not be empty.
hbaseconfig.6=The column parameter is not configured.
hbaseconfig.7=Failed to get columns from Phoenix; please check the HBase cluster status.
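On disk these LocalStrings files store their non-ASCII message text as `\uXXXX` escapes. A small sketch showing that `java.util.Properties.load` decodes those escapes on read, which is why the files can stay pure ASCII (the `greeting` key here is an illustrative addition, not part of the file):

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class LocalStringsSketch {
    // Properties.load() decodes \uXXXX escapes while reading the stream.
    static Properties load(String text) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(text));
        return props;
    }

    public static void main(String[] args) throws IOException {
        String text = "hbaseconfig.3=zkquorum must not be empty.\n"
                + "greeting=\\u4f60\\u597d\n";   // the two escapes decode to two CJK characters
        Properties props = load(text);
        if (!props.getProperty("greeting").equals("\u4f60\u597d")) {
            throw new AssertionError("escape decoding failed");
        }
        System.out.println(props.getProperty("hbaseconfig.3"));
    }
}
```

ResourceBundle then picks the `_zh_CN`, `_ja_JP`, or `_en_US` variant of the file based on the active Locale, falling back to the base LocalStrings.properties.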
hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/LocalStrings_en_US.properties
errorcode.required_value=You are missing a required parameter value.
errorcode.illegal_value=The parameter value you provided is invalid.
errorcode.get_phoenix_table_columns_error=Failed to get the table's columns.
errorcode.get_phoenix_connectioninfo_error=Failed to get Phoenix connection info.
errorcode.get_phoenix_splits_error=Failed to get Phoenix split information.
errorcode.get_phoenix_createreader_error=Failed to create the reader for a Phoenix split.
errorcode.get_phoenix_readerinit_error=Failed to initialize the reader for a Phoenix split.
errorcode.get_phoenix_column_typeconvert_error=Failed to convert a Phoenix column type to a DataX type.
errorcode.get_phoenix_record_read_error=Failed to read a single row from Phoenix.
errorcode.get_phoenix_reader_close_error=Failed to close the Phoenix reader.
sqlhelper.1=Failed to get Phoenix connection info via the ZK URL; please check that the HBase cluster is healthy.
sqlhelper.2=An exception occurred while getting the table's split information; please check that the HBase cluster is healthy.
sqlhelper.3=Interrupted while getting the table's split information; please retry, and contact the DataX administrator if the problem persists.
sqlreadertask.1=Failed to get the table's columns; please retry, and check the HBase cluster status if the problem persists.
sqlreadertask.2=Failed to create the Phoenix reader; please retry, and check the HBase cluster status if the problem persists.
sqlreadertask.3=Failed to initialize the Phoenix reader; please retry, and check the HBase cluster status if the problem persists.
sqlreadertask.4=Phoenix reader initialization was interrupted; please retry.
sqlreadertask.5=Encountered an unrecognized Phoenix type; please contact the HBase administrator.
sqlreadertask.6=Failed to read a Phoenix record; please retry, and check the HBase cluster status if the problem persists.
sqlreadertask.7=Failed to read a Phoenix record; please retry, and check the HBase cluster status if the problem persists.
sqlreadertask.8=Failed to close the Phoenix reader; please retry, and check the HBase cluster status if the problem persists.
hbaseconfig.1=The HBase configuration must not be empty.
hbaseconfig.2=There is a problem with the HBase configuration; please check it against the documentation.
hbaseconfig.3=zkquorum must not be empty.
hbaseconfig.5=The table name must not be empty.
hbaseconfig.6=The column parameter is not configured.
hbaseconfig.7=Failed to get columns from Phoenix; please check the HBase cluster status.
hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/LocalStrings_ja_JP.properties
errorcode.required_value=You are missing a required parameter value.
errorcode.illegal_value=The parameter value you provided is invalid.
errorcode.get_phoenix_table_columns_error=Failed to get the table's columns.
errorcode.get_phoenix_connectioninfo_error=Failed to get Phoenix connection info.
errorcode.get_phoenix_splits_error=Failed to get Phoenix split information.
errorcode.get_phoenix_createreader_error=Failed to create the reader for a Phoenix split.
errorcode.get_phoenix_readerinit_error=Failed to initialize the reader for a Phoenix split.
errorcode.get_phoenix_column_typeconvert_error=Failed to convert a Phoenix column type to a DataX type.
errorcode.get_phoenix_record_read_error=Failed to read a single row from Phoenix.
errorcode.get_phoenix_reader_close_error=Failed to close the Phoenix reader.
sqlhelper.1=Failed to get Phoenix connection info via the ZK URL; please check that the HBase cluster is healthy.
sqlhelper.2=An exception occurred while getting the table's split information; please check that the HBase cluster is healthy.
sqlhelper.3=Interrupted while getting the table's split information; please retry, and contact the DataX administrator if the problem persists.
sqlreadertask.1=Failed to get the table's columns; please retry, and check the HBase cluster status if the problem persists.
sqlreadertask.2=Failed to create the Phoenix reader; please retry, and check the HBase cluster status if the problem persists.
sqlreadertask.3=Failed to initialize the Phoenix reader; please retry, and check the HBase cluster status if the problem persists.
sqlreadertask.4=Phoenix reader initialization was interrupted; please retry.
sqlreadertask.5=Encountered an unrecognized Phoenix type; please contact the HBase administrator.
sqlreadertask.6=Failed to read a Phoenix record; please retry, and check the HBase cluster status if the problem persists.
sqlreadertask.7=Failed to read a Phoenix record; please retry, and check the HBase cluster status if the problem persists.
sqlreadertask.8=Failed to close the Phoenix reader; please retry, and check the HBase cluster status if the problem persists.
hbaseconfig.1=The HBase configuration must not be empty.
hbaseconfig.2=There is a problem with the HBase configuration; please check it against the documentation.
hbaseconfig.3=zkquorum must not be empty.
hbaseconfig.5=The table name must not be empty.
hbaseconfig.6=The column parameter is not configured.
hbaseconfig.7=Failed to get columns from Phoenix; please check the HBase cluster status.
hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/LocalStrings_zh_CN.properties
errorcode.required_value=You are missing a required parameter value.
errorcode.illegal_value=The parameter value you provided is invalid.
errorcode.get_phoenix_table_columns_error=Failed to get the table's columns.
errorcode.get_phoenix_connectioninfo_error=Failed to get Phoenix connection info.
errorcode.get_phoenix_splits_error=Failed to get Phoenix split information.
errorcode.get_phoenix_createreader_error=Failed to create the reader for a Phoenix split.
errorcode.get_phoenix_readerinit_error=Failed to initialize the reader for a Phoenix split.
errorcode.get_phoenix_column_typeconvert_error=Failed to convert a Phoenix column type to a DataX type.
errorcode.get_phoenix_record_read_error=Failed to read a single row from Phoenix.
errorcode.get_phoenix_reader_close_error=Failed to close the Phoenix reader.
sqlhelper.1=Failed to get Phoenix connection info via the ZK URL; please check that the HBase cluster is healthy.
sqlhelper.2=An exception occurred while getting the table's split information; please check that the HBase cluster is healthy.
sqlhelper.3=Interrupted while getting the table's split information; please retry, and contact the DataX administrator if the problem persists.
sqlreadertask.1=Failed to get the table's columns; please retry, and check the HBase cluster status if the problem persists.
sqlreadertask.2=Failed to create the Phoenix reader; please retry, and check the HBase cluster status if the problem persists.
sqlreadertask.3=Failed to initialize the Phoenix reader; please retry, and check the HBase cluster status if the problem persists.
sqlreadertask.4=Phoenix reader initialization was interrupted; please retry.
sqlreadertask.5=Encountered an unrecognized Phoenix type; please contact the HBase administrator.
sqlreadertask.6=Failed to read a Phoenix record; please retry, and check the HBase cluster status if the problem persists.
sqlreadertask.7=Failed to read a Phoenix record; please retry, and check the HBase cluster status if the problem persists.
sqlreadertask.8=Failed to close the Phoenix reader; please retry, and check the HBase cluster status if the problem persists.
hbaseconfig.1=The HBase configuration must not be empty.
hbaseconfig.2=There is a problem with the HBase configuration; please check it against the documentation.
hbaseconfig.3=zkquorum must not be empty.
hbaseconfig.5=The table name must not be empty.
hbaseconfig.6=The column parameter is not configured.
hbaseconfig.7=Failed to get columns from Phoenix; please check the HBase cluster status.
hbase11xsqlreader/src/main/resources/plugin.json
{
    "name": "hbase11xsqlreader",
    "class": "com.alibaba.datax.plugin.reader.hbase11xsqlreader.HbaseSQLReader",
    "description": "useScene: prod. mechanism: Scan to read data.",
    "developer": "liwei.li, bug reported to : liwei.li@alibaba-inc.com"
}
hbase11xsqlreader/src/main/resources/plugin_job_template.json
{
    "name": "hbase11xsqlreader",
    "parameter": {
        "hbaseConfig": {
            "hbase.zookeeper.quorum": "hb-proxy-pub-xxx-001.hbase.rds.aliyuncs.com,hb-proxy-pub-xxx-002.hbase.rds.aliyuncs.com,hb-proxy-pub-xxx-003.hbase.rds.aliyuncs.com"
        },
        "table": "TABLE1",
        "column": ["ID", "COL1"]
    }
}
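The template above covers only the reader half of a job. A complete DataX job wires the reader to some writer under a job-level setting block; a minimal sketch (the txtfilewriter settings and the channel count below are illustrative assumptions, not part of this commit):

```json
{
    "job": {
        "setting": {
            "speed": { "channel": 3 }
        },
        "content": [
            {
                "reader": {
                    "name": "hbase11xsqlreader",
                    "parameter": {
                        "hbaseConfig": {
                            "hbase.zookeeper.quorum": "zk-host-1,zk-host-2,zk-host-3"
                        },
                        "table": "TABLE1",
                        "column": ["ID", "COL1"]
                    }
                },
                "writer": {
                    "name": "txtfilewriter",
                    "parameter": {
                        "path": "/tmp/datax-out",
                        "fileName": "table1",
                        "writeMode": "truncate"
                    }
                }
            }
        ]
    }
}
```

An empty "column" array, as used in the tests below, asks the reader to fetch all columns of the Phoenix table.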
hbase11xsqlreader/src/test/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLHelperTest.java
package com.alibaba.datax.plugin.reader.hbase11xsqlreader;

import com.alibaba.datax.common.util.Configuration;
import org.junit.Test;

import java.util.List;

import static junit.framework.Assert.assertEquals;

/**
 * Created by shf on 16/7/20.
 */
public class HbaseSQLHelperTest {
    private String jsonStr = "{\n" +
            "        \"hbaseConfig\": {\n" +
            "            \"hbase.zookeeper.quorum\": \"hb-proxy-pub-xxx-001.hbase.rds.aliyuncs.com,hb-proxy-pub-xxx-002.hbase.rds.aliyuncs.com,hb-proxy-pub-xxx-003.hbase.rds.aliyuncs.com\"\n" +
            "        },\n" +
            "        \"table\": \"TABLE1\",\n" +
            "        \"column\": []\n" +
            "    }";

    @Test
    public void testParseConfig() {
        Configuration config = Configuration.from(jsonStr);
        HbaseSQLReaderConfig readerConfig = HbaseSQLHelper.parseConfig(config);
        System.out.println("tablename = " + readerConfig.getTableName() + ", zk = " + readerConfig.getZkUrl());
        assertEquals("TABLE1", readerConfig.getTableName());
        assertEquals("hb-proxy-pub-xxx-001.hbase.rds.aliyuncs.com,hb-proxy-pub-xxx-002.hbase.rds.aliyuncs.com,hb-proxy-pub-xxx-003.hbase.rds.aliyuncs.com:2181",
                readerConfig.getZkUrl());
    }

    @Test
    public void testSplit() {
        Configuration config = Configuration.from(jsonStr);
        HbaseSQLReaderConfig readerConfig = HbaseSQLHelper.parseConfig(config);
        List<Configuration> splits = HbaseSQLHelper.split(readerConfig);
        System.out.println("split size = " + splits.size());
    }
}
hbase11xsqlreader/src/test/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReaderTaskTest.java
package com.alibaba.datax.plugin.reader.hbase11xsqlreader;

import com.alibaba.datax.common.element.*;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.transport.record.DefaultRecord;
import org.junit.Test;

import java.io.IOException;
import java.util.List;

import static junit.framework.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class HbaseSQLReaderTaskTest {
    private String jsonStr = "{\n" +
            "        \"hbaseConfig\": {\n" +
            "            \"hbase.zookeeper.quorum\": \"hb-proxy-pub-xxx-001.hbase.rds.aliyuncs.com,hb-proxy-pub-xxx-002.hbase.rds.aliyuncs.com,hb-proxy-pub-xxx-003.hbase.rds.aliyuncs.com\"\n" +
            "        },\n" +
            "        \"table\": \"TABLE1\",\n" +
            "        \"column\": []\n" +
            "    }";

    private List<Configuration> generateSplitConfig() throws IOException, InterruptedException {
        Configuration config = Configuration.from(jsonStr);
        HbaseSQLReaderConfig readerConfig = HbaseSQLHelper.parseConfig(config);
        List<Configuration> splits = HbaseSQLHelper.split(readerConfig);
        System.out.println("split size = " + splits.size());
        return splits;
    }

    @Test
    public void testReadRecord() throws Exception {
        List<Configuration> splits = this.generateSplitConfig();
        int allRecordNum = 0;
        for (int i = 0; i < splits.size(); i++) {
            RecordSender recordSender = mock(RecordSender.class);
            when(recordSender.createRecord()).thenReturn(new DefaultRecord());
            Record record = recordSender.createRecord();
            HbaseSQLReaderTask hbase11SQLReaderTask = new HbaseSQLReaderTask(splits.get(i));
            hbase11SQLReaderTask.init();
            hbase11SQLReaderTask.prepare();
            int num = 0;
            while (true) {
                boolean hasLine;
                try {
                    hasLine = hbase11SQLReaderTask.readRecord(record);
                } catch (Exception e) {
                    e.printStackTrace();
                    throw e;
                }
                if (!hasLine) {
                    break;
                }
                num++;
                if (num % 100 == 0) {
                    System.out.println("record num is: " + num + ", record is " + record.toString());
                }
                when(recordSender.createRecord()).thenReturn(new DefaultRecord());
                StringBuilder recordStr = new StringBuilder();
                for (int j = 0; j < record.getColumnNumber(); j++) {
                    recordStr.append(record.getColumn(j).asString()).append(",");
                }
                recordSender.sendToWriter(record);
                record = recordSender.createRecord();
            }
            System.out.println("split id is " + i + ", record num = " + num);
            allRecordNum += num;
            recordSender.flush();
            hbase11SQLReaderTask.destroy();
        }
        System.out.println("all record num = " + allRecordNum);
        assertEquals(10000, allRecordNum);
    }
}
package.xml
@@ -294,6 +294,13 @@
             </includes>
             <outputDirectory>datax</outputDirectory>
         </fileSet>
+        <fileSet>
+            <directory>hbase11xsqlreader/target/datax/</directory>
+            <includes>
+                <include>**/*.*</include>
+            </includes>
+            <outputDirectory>datax</outputDirectory>
+        </fileSet>
         <fileSet>
             <directory>elasticsearchwriter/target/datax/</directory>
             <includes>
...
pom.xml
@@ -83,6 +83,7 @@
         <module>hbase11xwriter</module>
         <module>hbase094xwriter</module>
         <module>hbase11xsqlwriter</module>
+        <module>hbase11xsqlreader</module>
         <module>elasticsearchwriter</module>
         <!-- common support module -->
...