Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
D
DataX
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
risk-feature
DataX
Commits
928300f3
Unverified
Commit
928300f3
authored
Nov 08, 2019
by
Trafalgar
Committed by
GitHub
Nov 08, 2019
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #495 from asdf2014/github_tsdb_reader
Add TSDB Reader
parents
aec036b0
dd19fd43
Changes
24
Hide whitespace changes
Inline
Side-by-side
Showing
24 changed files
with
2289 additions
and
0 deletions
+2289
-0
pom.xml
pom.xml
+1
-0
tsdbreader.md
tsdbreader/doc/tsdbreader.md
+587
-0
pom.xml
tsdbreader/pom.xml
+146
-0
package.xml
tsdbreader/src/main/assembly/package.xml
+35
-0
Constant.java
.../com/alibaba/datax/plugin/reader/tsdbreader/Constant.java
+29
-0
Key.java
.../java/com/alibaba/datax/plugin/reader/tsdbreader/Key.java
+36
-0
TSDBReader.java
...om/alibaba/datax/plugin/reader/tsdbreader/TSDBReader.java
+320
-0
TSDBReaderErrorCode.java
...a/datax/plugin/reader/tsdbreader/TSDBReaderErrorCode.java
+40
-0
Connection4TSDB.java
.../datax/plugin/reader/tsdbreader/conn/Connection4TSDB.java
+88
-0
DataPoint4MultiFieldsTSDB.java
...gin/reader/tsdbreader/conn/DataPoint4MultiFieldsTSDB.java
+68
-0
DataPoint4TSDB.java
...a/datax/plugin/reader/tsdbreader/conn/DataPoint4TSDB.java
+68
-0
MultiFieldQueryResult.java
.../plugin/reader/tsdbreader/conn/MultiFieldQueryResult.java
+64
-0
QueryResult.java
...baba/datax/plugin/reader/tsdbreader/conn/QueryResult.java
+64
-0
TSDBConnection.java
...a/datax/plugin/reader/tsdbreader/conn/TSDBConnection.java
+94
-0
TSDBDump.java
...alibaba/datax/plugin/reader/tsdbreader/conn/TSDBDump.java
+318
-0
HttpUtils.java
...libaba/datax/plugin/reader/tsdbreader/util/HttpUtils.java
+67
-0
TSDBUtils.java
...libaba/datax/plugin/reader/tsdbreader/util/TSDBUtils.java
+68
-0
TimeUtils.java
...libaba/datax/plugin/reader/tsdbreader/util/TimeUtils.java
+38
-0
plugin.json
tsdbreader/src/main/resources/plugin.json
+10
-0
plugin_job_template.json
tsdbreader/src/main/resources/plugin_job_template.json
+29
-0
TSDBConnectionTest.java
...tax/plugin/reader/tsdbreader/conn/TSDBConnectionTest.java
+30
-0
Const.java
...om/alibaba/datax/plugin/reader/tsdbreader/util/Const.java
+17
-0
HttpUtilsTest.java
...ba/datax/plugin/reader/tsdbreader/util/HttpUtilsTest.java
+39
-0
TimeUtilsTest.java
...ba/datax/plugin/reader/tsdbreader/util/TimeUtilsTest.java
+33
-0
No files found.
pom.xml
View file @
928300f3
...
@@ -63,6 +63,7 @@
...
@@ -63,6 +63,7 @@
<module>
rdbmsreader
</module>
<module>
rdbmsreader
</module>
<module>
hbase11xreader
</module>
<module>
hbase11xreader
</module>
<module>
hbase094xreader
</module>
<module>
hbase094xreader
</module>
<module>
tsdbreader
</module>
<module>
opentsdbreader
</module>
<module>
opentsdbreader
</module>
<module>
cassandrareader
</module>
<module>
cassandrareader
</module>
...
...
tsdbreader/doc/tsdbreader.md
0 → 100644
View file @
928300f3
# TSDBReader 插件文档
__
_
## 1 快速介绍
TSDBReader 插件实现了从阿里云 TSDB 读取数据。阿里云时间序列数据库 (
**T**
ime
**S**
eries
**D**
ata
**b**
ase , 简称 TSDB) 是一种集时序数据高效读写,压缩存储,实时计算能力为一体的数据库服务,可广泛应用于物联网和互联网领域,实现对设备及业务服务的实时监控,实时预测告警。详见 TSDB 的阿里云
[
官网
](
https://cn.aliyun.com/product/hitsdb
)
。
## 2 实现原理
在底层实现上,TSDBReader 通过 HTTP 请求链接到 阿里云 TSDB 实例,利用
`/api/query`
或者
`/api/mquery`
接口将数据点扫描出来(更多细节详见:
[
时序数据库 TSDB - HTTP API 概览
](
https://help.aliyun.com/document_detail/63557.html
)
)。而整个同步的过程,是通过时间线和查询时间线范围进行切分。
## 3 功能说明
### 3.1 配置样例
*
配置一个从 阿里云 TSDB 数据库同步抽取数据到本地的作业,并以
**时序数据**
的格式输出:
时序数据样例:
```
json
{
"metric"
:
"m","tags"
:{
"app"
:
"a19","cluster"
:
"c5","group"
:
"g10","ip"
:
"i999","zone"
:
"z1"},"timestamp"
:
1546272263
,
"value"
:
1
}
```
```
json
{
"job"
:
{
"content"
:
[
{
"reader"
:
{
"name"
:
"tsdbreader"
,
"parameter"
:
{
"sinkDbType"
:
"TSDB"
,
"endpoint"
:
"http://localhost:8242"
,
"column"
:
[
"m"
],
"splitIntervalMs"
:
60000
,
"beginDateTime"
:
"2019-01-01 00:00:00"
,
"endDateTime"
:
"2019-01-01 01:00:00"
}
},
"writer"
:
{
"name"
:
"streamwriter"
,
"parameter"
:
{
"encoding"
:
"UTF-8"
,
"print"
:
true
}
}
}
],
"setting"
:
{
"speed"
:
{
"channel"
:
3
}
}
}
}
```
*
配置一个从 阿里云 TSDB 数据库同步抽取数据到本地的作业,并以
**关系型数据**
的格式输出:
关系型数据样例:
```
txt
m 1546272125 a1 c1 g2 i3021 z4 1.0
```
```
json
{
"job"
:
{
"content"
:
[
{
"reader"
:
{
"name"
:
"tsdbreader"
,
"parameter"
:
{
"sinkDbType"
:
"RDB"
,
"endpoint"
:
"http://localhost:8242"
,
"column"
:
[
"__metric__"
,
"__ts__"
,
"app"
,
"cluster"
,
"group"
,
"ip"
,
"zone"
,
"__value__"
],
"metric"
:
[
"m"
],
"splitIntervalMs"
:
60000
,
"beginDateTime"
:
"2019-01-01 00:00:00"
,
"endDateTime"
:
"2019-01-01 01:00:00"
}
},
"writer"
:
{
"name"
:
"streamwriter"
,
"parameter"
:
{
"encoding"
:
"UTF-8"
,
"print"
:
true
}
}
}
],
"setting"
:
{
"speed"
:
{
"channel"
:
3
}
}
}
}
```
*
配置一个从 阿里云 TSDB 数据库同步抽取
**单值**
数据到 ADB 的作业:
```
json
{
"job"
:
{
"content"
:
[
{
"reader"
:
{
"name"
:
"tsdbreader"
,
"parameter"
:
{
"sinkDbType"
:
"RDB"
,
"endpoint"
:
"http://localhost:8242"
,
"column"
:
[
"__metric__"
,
"__ts__"
,
"app"
,
"cluster"
,
"group"
,
"ip"
,
"zone"
,
"__value__"
],
"metric"
:
[
"m"
],
"splitIntervalMs"
:
60000
,
"beginDateTime"
:
"2019-01-01 00:00:00"
,
"endDateTime"
:
"2019-01-01 01:00:00"
}
},
"writer"
:
{
"name"
:
"adswriter"
,
"parameter"
:
{
"username"
:
"******"
,
"password"
:
"******"
,
"column"
:
[
"`metric`"
,
"`ts`"
,
"`app`"
,
"`cluster`"
,
"`group`"
,
"`ip`"
,
"`zone`"
,
"`value`"
],
"url"
:
"http://localhost:3306"
,
"schema"
:
"datax_test"
,
"table"
:
"datax_test"
,
"writeMode"
:
"insert"
,
"opIndex"
:
"0"
,
"batchSize"
:
"2"
}
}
}
],
"setting"
:
{
"speed"
:
{
"channel"
:
3
}
}
}
}
```
*
配置一个从 阿里云 TSDB 数据库同步抽取
**多值**
数据到 ADB 的作业:
```
json
{
"job"
:
{
"content"
:
[
{
"reader"
:
{
"name"
:
"tsdbreader"
,
"parameter"
:
{
"sinkDbType"
:
"RDB"
,
"endpoint"
:
"http://localhost:8242"
,
"column"
:
[
"__metric__"
,
"__ts__"
,
"app"
,
"cluster"
,
"group"
,
"ip"
,
"zone"
,
"load"
,
"memory"
,
"cpu"
],
"metric"
:
[
"m_field"
],
"field"
:
{
"m_field"
:
[
"load"
,
"memory"
,
"cpu"
]
},
"splitIntervalMs"
:
60000
,
"beginDateTime"
:
"2019-01-01 00:00:00"
,
"endDateTime"
:
"2019-01-01 01:00:00"
}
},
"writer"
:
{
"name"
:
"adswriter"
,
"parameter"
:
{
"username"
:
"******"
,
"password"
:
"******"
,
"column"
:
[
"`metric`"
,
"`ts`"
,
"`app`"
,
"`cluster`"
,
"`group`"
,
"`ip`"
,
"`zone`"
,
"`load`"
,
"`memory`"
,
"`cpu`"
],
"url"
:
"http://localhost:3306"
,
"schema"
:
"datax_test"
,
"table"
:
"datax_test_multi_field"
,
"writeMode"
:
"insert"
,
"opIndex"
:
"0"
,
"batchSize"
:
"2"
}
}
}
],
"setting"
:
{
"speed"
:
{
"channel"
:
3
}
}
}
}
```
*
配置一个从 阿里云 TSDB 数据库同步抽取
**单值**
数据到 ADB 的作业,并指定过滤部分时间线:
```
json
{
"job"
:
{
"content"
:
[
{
"reader"
:
{
"name"
:
"tsdbreader"
,
"parameter"
:
{
"sinkDbType"
:
"RDB"
,
"endpoint"
:
"http://localhost:8242"
,
"column"
:
[
"__metric__"
,
"__ts__"
,
"app"
,
"cluster"
,
"group"
,
"ip"
,
"zone"
,
"__value__"
],
"metric"
:
[
"m"
],
"tag"
:
{
"m"
:
{
"app"
:
"a1"
,
"cluster"
:
"c1"
}
},
"splitIntervalMs"
:
60000
,
"beginDateTime"
:
"2019-01-01 00:00:00"
,
"endDateTime"
:
"2019-01-01 01:00:00"
}
},
"writer"
:
{
"name"
:
"adswriter"
,
"parameter"
:
{
"username"
:
"******"
,
"password"
:
"******"
,
"column"
:
[
"`metric`"
,
"`ts`"
,
"`app`"
,
"`cluster`"
,
"`group`"
,
"`ip`"
,
"`zone`"
,
"`value`"
],
"url"
:
"http://localhost:3306"
,
"schema"
:
"datax_test"
,
"table"
:
"datax_test"
,
"writeMode"
:
"insert"
,
"opIndex"
:
"0"
,
"batchSize"
:
"2"
}
}
}
],
"setting"
:
{
"speed"
:
{
"channel"
:
3
}
}
}
}
```
*
配置一个从 阿里云 TSDB 数据库同步抽取
**多值**
数据到 ADB 的作业,并指定过滤部分时间线:
```
json
{
"job"
:
{
"content"
:
[
{
"reader"
:
{
"name"
:
"tsdbreader"
,
"parameter"
:
{
"sinkDbType"
:
"RDB"
,
"endpoint"
:
"http://localhost:8242"
,
"column"
:
[
"__metric__"
,
"__ts__"
,
"app"
,
"cluster"
,
"group"
,
"ip"
,
"zone"
,
"load"
,
"memory"
,
"cpu"
],
"metric"
:
[
"m_field"
],
"field"
:
{
"m_field"
:
[
"load"
,
"memory"
,
"cpu"
]
},
"tag"
:
{
"m_field"
:
{
"ip"
:
"i999"
}
},
"splitIntervalMs"
:
60000
,
"beginDateTime"
:
"2019-01-01 00:00:00"
,
"endDateTime"
:
"2019-01-01 01:00:00"
}
},
"writer"
:
{
"name"
:
"adswriter"
,
"parameter"
:
{
"username"
:
"******"
,
"password"
:
"******"
,
"column"
:
[
"`metric`"
,
"`ts`"
,
"`app`"
,
"`cluster`"
,
"`group`"
,
"`ip`"
,
"`zone`"
,
"`load`"
,
"`memory`"
,
"`cpu`"
],
"url"
:
"http://localhost:3306"
,
"schema"
:
"datax_test"
,
"table"
:
"datax_test_multi_field"
,
"writeMode"
:
"insert"
,
"opIndex"
:
"0"
,
"batchSize"
:
"2"
}
}
}
],
"setting"
:
{
"speed"
:
{
"channel"
:
3
}
}
}
}
```
*
配置一个从 阿里云 TSDB 数据库同步抽取
**单值**
数据到另一个 阿里云 TSDB 数据库 的作业:
```
json
{
"job"
:
{
"content"
:
[
{
"reader"
:
{
"name"
:
"tsdbreader"
,
"parameter"
:
{
"sinkDbType"
:
"TSDB"
,
"endpoint"
:
"http://localhost:8242"
,
"column"
:
[
"m"
],
"splitIntervalMs"
:
60000
,
"beginDateTime"
:
"2019-01-01 00:00:00"
,
"endDateTime"
:
"2019-01-01 01:00:00"
}
},
"writer"
:
{
"name"
:
"tsdbwriter"
,
"parameter"
:
{
"endpoint"
:
"http://localhost:8240"
}
}
}
],
"setting"
:
{
"speed"
:
{
"channel"
:
3
}
}
}
}
```
*
配置一个从 阿里云 TSDB 数据库同步抽取
**多值**
数据到另一个 阿里云 TSDB 数据库 的作业:
```
json
{
"job"
:
{
"content"
:
[
{
"reader"
:
{
"name"
:
"tsdbreader"
,
"parameter"
:
{
"sinkDbType"
:
"TSDB"
,
"endpoint"
:
"http://localhost:8242"
,
"column"
:
[
"m_field"
],
"field"
:
{
"m_field"
:
[
"load"
,
"memory"
,
"cpu"
]
},
"splitIntervalMs"
:
60000
,
"beginDateTime"
:
"2019-01-01 00:00:00"
,
"endDateTime"
:
"2019-01-01 01:00:00"
}
},
"writer"
:
{
"name"
:
"tsdbwriter"
,
"parameter"
:
{
"multiField"
:
true
,
"endpoint"
:
"http://localhost:8240"
}
}
}
],
"setting"
:
{
"speed"
:
{
"channel"
:
3
}
}
}
}
```
### 3.2 参数说明
*
**name**
*
描述:本插件的名称
*
必选:是
*
默认值:tsdbreader
*
**parameter**
*
**sinkDbType**
*
描述:目标数据库的类型
*
必选:否
*
默认值:TSDB
*
注意:目前支持 TSDB 和 RDB 两个取值。其中,TSDB 包括 阿里云 TSDB、OpenTSDB、InfluxDB、Prometheus 和 TimeScale。RDB 包括 ADB、MySQL、Oracle、PostgreSQL 和 DRDS 等。
*
**endpoint**
*
描述:阿里云 TSDB 的 HTTP 连接地址
*
必选:是
*
格式:http://IP:Port
*
默认值:无
*
**column**
*
描述:TSDB 场景下:数据迁移任务需要迁移的 Metric 列表;RDB 场景下:映射到关系型数据库中的表字段,且增加
`__metric__`
、
`__ts__`
和
`__value__`
三个字段,其中
`__metric__`
用于映射度量字段,
`__ts__`
用于映射 timestamp 字段,而
`__value__`
仅适用于单值场景,用于映射度量值,多值场景下,直接指定 field 字段即可
*
必选:是
*
默认值:无
*
**metric**
*
描述:仅适用于 RDB 场景下,表示数据迁移任务需要迁移的 Metric 列表
*
必选:否
*
默认值:无
*
**field**
*
描述:仅适用于多值场景下,表示数据迁移任务需要迁移的 Field 列表
*
必选:否
*
默认值:无
*
**tag**
*
描述:数据迁移任务需要迁移的 TagK 和 TagV,用于进一步过滤时间线
*
必选:否
*
默认值:无
*
**splitIntervalMs**
*
描述:用于 DataX 内部切分 Task,每个 Task 只查询一小部分的时间段
*
必选:是
*
默认值:无
*
注意:单位是 ms 毫秒
*
**beginDateTime**
*
描述:和 endDateTime 配合使用,用于指定哪个时间段内的数据点,需要被迁移
*
必选:是
*
格式:
`yyyy-MM-dd HH:mm:ss`
*
默认值:无
*
注意:指定起止时间会自动忽略分钟和秒,转为整点时刻,例如 2019-4-18 的
[
3:35, 4:55) 会被转为
[
3:00, 4:00)
*
**endDateTime**
*
描述:和 beginDateTime 配合使用,用于指定哪个时间段内的数据点,需要被迁移
*
必选:是
*
格式:
`yyyy-MM-dd HH:mm:ss`
*
默认值:无
*
注意:指定起止时间会自动忽略分钟和秒,转为整点时刻,例如 2019-4-18 的
[
3:35, 4:55) 会被转为
[
3:00, 4:00)
### 3.3 类型转换
| DataX 内部类型 | TSDB 数据类型 |
| -------------- | ------------------------------------------------------------ |
| String | TSDB 数据点序列化字符串,包括 timestamp、metric、tags、fields 和 value |
## 4 约束限制
### 4.2 如果存在某一个 Metric 下在一个小时范围内的数据量过大,可能需要通过 `-j` 参数调整 JVM 内存大小
考虑到下游 Writer 如果写入速度不及 TSDB Reader 的查询数据,可能会存在积压的情况,因此需要适当地调整 JVM 参数。以"从 阿里云 TSDB 数据库同步抽取数据到本地的作业"为例,启动命令如下:
```
bash
python datax/bin/datax.py tsdb2stream.json
-j
"-Xms4096m -Xmx4096m"
```
### 4.3 指定起止时间会自动被转为整点时刻
指定起止时间会自动被转为整点时刻,例如 2019-4-18 的
`[3:35, 3:55)`
会被转为
`[3:00, 4:00)`
tsdbreader/pom.xml
0 → 100644
View file @
928300f3
<?xml version="1.0" encoding="UTF-8"?>
<project
xmlns=
"http://maven.apache.org/POM/4.0.0"
xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation=
"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
>
<modelVersion>
4.0.0
</modelVersion>
<parent>
<groupId>
com.alibaba.datax
</groupId>
<artifactId>
datax-all
</artifactId>
<version>
0.0.1-SNAPSHOT
</version>
</parent>
<artifactId>
tsdbreader
</artifactId>
<name>
tsdbreader
</name>
<packaging>
jar
</packaging>
<properties>
<project.build.sourceEncoding>
UTF-8
</project.build.sourceEncoding>
<!-- common -->
<commons-lang3.version>
3.3.2
</commons-lang3.version>
<!-- http -->
<httpclient.version>
4.4
</httpclient.version>
<commons-io.version>
2.4
</commons-io.version>
<!-- json -->
<fastjson.version>
1.2.28
</fastjson.version>
<!-- test -->
<junit4.version>
4.12
</junit4.version>
<!-- time -->
<joda-time.version>
2.9.9
</joda-time.version>
</properties>
<dependencies>
<dependency>
<groupId>
com.alibaba.datax
</groupId>
<artifactId>
datax-common
</artifactId>
<version>
${datax-project-version}
</version>
<exclusions>
<exclusion>
<artifactId>
slf4j-log4j12
</artifactId>
<groupId>
org.slf4j
</groupId>
</exclusion>
<exclusion>
<artifactId>
fastjson
</artifactId>
<groupId>
com.alibaba
</groupId>
</exclusion>
<exclusion>
<artifactId>
commons-math3
</artifactId>
<groupId>
org.apache.commons
</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>
org.slf4j
</groupId>
<artifactId>
slf4j-api
</artifactId>
</dependency>
<dependency>
<groupId>
ch.qos.logback
</groupId>
<artifactId>
logback-classic
</artifactId>
</dependency>
<!-- common -->
<dependency>
<groupId>
org.apache.commons
</groupId>
<artifactId>
commons-lang3
</artifactId>
<version>
${commons-lang3.version}
</version>
</dependency>
<!-- http -->
<dependency>
<groupId>
org.apache.httpcomponents
</groupId>
<artifactId>
httpclient
</artifactId>
<version>
${httpclient.version}
</version>
</dependency>
<dependency>
<groupId>
commons-io
</groupId>
<artifactId>
commons-io
</artifactId>
<version>
${commons-io.version}
</version>
</dependency>
<dependency>
<groupId>
org.apache.httpcomponents
</groupId>
<artifactId>
fluent-hc
</artifactId>
<version>
${httpclient.version}
</version>
</dependency>
<!-- json -->
<dependency>
<groupId>
com.alibaba
</groupId>
<artifactId>
fastjson
</artifactId>
<version>
${fastjson.version}
</version>
</dependency>
<!-- time -->
<dependency>
<groupId>
joda-time
</groupId>
<artifactId>
joda-time
</artifactId>
<version>
${joda-time.version}
</version>
</dependency>
<!-- test -->
<dependency>
<groupId>
junit
</groupId>
<artifactId>
junit
</artifactId>
<version>
${junit4.version}
</version>
<scope>
test
</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>
maven-compiler-plugin
</artifactId>
<configuration>
<source>
${jdk-version}
</source>
<target>
${jdk-version}
</target>
<encoding>
${project-sourceEncoding}
</encoding>
</configuration>
</plugin>
<!-- assembly plugin -->
<plugin>
<artifactId>
maven-assembly-plugin
</artifactId>
<configuration>
<descriptors>
<descriptor>
src/main/assembly/package.xml
</descriptor>
</descriptors>
<finalName>
datax
</finalName>
</configuration>
<executions>
<execution>
<id>
dwzip
</id>
<phase>
package
</phase>
<goals>
<goal>
single
</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
tsdbreader/src/main/assembly/package.xml
0 → 100755
View file @
928300f3
<assembly
xmlns=
"http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation=
"http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd"
>
<id></id>
<formats>
<format>
dir
</format>
</formats>
<includeBaseDirectory>
false
</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>
src/main/resources
</directory>
<includes>
<include>
plugin.json
</include>
<include>
plugin_job_template.json
</include>
</includes>
<outputDirectory>
plugin/reader/tsdbreader
</outputDirectory>
</fileSet>
<fileSet>
<directory>
target/
</directory>
<includes>
<include>
tsdbreader-0.0.1-SNAPSHOT.jar
</include>
</includes>
<outputDirectory>
plugin/reader/tsdbreader
</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>
false
</useProjectArtifact>
<outputDirectory>
plugin/reader/tsdbreader/libs
</outputDirectory>
<scope>
runtime
</scope>
</dependencySet>
</dependencySets>
</assembly>
tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/Constant.java
0 → 100644
View file @
928300f3
package
com
.
alibaba
.
datax
.
plugin
.
reader
.
tsdbreader
;
import
java.util.HashSet
;
import
java.util.Set
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:Constant
*
* @author Benedict Jin
* @since 2019-10-21
*/
public
final
class
Constant
{
static
final
String
DEFAULT_DATA_FORMAT
=
"yyyy-MM-dd HH:mm:ss"
;
public
static
final
String
METRIC_SPECIFY_KEY
=
"__metric__"
;
public
static
final
String
TS_SPECIFY_KEY
=
"__ts__"
;
public
static
final
String
VALUE_SPECIFY_KEY
=
"__value__"
;
static
final
Set
<
String
>
MUST_CONTAINED_SPECIFY_KEYS
=
new
HashSet
<>();
static
{
MUST_CONTAINED_SPECIFY_KEYS
.
add
(
METRIC_SPECIFY_KEY
);
MUST_CONTAINED_SPECIFY_KEYS
.
add
(
TS_SPECIFY_KEY
);
// __value__ 在多值场景下,可以不指定
}
}
tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/Key.java
0 → 100644
View file @
928300f3
package
com
.
alibaba
.
datax
.
plugin
.
reader
.
tsdbreader
;
import
java.util.HashSet
;
import
java.util.Set
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:Key
*
* @author Benedict Jin
* @since 2019-10-21
*/
public
class
Key
{
// TSDB for OpenTSDB / InfluxDB / TimeScale / Prometheus etc.
// RDB for MySQL / ADB etc.
static
final
String
SINK_DB_TYPE
=
"sinkDbType"
;
static
final
String
ENDPOINT
=
"endpoint"
;
static
final
String
COLUMN
=
"column"
;
static
final
String
METRIC
=
"metric"
;
static
final
String
FIELD
=
"field"
;
static
final
String
TAG
=
"tag"
;
static
final
String
INTERVAL_DATE_TIME
=
"splitIntervalMs"
;
static
final
String
BEGIN_DATE_TIME
=
"beginDateTime"
;
static
final
String
END_DATE_TIME
=
"endDateTime"
;
static
final
Integer
INTERVAL_DATE_TIME_DEFAULT_VALUE
=
60
;
static
final
String
TYPE_DEFAULT_VALUE
=
"TSDB"
;
static
final
Set
<
String
>
TYPE_SET
=
new
HashSet
<>();
static
{
TYPE_SET
.
add
(
"TSDB"
);
TYPE_SET
.
add
(
"RDB"
);
}
}
tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/TSDBReader.java
0 → 100755
View file @
928300f3
package
com
.
alibaba
.
datax
.
plugin
.
reader
.
tsdbreader
;
import
com.alibaba.datax.common.exception.DataXException
;
import
com.alibaba.datax.common.plugin.RecordSender
;
import
com.alibaba.datax.common.spi.Reader
;
import
com.alibaba.datax.common.util.Configuration
;
import
com.alibaba.datax.plugin.reader.tsdbreader.conn.TSDBConnection
;
import
com.alibaba.datax.plugin.reader.tsdbreader.util.TimeUtils
;
import
com.alibaba.fastjson.JSON
;
import
org.apache.commons.lang3.StringUtils
;
import
org.joda.time.DateTime
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
java.text.ParseException
;
import
java.text.SimpleDateFormat
;
import
java.util.ArrayList
;
import
java.util.Collections
;
import
java.util.List
;
import
java.util.Map
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:TSDB Reader
*
* @author Benedict Jin
* @since 2019-10-21
*/
@SuppressWarnings
(
"unused"
)
public
class
TSDBReader
extends
Reader
{
public
static
class
Job
extends
Reader
.
Job
{
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
Job
.
class
);
private
Configuration
originalConfig
;
@Override
public
void
init
()
{
this
.
originalConfig
=
super
.
getPluginJobConf
();
String
type
=
originalConfig
.
getString
(
Key
.
SINK_DB_TYPE
,
Key
.
TYPE_DEFAULT_VALUE
);
if
(
StringUtils
.
isBlank
(
type
))
{
throw
DataXException
.
asDataXException
(
TSDBReaderErrorCode
.
REQUIRED_VALUE
,
"The parameter ["
+
Key
.
SINK_DB_TYPE
+
"] is not set."
);
}
if
(!
Key
.
TYPE_SET
.
contains
(
type
))
{
throw
DataXException
.
asDataXException
(
TSDBReaderErrorCode
.
ILLEGAL_VALUE
,
"The parameter ["
+
Key
.
SINK_DB_TYPE
+
"] should be one of ["
+
JSON
.
toJSONString
(
Key
.
TYPE_SET
)
+
"]."
);
}
String
address
=
originalConfig
.
getString
(
Key
.
ENDPOINT
);
if
(
StringUtils
.
isBlank
(
address
))
{
throw
DataXException
.
asDataXException
(
TSDBReaderErrorCode
.
REQUIRED_VALUE
,
"The parameter ["
+
Key
.
ENDPOINT
+
"] is not set."
);
}
// tagK / field could be empty
if
(
"TSDB"
.
equals
(
type
))
{
List
<
String
>
columns
=
originalConfig
.
getList
(
Key
.
COLUMN
,
String
.
class
);
if
(
columns
==
null
||
columns
.
isEmpty
())
{
throw
DataXException
.
asDataXException
(
TSDBReaderErrorCode
.
REQUIRED_VALUE
,
"The parameter ["
+
Key
.
COLUMN
+
"] is not set."
);
}
}
else
{
List
<
String
>
columns
=
originalConfig
.
getList
(
Key
.
COLUMN
,
String
.
class
);
if
(
columns
==
null
||
columns
.
isEmpty
())
{
throw
DataXException
.
asDataXException
(
TSDBReaderErrorCode
.
REQUIRED_VALUE
,
"The parameter ["
+
Key
.
COLUMN
+
"] is not set."
);
}
for
(
String
specifyKey
:
Constant
.
MUST_CONTAINED_SPECIFY_KEYS
)
{
if
(!
columns
.
contains
(
specifyKey
))
{
throw
DataXException
.
asDataXException
(
TSDBReaderErrorCode
.
ILLEGAL_VALUE
,
"The parameter ["
+
Key
.
COLUMN
+
"] should contain "
+
JSON
.
toJSONString
(
Constant
.
MUST_CONTAINED_SPECIFY_KEYS
)
+
"."
);
}
}
final
List
<
String
>
metrics
=
originalConfig
.
getList
(
Key
.
METRIC
,
String
.
class
);
if
(
metrics
==
null
||
metrics
.
isEmpty
())
{
throw
DataXException
.
asDataXException
(
TSDBReaderErrorCode
.
REQUIRED_VALUE
,
"The parameter ["
+
Key
.
METRIC
+
"] is not set."
);
}
}
Integer
splitIntervalMs
=
originalConfig
.
getInt
(
Key
.
INTERVAL_DATE_TIME
,
Key
.
INTERVAL_DATE_TIME_DEFAULT_VALUE
);
if
(
splitIntervalMs
<=
0
)
{
throw
DataXException
.
asDataXException
(
TSDBReaderErrorCode
.
ILLEGAL_VALUE
,
"The parameter ["
+
Key
.
INTERVAL_DATE_TIME
+
"] should be great than zero."
);
}
SimpleDateFormat
format
=
new
SimpleDateFormat
(
Constant
.
DEFAULT_DATA_FORMAT
);
String
startTime
=
originalConfig
.
getString
(
Key
.
BEGIN_DATE_TIME
);
Long
startDate
;
if
(
startTime
==
null
||
startTime
.
trim
().
length
()
==
0
)
{
throw
DataXException
.
asDataXException
(
TSDBReaderErrorCode
.
REQUIRED_VALUE
,
"The parameter ["
+
Key
.
BEGIN_DATE_TIME
+
"] is not set."
);
}
else
{
try
{
startDate
=
format
.
parse
(
startTime
).
getTime
();
}
catch
(
ParseException
e
)
{
throw
DataXException
.
asDataXException
(
TSDBReaderErrorCode
.
ILLEGAL_VALUE
,
"The parameter ["
+
Key
.
BEGIN_DATE_TIME
+
"] needs to conform to the ["
+
Constant
.
DEFAULT_DATA_FORMAT
+
"] format."
);
}
}
String
endTime
=
originalConfig
.
getString
(
Key
.
END_DATE_TIME
);
Long
endDate
;
if
(
endTime
==
null
||
endTime
.
trim
().
length
()
==
0
)
{
throw
DataXException
.
asDataXException
(
TSDBReaderErrorCode
.
REQUIRED_VALUE
,
"The parameter ["
+
Key
.
END_DATE_TIME
+
"] is not set."
);
}
else
{
try
{
endDate
=
format
.
parse
(
endTime
).
getTime
();
}
catch
(
ParseException
e
)
{
throw
DataXException
.
asDataXException
(
TSDBReaderErrorCode
.
ILLEGAL_VALUE
,
"The parameter ["
+
Key
.
END_DATE_TIME
+
"] needs to conform to the ["
+
Constant
.
DEFAULT_DATA_FORMAT
+
"] format."
);
}
}
if
(
startDate
>=
endDate
)
{
throw
DataXException
.
asDataXException
(
TSDBReaderErrorCode
.
ILLEGAL_VALUE
,
"The parameter ["
+
Key
.
BEGIN_DATE_TIME
+
"] should be less than the parameter ["
+
Key
.
END_DATE_TIME
+
"]."
);
}
}
@Override
public
void
prepare
()
{
}
@Override
public
List
<
Configuration
>
split
(
int
adviceNumber
)
{
List
<
Configuration
>
configurations
=
new
ArrayList
<>();
// get metrics
String
type
=
originalConfig
.
getString
(
Key
.
SINK_DB_TYPE
,
Key
.
TYPE_DEFAULT_VALUE
);
List
<
String
>
columns4TSDB
=
null
;
List
<
String
>
columns4RDB
=
null
;
List
<
String
>
metrics
=
null
;
if
(
"TSDB"
.
equals
(
type
))
{
columns4TSDB
=
originalConfig
.
getList
(
Key
.
COLUMN
,
String
.
class
);
}
else
{
columns4RDB
=
originalConfig
.
getList
(
Key
.
COLUMN
,
String
.
class
);
metrics
=
originalConfig
.
getList
(
Key
.
METRIC
,
String
.
class
);
}
// get time interval
Integer
splitIntervalMs
=
originalConfig
.
getInt
(
Key
.
INTERVAL_DATE_TIME
,
Key
.
INTERVAL_DATE_TIME_DEFAULT_VALUE
);
// get time range
SimpleDateFormat
format
=
new
SimpleDateFormat
(
Constant
.
DEFAULT_DATA_FORMAT
);
long
startTime
;
try
{
startTime
=
format
.
parse
(
originalConfig
.
getString
(
Key
.
BEGIN_DATE_TIME
)).
getTime
();
}
catch
(
ParseException
e
)
{
throw
DataXException
.
asDataXException
(
TSDBReaderErrorCode
.
ILLEGAL_VALUE
,
"解析["
+
Key
.
BEGIN_DATE_TIME
+
"]失败."
,
e
);
}
long
endTime
;
try
{
endTime
=
format
.
parse
(
originalConfig
.
getString
(
Key
.
END_DATE_TIME
)).
getTime
();
}
catch
(
ParseException
e
)
{
throw
DataXException
.
asDataXException
(
TSDBReaderErrorCode
.
ILLEGAL_VALUE
,
"解析["
+
Key
.
END_DATE_TIME
+
"]失败."
,
e
);
}
if
(
TimeUtils
.
isSecond
(
startTime
))
{
startTime
*=
1000
;
}
if
(
TimeUtils
.
isSecond
(
endTime
))
{
endTime
*=
1000
;
}
DateTime
startDateTime
=
new
DateTime
(
TimeUtils
.
getTimeInHour
(
startTime
));
DateTime
endDateTime
=
new
DateTime
(
TimeUtils
.
getTimeInHour
(
endTime
));
if
(
"TSDB"
.
equals
(
type
))
{
// split by metric
for
(
String
column
:
columns4TSDB
)
{
// split by time in hour
while
(
startDateTime
.
isBefore
(
endDateTime
))
{
Configuration
clone
=
this
.
originalConfig
.
clone
();
clone
.
set
(
Key
.
COLUMN
,
Collections
.
singletonList
(
column
));
clone
.
set
(
Key
.
BEGIN_DATE_TIME
,
startDateTime
.
getMillis
());
startDateTime
=
startDateTime
.
plusMillis
(
splitIntervalMs
);
// Make sure the time interval is [start, end).
clone
.
set
(
Key
.
END_DATE_TIME
,
startDateTime
.
getMillis
()
-
1
);
configurations
.
add
(
clone
);
LOG
.
info
(
"Configuration: {}"
,
JSON
.
toJSONString
(
clone
));
}
}
}
else
{
// split by metric
for
(
String
metric
:
metrics
)
{
// split by time in hour
while
(
startDateTime
.
isBefore
(
endDateTime
))
{
Configuration
clone
=
this
.
originalConfig
.
clone
();
clone
.
set
(
Key
.
COLUMN
,
columns4RDB
);
clone
.
set
(
Key
.
METRIC
,
Collections
.
singletonList
(
metric
));
clone
.
set
(
Key
.
BEGIN_DATE_TIME
,
startDateTime
.
getMillis
());
startDateTime
=
startDateTime
.
plusMillis
(
splitIntervalMs
);
// Make sure the time interval is [start, end).
clone
.
set
(
Key
.
END_DATE_TIME
,
startDateTime
.
getMillis
()
-
1
);
configurations
.
add
(
clone
);
LOG
.
info
(
"Configuration: {}"
,
JSON
.
toJSONString
(
clone
));
}
}
}
return
configurations
;
}
@Override
public
void
post
()
{
}
@Override
public
void
destroy
()
{
}
}
public
static
class
Task
extends
Reader
.
Task
{
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
Task
.
class
);
private
String
type
;
private
List
<
String
>
columns4TSDB
=
null
;
private
List
<
String
>
columns4RDB
=
null
;
private
List
<
String
>
metrics
=
null
;
private
Map
<
String
,
Object
>
fields
;
private
Map
<
String
,
Object
>
tags
;
private
TSDBConnection
conn
;
private
Long
startTime
;
private
Long
endTime
;
@Override
public
void
init
()
{
Configuration
readerSliceConfig
=
super
.
getPluginJobConf
();
LOG
.
info
(
"getPluginJobConf: {}"
,
JSON
.
toJSONString
(
readerSliceConfig
));
this
.
type
=
readerSliceConfig
.
getString
(
Key
.
SINK_DB_TYPE
);
if
(
"TSDB"
.
equals
(
type
))
{
columns4TSDB
=
readerSliceConfig
.
getList
(
Key
.
COLUMN
,
String
.
class
);
}
else
{
columns4RDB
=
readerSliceConfig
.
getList
(
Key
.
COLUMN
,
String
.
class
);
metrics
=
readerSliceConfig
.
getList
(
Key
.
METRIC
,
String
.
class
);
}
this
.
fields
=
readerSliceConfig
.
getMap
(
Key
.
FIELD
);
this
.
tags
=
readerSliceConfig
.
getMap
(
Key
.
TAG
);
String
address
=
readerSliceConfig
.
getString
(
Key
.
ENDPOINT
);
conn
=
new
TSDBConnection
(
address
);
this
.
startTime
=
readerSliceConfig
.
getLong
(
Key
.
BEGIN_DATE_TIME
);
this
.
endTime
=
readerSliceConfig
.
getLong
(
Key
.
END_DATE_TIME
);
}
@Override
public
void
prepare
()
{
}
@Override
@SuppressWarnings
(
"unchecked"
)
public
void
startRead
(
RecordSender
recordSender
)
{
try
{
if
(
"TSDB"
.
equals
(
type
))
{
for
(
String
metric
:
columns4TSDB
)
{
final
Map
<
String
,
String
>
tags
=
this
.
tags
==
null
?
null
:
(
Map
<
String
,
String
>)
this
.
tags
.
get
(
metric
);
if
(
fields
==
null
||
!
fields
.
containsKey
(
metric
))
{
conn
.
sendDPs
(
metric
,
tags
,
this
.
startTime
,
this
.
endTime
,
recordSender
);
}
else
{
conn
.
sendDPs
(
metric
,
(
List
<
String
>)
fields
.
get
(
metric
),
tags
,
this
.
startTime
,
this
.
endTime
,
recordSender
);
}
}
}
else
{
for
(
String
metric
:
metrics
)
{
final
Map
<
String
,
String
>
tags
=
this
.
tags
==
null
?
null
:
(
Map
<
String
,
String
>)
this
.
tags
.
get
(
metric
);
if
(
fields
==
null
||
!
fields
.
containsKey
(
metric
))
{
conn
.
sendRecords
(
metric
,
tags
,
startTime
,
endTime
,
columns4RDB
,
recordSender
);
}
else
{
conn
.
sendRecords
(
metric
,
(
List
<
String
>)
fields
.
get
(
metric
),
tags
,
startTime
,
endTime
,
columns4RDB
,
recordSender
);
}
}
}
}
catch
(
Exception
e
)
{
throw
DataXException
.
asDataXException
(
TSDBReaderErrorCode
.
ILLEGAL_VALUE
,
"获取或发送数据点的过程中出错!"
,
e
);
}
}
@Override
public
void
post
()
{
}
@Override
public
void
destroy
()
{
}
}
}
tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/TSDBReaderErrorCode.java
0 → 100755
View file @
928300f3
package
com
.
alibaba
.
datax
.
plugin
.
reader
.
tsdbreader
;
import
com.alibaba.datax.common.spi.ErrorCode
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:TSDB Reader Error Code
*
* @author Benedict Jin
* @since 2019-10-21
*/
public
enum
TSDBReaderErrorCode
implements
ErrorCode
{
REQUIRED_VALUE
(
"TSDBReader-00"
,
"缺失必要的值"
),
ILLEGAL_VALUE
(
"TSDBReader-01"
,
"值非法"
);
private
final
String
code
;
private
final
String
description
;
TSDBReaderErrorCode
(
String
code
,
String
description
)
{
this
.
code
=
code
;
this
.
description
=
description
;
}
@Override
public
String
getCode
()
{
return
this
.
code
;
}
@Override
public
String
getDescription
()
{
return
this
.
description
;
}
@Override
public
String
toString
()
{
return
String
.
format
(
"Code:[%s], Description:[%s]. "
,
this
.
code
,
this
.
description
);
}
}
tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/Connection4TSDB.java
0 → 100644
View file @
928300f3
package
com
.
alibaba
.
datax
.
plugin
.
reader
.
tsdbreader
.
conn
;
import
com.alibaba.datax.common.plugin.RecordSender
;
import
java.util.List
;
import
java.util.Map
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:Connection for TSDB-like databases
*
* @author Benedict Jin
* @since 2019-10-21
*/
public
interface
Connection4TSDB
{
/**
* Get the address of Database.
*
* @return host+ip
*/
String
address
();
/**
* Get the version of Database.
*
* @return version
*/
String
version
();
/**
* Get these configurations.
*
* @return configs
*/
String
config
();
/**
* Get the list of supported version.
*
* @return version list
*/
String
[]
getSupportVersionPrefix
();
/**
* Send data points for TSDB with single field.
*/
void
sendDPs
(
String
metric
,
Map
<
String
,
String
>
tags
,
Long
start
,
Long
end
,
RecordSender
recordSender
)
throws
Exception
;
/**
* Send data points for TSDB with multi fields.
*/
void
sendDPs
(
String
metric
,
List
<
String
>
fields
,
Map
<
String
,
String
>
tags
,
Long
start
,
Long
end
,
RecordSender
recordSender
)
throws
Exception
;
/**
* Send data points for RDB with single field.
*/
void
sendRecords
(
String
metric
,
Map
<
String
,
String
>
tags
,
Long
start
,
Long
end
,
List
<
String
>
columns4RDB
,
RecordSender
recordSender
)
throws
Exception
;
/**
* Send data points for RDB with multi fields.
*/
void
sendRecords
(
String
metric
,
List
<
String
>
fields
,
Map
<
String
,
String
>
tags
,
Long
start
,
Long
end
,
List
<
String
>
columns4RDB
,
RecordSender
recordSender
)
throws
Exception
;
/**
* Put data point.
*
* @param dp data point
* @return whether the data point is written successfully
*/
boolean
put
(
DataPoint4TSDB
dp
);
/**
* Put data points.
*
* @param dps data points
* @return whether the data point is written successfully
*/
boolean
put
(
List
<
DataPoint4TSDB
>
dps
);
/**
* Whether current version is supported.
*
* @return true: supported; false: not yet!
*/
boolean
isSupported
();
}
tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/DataPoint4MultiFieldsTSDB.java
0 → 100644
View file @
928300f3
package
com
.
alibaba
.
datax
.
plugin
.
reader
.
tsdbreader
.
conn
;
import
com.alibaba.fastjson.JSON
;
import
java.util.Map
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:DataPoint for TSDB with Multi Fields
*
* @author Benedict Jin
* @since 2019-10-21
*/
public
class
DataPoint4MultiFieldsTSDB
{
private
long
timestamp
;
private
String
metric
;
private
Map
<
String
,
Object
>
tags
;
private
Map
<
String
,
Object
>
fields
;
public
DataPoint4MultiFieldsTSDB
()
{
}
public
DataPoint4MultiFieldsTSDB
(
long
timestamp
,
String
metric
,
Map
<
String
,
Object
>
tags
,
Map
<
String
,
Object
>
fields
)
{
this
.
timestamp
=
timestamp
;
this
.
metric
=
metric
;
this
.
tags
=
tags
;
this
.
fields
=
fields
;
}
public
long
getTimestamp
()
{
return
timestamp
;
}
public
void
setTimestamp
(
long
timestamp
)
{
this
.
timestamp
=
timestamp
;
}
public
String
getMetric
()
{
return
metric
;
}
public
void
setMetric
(
String
metric
)
{
this
.
metric
=
metric
;
}
public
Map
<
String
,
Object
>
getTags
()
{
return
tags
;
}
public
void
setTags
(
Map
<
String
,
Object
>
tags
)
{
this
.
tags
=
tags
;
}
public
Map
<
String
,
Object
>
getFields
()
{
return
fields
;
}
public
void
setFields
(
Map
<
String
,
Object
>
fields
)
{
this
.
fields
=
fields
;
}
@Override
public
String
toString
()
{
return
JSON
.
toJSONString
(
this
);
}
}
tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/DataPoint4TSDB.java
0 → 100644
View file @
928300f3
package
com
.
alibaba
.
datax
.
plugin
.
reader
.
tsdbreader
.
conn
;
import
com.alibaba.fastjson.JSON
;
import
java.util.Map
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:DataPoint for TSDB
*
* @author Benedict Jin
* @since 2019-10-21
*/
public
class
DataPoint4TSDB
{
private
long
timestamp
;
private
String
metric
;
private
Map
<
String
,
Object
>
tags
;
private
Object
value
;
public
DataPoint4TSDB
()
{
}
public
DataPoint4TSDB
(
long
timestamp
,
String
metric
,
Map
<
String
,
Object
>
tags
,
Object
value
)
{
this
.
timestamp
=
timestamp
;
this
.
metric
=
metric
;
this
.
tags
=
tags
;
this
.
value
=
value
;
}
public
long
getTimestamp
()
{
return
timestamp
;
}
public
void
setTimestamp
(
long
timestamp
)
{
this
.
timestamp
=
timestamp
;
}
public
String
getMetric
()
{
return
metric
;
}
public
void
setMetric
(
String
metric
)
{
this
.
metric
=
metric
;
}
public
Map
<
String
,
Object
>
getTags
()
{
return
tags
;
}
public
void
setTags
(
Map
<
String
,
Object
>
tags
)
{
this
.
tags
=
tags
;
}
public
Object
getValue
()
{
return
value
;
}
public
void
setValue
(
Object
value
)
{
this
.
value
=
value
;
}
@Override
public
String
toString
()
{
return
JSON
.
toJSONString
(
this
);
}
}
tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/MultiFieldQueryResult.java
0 → 100644
View file @
928300f3
package
com
.
alibaba
.
datax
.
plugin
.
reader
.
tsdbreader
.
conn
;
import
java.util.List
;
import
java.util.Map
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:Multi Field Query Result
*
* @author Benedict Jin
* @since 2019-10-22
*/
public
class
MultiFieldQueryResult
{
private
String
metric
;
private
Map
<
String
,
Object
>
tags
;
private
List
<
String
>
aggregatedTags
;
private
List
<
String
>
columns
;
private
List
<
List
<
Object
>>
values
;
public
MultiFieldQueryResult
()
{
}
public
String
getMetric
()
{
return
metric
;
}
public
void
setMetric
(
String
metric
)
{
this
.
metric
=
metric
;
}
public
Map
<
String
,
Object
>
getTags
()
{
return
tags
;
}
public
void
setTags
(
Map
<
String
,
Object
>
tags
)
{
this
.
tags
=
tags
;
}
public
List
<
String
>
getAggregatedTags
()
{
return
aggregatedTags
;
}
public
void
setAggregatedTags
(
List
<
String
>
aggregatedTags
)
{
this
.
aggregatedTags
=
aggregatedTags
;
}
public
List
<
String
>
getColumns
()
{
return
columns
;
}
public
void
setColumns
(
List
<
String
>
columns
)
{
this
.
columns
=
columns
;
}
public
List
<
List
<
Object
>>
getValues
()
{
return
values
;
}
public
void
setValues
(
List
<
List
<
Object
>>
values
)
{
this
.
values
=
values
;
}
}
tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/QueryResult.java
0 → 100644
View file @
928300f3
package
com
.
alibaba
.
datax
.
plugin
.
reader
.
tsdbreader
.
conn
;
import
java.util.List
;
import
java.util.Map
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:Query Result
*
* @author Benedict Jin
* @since 2019-09-19
*/
public
class
QueryResult
{
private
String
metricName
;
private
Map
<
String
,
Object
>
tags
;
private
List
<
String
>
groupByTags
;
private
List
<
String
>
aggregatedTags
;
private
Map
<
String
,
Object
>
dps
;
public
QueryResult
()
{
}
public
String
getMetricName
()
{
return
metricName
;
}
public
void
setMetricName
(
String
metricName
)
{
this
.
metricName
=
metricName
;
}
public
Map
<
String
,
Object
>
getTags
()
{
return
tags
;
}
public
void
setTags
(
Map
<
String
,
Object
>
tags
)
{
this
.
tags
=
tags
;
}
public
List
<
String
>
getGroupByTags
()
{
return
groupByTags
;
}
public
void
setGroupByTags
(
List
<
String
>
groupByTags
)
{
this
.
groupByTags
=
groupByTags
;
}
public
List
<
String
>
getAggregatedTags
()
{
return
aggregatedTags
;
}
public
void
setAggregatedTags
(
List
<
String
>
aggregatedTags
)
{
this
.
aggregatedTags
=
aggregatedTags
;
}
public
Map
<
String
,
Object
>
getDps
()
{
return
dps
;
}
public
void
setDps
(
Map
<
String
,
Object
>
dps
)
{
this
.
dps
=
dps
;
}
}
tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/TSDBConnection.java
0 → 100644
View file @
928300f3
package
com
.
alibaba
.
datax
.
plugin
.
reader
.
tsdbreader
.
conn
;
import
com.alibaba.datax.common.plugin.RecordSender
;
import
com.alibaba.datax.plugin.reader.tsdbreader.util.TSDBUtils
;
import
com.alibaba.fastjson.JSON
;
import
org.apache.commons.lang3.StringUtils
;
import
java.util.List
;
import
java.util.Map
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:TSDB Connection
*
* @author Benedict Jin
* @since 2019-10-21
*/
public
class
TSDBConnection
implements
Connection4TSDB
{
private
String
address
;
public
TSDBConnection
(
String
address
)
{
this
.
address
=
address
;
}
@Override
public
String
address
()
{
return
address
;
}
@Override
public
String
version
()
{
return
TSDBUtils
.
version
(
address
);
}
@Override
public
String
config
()
{
return
TSDBUtils
.
config
(
address
);
}
@Override
public
String
[]
getSupportVersionPrefix
()
{
return
new
String
[]{
"2.4"
,
"2.5"
};
}
@Override
public
void
sendDPs
(
String
metric
,
Map
<
String
,
String
>
tags
,
Long
start
,
Long
end
,
RecordSender
recordSender
)
throws
Exception
{
TSDBDump
.
dump4TSDB
(
this
,
metric
,
tags
,
start
,
end
,
recordSender
);
}
@Override
public
void
sendDPs
(
String
metric
,
List
<
String
>
fields
,
Map
<
String
,
String
>
tags
,
Long
start
,
Long
end
,
RecordSender
recordSender
)
throws
Exception
{
TSDBDump
.
dump4TSDB
(
this
,
metric
,
fields
,
tags
,
start
,
end
,
recordSender
);
}
@Override
public
void
sendRecords
(
String
metric
,
Map
<
String
,
String
>
tags
,
Long
start
,
Long
end
,
List
<
String
>
columns4RDB
,
RecordSender
recordSender
)
throws
Exception
{
TSDBDump
.
dump4RDB
(
this
,
metric
,
tags
,
start
,
end
,
columns4RDB
,
recordSender
);
}
@Override
public
void
sendRecords
(
String
metric
,
List
<
String
>
fields
,
Map
<
String
,
String
>
tags
,
Long
start
,
Long
end
,
List
<
String
>
columns4RDB
,
RecordSender
recordSender
)
throws
Exception
{
TSDBDump
.
dump4RDB
(
this
,
metric
,
fields
,
tags
,
start
,
end
,
columns4RDB
,
recordSender
);
}
@Override
public
boolean
put
(
DataPoint4TSDB
dp
)
{
return
false
;
}
@Override
public
boolean
put
(
List
<
DataPoint4TSDB
>
dps
)
{
return
false
;
}
@Override
public
boolean
isSupported
()
{
String
versionJson
=
version
();
if
(
StringUtils
.
isBlank
(
versionJson
))
{
throw
new
RuntimeException
(
"Cannot get the version!"
);
}
String
version
=
JSON
.
parseObject
(
versionJson
).
getString
(
"version"
);
if
(
StringUtils
.
isBlank
(
version
))
{
return
false
;
}
for
(
String
prefix
:
getSupportVersionPrefix
())
{
if
(
version
.
startsWith
(
prefix
))
{
return
true
;
}
}
return
false
;
}
}
tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/TSDBDump.java
0 → 100644
View file @
928300f3
package
com
.
alibaba
.
datax
.
plugin
.
reader
.
tsdbreader
.
conn
;
import
com.alibaba.datax.common.element.*
;
import
com.alibaba.datax.common.plugin.RecordSender
;
import
com.alibaba.datax.plugin.reader.tsdbreader.Constant
;
import
com.alibaba.datax.plugin.reader.tsdbreader.util.HttpUtils
;
import
com.alibaba.fastjson.JSON
;
import
com.alibaba.fastjson.parser.Feature
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
java.util.HashMap
;
import
java.util.LinkedList
;
import
java.util.List
;
import
java.util.Map
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:TSDB Dump
*
* @author Benedict Jin
* @since 2019-10-21
*/
final
class
TSDBDump
{
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
TSDBDump
.
class
);
private
static
final
String
QUERY
=
"/api/query"
;
private
static
final
String
QUERY_MULTI_FIELD
=
"/api/mquery"
;
static
{
JSON
.
DEFAULT_PARSER_FEATURE
&=
~
Feature
.
UseBigDecimal
.
getMask
();
}
private
TSDBDump
()
{
}
static
void
dump4TSDB
(
TSDBConnection
conn
,
String
metric
,
Map
<
String
,
String
>
tags
,
Long
start
,
Long
end
,
RecordSender
sender
)
throws
Exception
{
LOG
.
info
(
"conn address: {}, metric: {}, start: {}, end: {}"
,
conn
.
address
(),
metric
,
start
,
end
);
String
res
=
queryRange4SingleField
(
conn
,
metric
,
tags
,
start
,
end
);
List
<
String
>
dps
=
getDps4TSDB
(
metric
,
res
);
if
(
dps
==
null
||
dps
.
isEmpty
())
{
return
;
}
sendTSDBDps
(
sender
,
dps
);
}
static
void
dump4TSDB
(
TSDBConnection
conn
,
String
metric
,
List
<
String
>
fields
,
Map
<
String
,
String
>
tags
,
Long
start
,
Long
end
,
RecordSender
sender
)
throws
Exception
{
LOG
.
info
(
"conn address: {}, metric: {}, start: {}, end: {}"
,
conn
.
address
(),
metric
,
start
,
end
);
String
res
=
queryRange4MultiFields
(
conn
,
metric
,
fields
,
tags
,
start
,
end
);
List
<
String
>
dps
=
getDps4TSDB
(
metric
,
fields
,
res
);
if
(
dps
==
null
||
dps
.
isEmpty
())
{
return
;
}
sendTSDBDps
(
sender
,
dps
);
}
static
void
dump4RDB
(
TSDBConnection
conn
,
String
metric
,
Map
<
String
,
String
>
tags
,
Long
start
,
Long
end
,
List
<
String
>
columns4RDB
,
RecordSender
sender
)
throws
Exception
{
LOG
.
info
(
"conn address: {}, metric: {}, start: {}, end: {}"
,
conn
.
address
(),
metric
,
start
,
end
);
String
res
=
queryRange4SingleField
(
conn
,
metric
,
tags
,
start
,
end
);
List
<
DataPoint4TSDB
>
dps
=
getDps4RDB
(
metric
,
res
);
if
(
dps
==
null
||
dps
.
isEmpty
())
{
return
;
}
for
(
DataPoint4TSDB
dp
:
dps
)
{
final
Record
record
=
sender
.
createRecord
();
final
Map
<
String
,
Object
>
tagKV
=
dp
.
getTags
();
for
(
String
column
:
columns4RDB
)
{
if
(
Constant
.
METRIC_SPECIFY_KEY
.
equals
(
column
))
{
record
.
addColumn
(
new
StringColumn
(
dp
.
getMetric
()));
}
else
if
(
Constant
.
TS_SPECIFY_KEY
.
equals
(
column
))
{
record
.
addColumn
(
new
LongColumn
(
dp
.
getTimestamp
()));
}
else
if
(
Constant
.
VALUE_SPECIFY_KEY
.
equals
(
column
))
{
record
.
addColumn
(
getColumn
(
dp
.
getValue
()));
}
else
{
final
Object
tagk
=
tagKV
.
get
(
column
);
if
(
tagk
==
null
)
{
continue
;
}
record
.
addColumn
(
getColumn
(
tagk
));
}
}
sender
.
sendToWriter
(
record
);
}
}
static
void
dump4RDB
(
TSDBConnection
conn
,
String
metric
,
List
<
String
>
fields
,
Map
<
String
,
String
>
tags
,
Long
start
,
Long
end
,
List
<
String
>
columns4RDB
,
RecordSender
sender
)
throws
Exception
{
LOG
.
info
(
"conn address: {}, metric: {}, start: {}, end: {}"
,
conn
.
address
(),
metric
,
start
,
end
);
String
res
=
queryRange4MultiFields
(
conn
,
metric
,
fields
,
tags
,
start
,
end
);
List
<
DataPoint4TSDB
>
dps
=
getDps4RDB
(
metric
,
fields
,
res
);
if
(
dps
==
null
||
dps
.
isEmpty
())
{
return
;
}
for
(
DataPoint4TSDB
dp
:
dps
)
{
final
Record
record
=
sender
.
createRecord
();
final
Map
<
String
,
Object
>
tagKV
=
dp
.
getTags
();
for
(
String
column
:
columns4RDB
)
{
if
(
Constant
.
METRIC_SPECIFY_KEY
.
equals
(
column
))
{
record
.
addColumn
(
new
StringColumn
(
dp
.
getMetric
()));
}
else
if
(
Constant
.
TS_SPECIFY_KEY
.
equals
(
column
))
{
record
.
addColumn
(
new
LongColumn
(
dp
.
getTimestamp
()));
}
else
{
final
Object
tagvOrField
=
tagKV
.
get
(
column
);
if
(
tagvOrField
==
null
)
{
continue
;
}
record
.
addColumn
(
getColumn
(
tagvOrField
));
}
}
sender
.
sendToWriter
(
record
);
}
}
private
static
Column
getColumn
(
Object
value
)
throws
Exception
{
Column
valueColumn
;
if
(
value
instanceof
Double
)
{
valueColumn
=
new
DoubleColumn
((
Double
)
value
);
}
else
if
(
value
instanceof
Long
)
{
valueColumn
=
new
LongColumn
((
Long
)
value
);
}
else
if
(
value
instanceof
String
)
{
valueColumn
=
new
StringColumn
((
String
)
value
);
}
else
{
throw
new
Exception
(
String
.
format
(
"value 不支持类型: [%s]"
,
value
.
getClass
().
getSimpleName
()));
}
return
valueColumn
;
}
private
static
String
queryRange4SingleField
(
TSDBConnection
conn
,
String
metric
,
Map
<
String
,
String
>
tags
,
Long
start
,
Long
end
)
throws
Exception
{
String
tagKV
=
getFilterByTags
(
tags
);
String
body
=
"{\n"
+
" \"start\": "
+
start
+
",\n"
+
" \"end\": "
+
end
+
",\n"
+
" \"queries\": [\n"
+
" {\n"
+
" \"aggregator\": \"none\",\n"
+
" \"metric\": \""
+
metric
+
"\"\n"
+
(
tagKV
==
null
?
""
:
tagKV
)
+
" }\n"
+
" ]\n"
+
"}"
;
return
HttpUtils
.
post
(
conn
.
address
()
+
QUERY
,
body
);
}
private
static
String
queryRange4MultiFields
(
TSDBConnection
conn
,
String
metric
,
List
<
String
>
fields
,
Map
<
String
,
String
>
tags
,
Long
start
,
Long
end
)
throws
Exception
{
// fields
StringBuilder
fieldBuilder
=
new
StringBuilder
();
fieldBuilder
.
append
(
"\"fields\":["
);
for
(
int
i
=
0
;
i
<
fields
.
size
();
i
++)
{
fieldBuilder
.
append
(
"{\"field\": \""
).
append
(
fields
.
get
(
i
)).
append
(
"\",\"aggregator\": \"none\"}"
);
if
(
i
!=
fields
.
size
()
-
1
)
{
fieldBuilder
.
append
(
","
);
}
}
fieldBuilder
.
append
(
"]"
);
// tagkv
String
tagKV
=
getFilterByTags
(
tags
);
String
body
=
"{\n"
+
" \"start\": "
+
start
+
",\n"
+
" \"end\": "
+
end
+
",\n"
+
" \"queries\": [\n"
+
" {\n"
+
" \"aggregator\": \"none\",\n"
+
" \"metric\": \""
+
metric
+
"\",\n"
+
fieldBuilder
.
toString
()
+
(
tagKV
==
null
?
""
:
tagKV
)
+
" }\n"
+
" ]\n"
+
"}"
;
return
HttpUtils
.
post
(
conn
.
address
()
+
QUERY_MULTI_FIELD
,
body
);
}
private
static
String
getFilterByTags
(
Map
<
String
,
String
>
tags
)
{
if
(
tags
!=
null
&&
!
tags
.
isEmpty
())
{
// tagKV = ",\"tags:\":" + JSON.toJSONString(tags);
StringBuilder
tagBuilder
=
new
StringBuilder
();
tagBuilder
.
append
(
",\"filters\":["
);
int
count
=
1
;
final
int
size
=
tags
.
size
();
for
(
Map
.
Entry
<
String
,
String
>
entry
:
tags
.
entrySet
())
{
final
String
tagK
=
entry
.
getKey
();
final
String
tagV
=
entry
.
getValue
();
tagBuilder
.
append
(
"{\"type\":\"literal_or\",\"tagk\":\""
).
append
(
tagK
)
.
append
(
"\",\"filter\":\""
).
append
(
tagV
).
append
(
"\",\"groupBy\":false}"
);
if
(
count
!=
size
)
{
tagBuilder
.
append
(
","
);
}
count
++;
}
tagBuilder
.
append
(
"]"
);
return
tagBuilder
.
toString
();
}
return
null
;
}
private
static
List
<
String
>
getDps4TSDB
(
String
metric
,
String
dps
)
{
final
List
<
QueryResult
>
jsonArray
=
JSON
.
parseArray
(
dps
,
QueryResult
.
class
);
if
(
jsonArray
.
size
()
==
0
)
{
return
null
;
}
List
<
String
>
dpsArr
=
new
LinkedList
<>();
for
(
QueryResult
queryResult
:
jsonArray
)
{
final
Map
<
String
,
Object
>
tags
=
queryResult
.
getTags
();
final
Map
<
String
,
Object
>
points
=
queryResult
.
getDps
();
for
(
Map
.
Entry
<
String
,
Object
>
entry
:
points
.
entrySet
())
{
final
String
ts
=
entry
.
getKey
();
final
Object
value
=
entry
.
getValue
();
DataPoint4TSDB
dp
=
new
DataPoint4TSDB
();
dp
.
setMetric
(
metric
);
dp
.
setTags
(
tags
);
dp
.
setTimestamp
(
Long
.
parseLong
(
ts
));
dp
.
setValue
(
value
);
dpsArr
.
add
(
dp
.
toString
());
}
}
return
dpsArr
;
}
private
static
List
<
String
>
getDps4TSDB
(
String
metric
,
List
<
String
>
fields
,
String
dps
)
{
final
List
<
MultiFieldQueryResult
>
jsonArray
=
JSON
.
parseArray
(
dps
,
MultiFieldQueryResult
.
class
);
if
(
jsonArray
.
size
()
==
0
)
{
return
null
;
}
List
<
String
>
dpsArr
=
new
LinkedList
<>();
for
(
MultiFieldQueryResult
queryResult
:
jsonArray
)
{
final
Map
<
String
,
Object
>
tags
=
queryResult
.
getTags
();
final
List
<
List
<
Object
>>
values
=
queryResult
.
getValues
();
for
(
List
<
Object
>
value
:
values
)
{
final
String
ts
=
value
.
get
(
0
).
toString
();
Map
<
String
,
Object
>
fieldsAndValues
=
new
HashMap
<>();
for
(
int
i
=
0
;
i
<
fields
.
size
();
i
++)
{
fieldsAndValues
.
put
(
fields
.
get
(
i
),
value
.
get
(
i
+
1
));
}
final
DataPoint4MultiFieldsTSDB
dp
=
new
DataPoint4MultiFieldsTSDB
();
dp
.
setMetric
(
metric
);
dp
.
setTimestamp
(
Long
.
parseLong
(
ts
));
dp
.
setTags
(
tags
);
dp
.
setFields
(
fieldsAndValues
);
dpsArr
.
add
(
dp
.
toString
());
}
}
return
dpsArr
;
}
private
static
List
<
DataPoint4TSDB
>
getDps4RDB
(
String
metric
,
String
dps
)
{
final
List
<
QueryResult
>
jsonArray
=
JSON
.
parseArray
(
dps
,
QueryResult
.
class
);
if
(
jsonArray
.
size
()
==
0
)
{
return
null
;
}
List
<
DataPoint4TSDB
>
dpsArr
=
new
LinkedList
<>();
for
(
QueryResult
queryResult
:
jsonArray
)
{
final
Map
<
String
,
Object
>
tags
=
queryResult
.
getTags
();
final
Map
<
String
,
Object
>
points
=
queryResult
.
getDps
();
for
(
Map
.
Entry
<
String
,
Object
>
entry
:
points
.
entrySet
())
{
final
String
ts
=
entry
.
getKey
();
final
Object
value
=
entry
.
getValue
();
final
DataPoint4TSDB
dp
=
new
DataPoint4TSDB
();
dp
.
setMetric
(
metric
);
dp
.
setTags
(
tags
);
dp
.
setTimestamp
(
Long
.
parseLong
(
ts
));
dp
.
setValue
(
value
);
dpsArr
.
add
(
dp
);
}
}
return
dpsArr
;
}
private
static
List
<
DataPoint4TSDB
>
getDps4RDB
(
String
metric
,
List
<
String
>
fields
,
String
dps
)
{
final
List
<
MultiFieldQueryResult
>
jsonArray
=
JSON
.
parseArray
(
dps
,
MultiFieldQueryResult
.
class
);
if
(
jsonArray
.
size
()
==
0
)
{
return
null
;
}
List
<
DataPoint4TSDB
>
dpsArr
=
new
LinkedList
<>();
for
(
MultiFieldQueryResult
queryResult
:
jsonArray
)
{
final
Map
<
String
,
Object
>
tags
=
queryResult
.
getTags
();
final
List
<
List
<
Object
>>
values
=
queryResult
.
getValues
();
for
(
List
<
Object
>
value
:
values
)
{
final
String
ts
=
value
.
get
(
0
).
toString
();
Map
<
String
,
Object
>
tagsTmp
=
new
HashMap
<>(
tags
);
for
(
int
i
=
0
;
i
<
fields
.
size
();
i
++)
{
tagsTmp
.
put
(
fields
.
get
(
i
),
value
.
get
(
i
+
1
));
}
final
DataPoint4TSDB
dp
=
new
DataPoint4TSDB
();
dp
.
setMetric
(
metric
);
dp
.
setTimestamp
(
Long
.
parseLong
(
ts
));
dp
.
setTags
(
tagsTmp
);
dpsArr
.
add
(
dp
);
}
}
return
dpsArr
;
}
private
static
void
sendTSDBDps
(
RecordSender
sender
,
List
<
String
>
dps
)
{
for
(
String
dp
:
dps
)
{
StringColumn
tsdbColumn
=
new
StringColumn
(
dp
);
Record
record
=
sender
.
createRecord
();
record
.
addColumn
(
tsdbColumn
);
sender
.
sendToWriter
(
record
);
}
}
}
tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/util/HttpUtils.java
0 → 100644
View file @
928300f3
package
com
.
alibaba
.
datax
.
plugin
.
reader
.
tsdbreader
.
util
;
import
com.alibaba.fastjson.JSON
;
import
org.apache.http.client.fluent.Content
;
import
org.apache.http.client.fluent.Request
;
import
org.apache.http.entity.ContentType
;
import
java.nio.charset.StandardCharsets
;
import
java.util.Map
;
import
java.util.concurrent.TimeUnit
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:HttpUtils
*
* @author Benedict Jin
* @since 2019-10-21
*/
public
final
class
HttpUtils
{
public
final
static
int
CONNECT_TIMEOUT_DEFAULT_IN_MILL
=
(
int
)
TimeUnit
.
SECONDS
.
toMillis
(
60
);
public
final
static
int
SOCKET_TIMEOUT_DEFAULT_IN_MILL
=
(
int
)
TimeUnit
.
SECONDS
.
toMillis
(
60
);
private
HttpUtils
()
{
}
public
static
String
get
(
String
url
)
throws
Exception
{
Content
content
=
Request
.
Get
(
url
)
.
connectTimeout
(
CONNECT_TIMEOUT_DEFAULT_IN_MILL
)
.
socketTimeout
(
SOCKET_TIMEOUT_DEFAULT_IN_MILL
)
.
execute
()
.
returnContent
();
if
(
content
==
null
)
{
return
null
;
}
return
content
.
asString
(
StandardCharsets
.
UTF_8
);
}
public
static
String
post
(
String
url
,
Map
<
String
,
Object
>
params
)
throws
Exception
{
return
post
(
url
,
JSON
.
toJSONString
(
params
),
CONNECT_TIMEOUT_DEFAULT_IN_MILL
,
SOCKET_TIMEOUT_DEFAULT_IN_MILL
);
}
public
static
String
post
(
String
url
,
String
params
)
throws
Exception
{
return
post
(
url
,
params
,
CONNECT_TIMEOUT_DEFAULT_IN_MILL
,
SOCKET_TIMEOUT_DEFAULT_IN_MILL
);
}
public
static
String
post
(
String
url
,
Map
<
String
,
Object
>
params
,
int
connectTimeoutInMill
,
int
socketTimeoutInMill
)
throws
Exception
{
return
post
(
url
,
JSON
.
toJSONString
(
params
),
connectTimeoutInMill
,
socketTimeoutInMill
);
}
public
static
String
post
(
String
url
,
String
params
,
int
connectTimeoutInMill
,
int
socketTimeoutInMill
)
throws
Exception
{
Content
content
=
Request
.
Post
(
url
)
.
connectTimeout
(
connectTimeoutInMill
)
.
socketTimeout
(
socketTimeoutInMill
)
.
addHeader
(
"Content-Type"
,
"application/json"
)
.
bodyString
(
params
,
ContentType
.
APPLICATION_JSON
)
.
execute
()
.
returnContent
();
if
(
content
==
null
)
{
return
null
;
}
return
content
.
asString
(
StandardCharsets
.
UTF_8
);
}
}
tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/util/TSDBUtils.java
0 → 100644
View file @
928300f3
package
com
.
alibaba
.
datax
.
plugin
.
reader
.
tsdbreader
.
util
;
import
com.alibaba.datax.plugin.reader.tsdbreader.conn.DataPoint4TSDB
;
import
com.alibaba.fastjson.JSON
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
java.util.List
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:TSDB Utils
*
* @author Benedict Jin
* @since 2019-10-21
*/
public
final
class
TSDBUtils
{
private
static
final
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
TSDBUtils
.
class
);
private
TSDBUtils
()
{
}
public
static
String
version
(
String
address
)
{
String
url
=
String
.
format
(
"%s/api/version"
,
address
);
String
rsp
;
try
{
rsp
=
HttpUtils
.
get
(
url
);
}
catch
(
Exception
e
)
{
throw
new
RuntimeException
(
e
);
}
return
rsp
;
}
public
static
String
config
(
String
address
)
{
String
url
=
String
.
format
(
"%s/api/config"
,
address
);
String
rsp
;
try
{
rsp
=
HttpUtils
.
get
(
url
);
}
catch
(
Exception
e
)
{
throw
new
RuntimeException
(
e
);
}
return
rsp
;
}
public
static
boolean
put
(
String
address
,
List
<
DataPoint4TSDB
>
dps
)
{
return
put
(
address
,
JSON
.
toJSON
(
dps
));
}
public
static
boolean
put
(
String
address
,
DataPoint4TSDB
dp
)
{
return
put
(
address
,
JSON
.
toJSON
(
dp
));
}
private
static
boolean
put
(
String
address
,
Object
o
)
{
String
url
=
String
.
format
(
"%s/api/put"
,
address
);
String
rsp
;
try
{
rsp
=
HttpUtils
.
post
(
url
,
o
.
toString
());
// If successful, the returned content should be null.
assert
rsp
==
null
;
}
catch
(
Exception
e
)
{
LOGGER
.
error
(
"Address: {}, DataPoints: {}"
,
url
,
o
);
throw
new
RuntimeException
(
e
);
}
return
true
;
}
}
tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/util/TimeUtils.java
0 → 100644
View file @
928300f3
package
com
.
alibaba
.
datax
.
plugin
.
reader
.
tsdbreader
.
util
;
import
java.util.concurrent.TimeUnit
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:TimeUtils
*
* @author Benedict Jin
* @since 2019-10-21
*/
public
final
class
TimeUtils
{
private
TimeUtils
()
{
}
private
static
final
long
SECOND_MASK
=
0xFFFFFFFF00000000
L
;
private
static
final
long
HOUR_IN_MILL
=
TimeUnit
.
HOURS
.
toMillis
(
1
);
/**
* Weather the timestamp is second.
*
* @param ts timestamp
*/
public
static
boolean
isSecond
(
long
ts
)
{
return
(
ts
&
SECOND_MASK
)
==
0
;
}
/**
* Get the hour.
*
* @param ms time in millisecond
*/
public
static
long
getTimeInHour
(
long
ms
)
{
return
ms
-
ms
%
HOUR_IN_MILL
;
}
}
tsdbreader/src/main/resources/plugin.json
0 → 100755
View file @
928300f3
{
"name"
:
"tsdbreader"
,
"class"
:
"com.alibaba.datax.plugin.reader.tsdbreader.TSDBReader"
,
"description"
:
{
"useScene"
:
"从 TSDB 中摄取数据点"
,
"mechanism"
:
"通过 /api/query 接口查询出符合条件的数据点"
,
"warn"
:
"指定起止时间会自动忽略分钟和秒,转为整点时刻,例如 2019-4-18 的 [3:35, 4:55) 会被转为 [3:00, 4:00)"
},
"developer"
:
"Benedict Jin"
}
tsdbreader/src/main/resources/plugin_job_template.json
0 → 100644
View file @
928300f3
{
"name"
:
"tsdbreader"
,
"parameter"
:
{
"sinkDbType"
:
"RDB"
,
"endpoint"
:
"http://localhost:8242"
,
"column"
:
[
"__metric__"
,
"__ts__"
,
"app"
,
"cluster"
,
"group"
,
"ip"
,
"zone"
,
"__value__"
],
"metric"
:
[
"m"
],
"tag"
:
{
"m"
:
{
"app"
:
"a1"
,
"cluster"
:
"c1"
}
},
"splitIntervalMs"
:
60000
,
"beginDateTime"
:
"2019-01-01 00:00:00"
,
"endDateTime"
:
"2019-01-01 01:00:00"
}
}
tsdbreader/src/test/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/TSDBConnectionTest.java
0 → 100644
View file @
928300f3
package
com
.
alibaba
.
datax
.
plugin
.
reader
.
tsdbreader
.
conn
;
import
org.junit.Assert
;
import
org.junit.Ignore
;
import
org.junit.Test
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:TSDB Connection4TSDB Test
*
* @author Benedict Jin
* @since 2019-10-21
*/
@Ignore
public
class
TSDBConnectionTest
{
private
static
final
String
TSDB_ADDRESS
=
"http://localhost:8242"
;
@Test
public
void
testVersion
()
{
String
version
=
new
TSDBConnection
(
TSDB_ADDRESS
).
version
();
Assert
.
assertNotNull
(
version
);
}
@Test
public
void
testIsSupported
()
{
Assert
.
assertTrue
(
new
TSDBConnection
(
TSDB_ADDRESS
).
isSupported
());
}
}
tsdbreader/src/test/java/com/alibaba/datax/plugin/reader/tsdbreader/util/Const.java
0 → 100644
View file @
928300f3
package
com
.
alibaba
.
datax
.
plugin
.
reader
.
tsdbreader
.
util
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:Const
*
* @author Benedict Jin
* @since 2019-10-21
*/
final
class
Const
{
private
Const
()
{
}
static
final
String
TSDB_ADDRESS
=
"http://localhost:8242"
;
}
tsdbreader/src/test/java/com/alibaba/datax/plugin/reader/tsdbreader/util/HttpUtilsTest.java
0 → 100644
View file @
928300f3
package
com
.
alibaba
.
datax
.
plugin
.
reader
.
tsdbreader
.
util
;
import
org.junit.Assert
;
import
org.junit.Ignore
;
import
org.junit.Test
;
import
java.util.HashMap
;
import
java.util.Map
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:HttpUtils Test
*
* @author Benedict Jin
* @since 2019-10-21
*/
@Ignore
public
class
HttpUtilsTest
{
@Test
public
void
testSimpleCase
()
throws
Exception
{
String
url
=
"https://httpbin.org/post"
;
Map
<
String
,
Object
>
params
=
new
HashMap
<>();
params
.
put
(
"foo"
,
"bar"
);
String
rsp
=
HttpUtils
.
post
(
url
,
params
);
System
.
out
.
println
(
rsp
);
Assert
.
assertNotNull
(
rsp
);
}
@Test
public
void
testGet
()
throws
Exception
{
String
url
=
String
.
format
(
"%s/api/version"
,
Const
.
TSDB_ADDRESS
);
String
rsp
=
HttpUtils
.
get
(
url
);
System
.
out
.
println
(
rsp
);
Assert
.
assertNotNull
(
rsp
);
}
}
tsdbreader/src/test/java/com/alibaba/datax/plugin/reader/tsdbreader/util/TimeUtilsTest.java
0 → 100644
View file @
928300f3
package
com
.
alibaba
.
datax
.
plugin
.
reader
.
tsdbreader
.
util
;
import
org.junit.Assert
;
import
org.junit.Test
;
import
java.text.ParseException
;
import
java.text.SimpleDateFormat
;
import
java.util.Date
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:com.alibaba.datax.common.util
*
* @author Benedict Jin
* @since 2019-10-21
*/
public
class
TimeUtilsTest
{
@Test
public
void
testIsSecond
()
{
Assert
.
assertFalse
(
TimeUtils
.
isSecond
(
System
.
currentTimeMillis
()));
Assert
.
assertTrue
(
TimeUtils
.
isSecond
(
System
.
currentTimeMillis
()
/
1000
));
}
@Test
public
void
testGetTimeInHour
()
throws
ParseException
{
SimpleDateFormat
sdf
=
new
SimpleDateFormat
(
"yyyy-MM-dd HH:mm:ss"
);
Date
date
=
sdf
.
parse
(
"2019-04-18 15:32:33"
);
long
timeInHour
=
TimeUtils
.
getTimeInHour
(
date
.
getTime
());
Assert
.
assertEquals
(
"2019-04-18 15:00:00"
,
sdf
.
format
(
timeInHour
));
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment