Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
D
DataX
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
risk-feature
DataX
Commits
4d70b5ab
Commit
4d70b5ab
authored
Apr 26, 2019
by
asdf2014
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Add OpenTSDB reader and TSDB writer
parent
eb1ea0bc
Changes
44
Hide whitespace changes
Inline
Side-by-side
Showing
44 changed files
with
2776 additions
and
6 deletions
+2776
-6
.gitignore
.gitignore
+155
-6
opentsdbreader.md
opentsdbreader/doc/opentsdbreader.md
+209
-0
pom.xml
opentsdbreader/pom.xml
+156
-0
package.xml
opentsdbreader/src/main/assembly/package.xml
+35
-0
CliQuery.java
...n/java/com/alibaba/datax/plugin/reader/conn/CliQuery.java
+104
-0
Connection4TSDB.java
...com/alibaba/datax/plugin/reader/conn/Connection4TSDB.java
+77
-0
DataPoint4TSDB.java
.../com/alibaba/datax/plugin/reader/conn/DataPoint4TSDB.java
+68
-0
DumpSeries.java
...java/com/alibaba/datax/plugin/reader/conn/DumpSeries.java
+96
-0
OpenTSDBConnection.java
.../alibaba/datax/plugin/reader/conn/OpenTSDBConnection.java
+78
-0
OpenTSDBDump.java
...va/com/alibaba/datax/plugin/reader/conn/OpenTSDBDump.java
+48
-0
Constant.java
.../alibaba/datax/plugin/reader/opentsdbreader/Constant.java
+14
-0
Key.java
...a/com/alibaba/datax/plugin/reader/opentsdbreader/Key.java
+17
-0
OpenTSDBReader.java
...ba/datax/plugin/reader/opentsdbreader/OpenTSDBReader.java
+207
-0
OpenTSDBReaderErrorCode.java
...plugin/reader/opentsdbreader/OpenTSDBReaderErrorCode.java
+40
-0
HttpUtils.java
.../java/com/alibaba/datax/plugin/reader/util/HttpUtils.java
+68
-0
TSDBUtils.java
.../java/com/alibaba/datax/plugin/reader/util/TSDBUtils.java
+68
-0
TimeUtils.java
.../java/com/alibaba/datax/plugin/reader/util/TimeUtils.java
+38
-0
plugin.json
opentsdbreader/src/main/resources/plugin.json
+10
-0
plugin_job_template.json
opentsdbreader/src/main/resources/plugin_job_template.json
+11
-0
OpenTSDBConnectionTest.java
...baba/datax/plugin/reader/conn/OpenTSDBConnectionTest.java
+30
-0
Const.java
...test/java/com/alibaba/datax/plugin/reader/util/Const.java
+18
-0
HttpUtilsTest.java
...a/com/alibaba/datax/plugin/reader/util/HttpUtilsTest.java
+39
-0
TSDBTest.java
...t/java/com/alibaba/datax/plugin/reader/util/TSDBTest.java
+28
-0
TimeUtilsTest.java
...a/com/alibaba/datax/plugin/reader/util/TimeUtilsTest.java
+33
-0
package.xml
package.xml
+14
-0
pom.xml
pom.xml
+2
-0
tsdbhttpwriter.md
tsdbwriter/doc/tsdbhttpwriter.md
+187
-0
pom.xml
tsdbwriter/pom.xml
+136
-0
package.xml
tsdbwriter/src/main/assembly/package.xml
+35
-0
Connection4TSDB.java
...com/alibaba/datax/plugin/writer/conn/Connection4TSDB.java
+85
-0
DataPoint4TSDB.java
.../com/alibaba/datax/plugin/writer/conn/DataPoint4TSDB.java
+68
-0
TSDBConnection.java
.../com/alibaba/datax/plugin/writer/conn/TSDBConnection.java
+86
-0
Constant.java
.../com/alibaba/datax/plugin/writer/tsdbwriter/Constant.java
+16
-0
Key.java
.../java/com/alibaba/datax/plugin/writer/tsdbwriter/Key.java
+17
-0
TSDBWriter.java
...om/alibaba/datax/plugin/writer/tsdbwriter/TSDBWriter.java
+171
-0
TSDBWriterErrorCode.java
...a/datax/plugin/writer/tsdbwriter/TSDBWriterErrorCode.java
+41
-0
HttpUtils.java
.../java/com/alibaba/datax/plugin/writer/util/HttpUtils.java
+68
-0
TSDBUtils.java
.../java/com/alibaba/datax/plugin/writer/util/TSDBUtils.java
+72
-0
plugin.json
tsdbwriter/src/main/resources/plugin.json
+10
-0
plugin_job_template.json
tsdbwriter/src/main/resources/plugin_job_template.json
+6
-0
TSDBConnectionTest.java
.../alibaba/datax/plugin/writer/conn/TSDBConnectionTest.java
+30
-0
Const.java
...test/java/com/alibaba/datax/plugin/writer/util/Const.java
+18
-0
HttpUtilsTest.java
...a/com/alibaba/datax/plugin/writer/util/HttpUtilsTest.java
+39
-0
TSDBTest.java
...t/java/com/alibaba/datax/plugin/writer/util/TSDBTest.java
+28
-0
No files found.
.gitignore
View file @
4d70b5ab
/target/
.classpath
.project
.settings
# Created by .ignore support plugin (hsz.mobi)
.DS_Store
/logs/
.idea/
.AppleDouble
.LSOverride
Icon
._*
.DocumentRevisions-V100
.fseventsd
.Spotlight-V100
.TemporaryItems
.Trashes
.VolumeIcon.icns
.com.apple.timemachine.donotpresent
.AppleDB
.AppleDesktop
Network Trash Folder
Temporary Items
.apdisk
*.class
*.log
*.ctxt
.mtj.tmp/
*.jar
*.war
*.nar
*.ear
*.zip
*.tar.gz
*.rar
hs_err_pid*
.idea/**/workspace.xml
.idea/**/tasks.xml
.idea/**/dictionaries
.idea/**/shelf
.idea/**/dataSources/
.idea/**/dataSources.ids
.idea/**/dataSources.local.xml
.idea/**/sqlDataSources.xml
.idea/**/dynamic.xml
.idea/**/uiDesigner.xml
.idea/**/dbnavigator.xml
.idea/**/gradle.xml
.idea/**/libraries
cmake-build-debug/
cmake-build-release/
.idea/**/mongoSettings.xml
*.iws
out/
.idea_modules/
atlassian-ide-plugin.xml
.idea/replstate.xml
com_crashlytics_export_strings.xml
crashlytics.properties
crashlytics-build.properties
fabric.properties
.idea/httpRequests
target/
pom.xml.tag
pom.xml.releaseBackup
pom.xml.versionsBackup
pom.xml.next
release.properties
dependency-reduced-pom.xml
buildNumber.properties
.mvn/timing.properties
!/.mvn/wrapper/maven-wrapper.jar
.idea
*.iml
out
gen### Python template
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
*.manifest
*.spec
pip-log.txt
pip-delete-this-directory.txt
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/
*.mo
*.pot
*.log
local_settings.py
db.sqlite3
instance/
.webassets-cache
.scrapy
docs/_build/
target/
.ipynb_checkpoints
.python-version
celerybeat-schedule
*.sage.py
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
.spyderproject
.spyproject
.ropeproject
/site
.mypy_cache/
.metadata
bin/
tmp/
*.tmp
*.bak
*.swp
*~.nib
local.properties
.settings/
.loadpath
.recommenders
.externalToolBuilders/
*.launch
*.pydevproject
.cproject
.autotools
.factorypath
.buildpath
.target
.tern-project
.texlipse
.springBeans
.recommenders/
.cache-main
.scala_dependencies
.worksheet
opentsdbreader/doc/opentsdbreader.md
0 → 100644
View file @
4d70b5ab
# OpenTSDBReader 插件文档
__
_
## 1 快速介绍
OpenTSDBReader 插件实现了从 OpenTSDB 读取数据。OpenTSDB 是主要由 Yahoo 维护的、可扩展的、分布式时序数据库,与阿里巴巴自研 TSDB 的关系与区别详见阿里云官网:《
[
相比 OpenTSDB 优势
](
https://help.aliyun.com/document_detail/113368.html
)
》
## 2 实现原理
在底层实现上,OpenTSDBReader 通过 HTTP 请求链接到 OpenTSDB 实例,利用
`/api/config`
接口获取到其底层存储 HBase 的连接信息,再利用 AsyncHBase 框架连接 HBase,通过 Scan 的方式将数据点扫描出来。整个同步的过程通过 metric 和时间段进行切分,即某个 metric 在某一个小时内的数据迁移,组合成一个迁移 Task。
## 3 功能说明
### 3.1 配置样例
*
配置一个从 OpenTSDB 数据库同步抽取数据到本地的作业:
```
json
{
"job"
:
{
"content"
:
[
{
"reader"
:
{
"name"
:
"opentsdbreader"
,
"parameter"
:
{
"endpoint"
:
"http://localhost:4242"
,
"column"
:
[
"m"
],
"beginDateTime"
:
"2019-01-01 00:00:00"
,
"endDateTime"
:
"2019-01-01 03:00:00"
}
},
"writer"
:
{
"name"
:
"streamwriter"
,
"parameter"
:
{
"encoding"
:
"UTF-8"
,
"print"
:
true
}
}
}
],
"setting"
:
{
"speed"
:
{
"channel"
:
1
}
}
}
}
```
### 3.2 参数说明
*
**name**
*
描述:本插件的名称
*
必选:是
*
默认值:opentsdbreader
*
**parameter**
*
**endpoint**
*
描述:OpenTSDB 的 HTTP 连接地址
*
必选:是
*
格式:http://IP:Port
*
默认值:无
*
**column**
*
描述:数据迁移任务需要迁移的 Metric 列表
*
必选:是
*
默认值:无
*
**beginDateTime**
*
描述:和 endDateTime 配合使用,用于指定哪个时间段内的数据点,需要被迁移
*
必选:是
*
格式:
`yyyy-MM-dd HH:mm:ss`
*
默认值:无
*
注意:指定起止时间会自动忽略分钟和秒,转为整点时刻,例如 2019-4-18 的
[
3:35, 4:55) 会被转为
[
3:00, 4:00)
*
**endDateTime**
*
描述:和 beginDateTime 配合使用,用于指定哪个时间段内的数据点,需要被迁移
*
必选:是
*
格式:
`yyyy-MM-dd HH:mm:ss`
*
默认值:无
*
注意:指定起止时间会自动忽略分钟和秒,转为整点时刻,例如 2019-4-18 的
[
3:35, 4:55) 会被转为
[
3:00, 4:00)
### 3.3 类型转换
| DataX 内部类型 | TSDB 数据类型 |
| -------------- | ------------------------------------------------------------ |
| String | TSDB 数据点序列化字符串,包括 timestamp、metric、tags 和 value |
## 4 性能报告
### 4.1 环境准备
#### 4.1.1 数据特征
从 Metric、时间线、Value 和 采集周期 四个方面来描述:
##### metric
固定指定一个 metric 为
`m`
。
##### tagkv
前四个 tagkv 全排列,形成
`10 * 20 * 100 * 100 = 2000000`
条时间线,最后 IP 对应 2000000 条时间线从 1 开始自增。
|
**tag_k**
|
**tag_v**
|
| --------- | ------------- |
| zone | z1~z10 |
| cluster | c1~c20 |
| group | g1~100 |
| app | a1~a100 |
| ip | ip1~ip2000000 |
##### value
度量值为
[
1, 100
]
区间内的随机值
##### interval
采集周期为 10 秒,持续摄入 3 小时,总数据量为
`3 * 60 * 60 / 10 * 2000000 = 2,160,000,000`
个数据点。
#### 4.1.2 机器参数
OpenTSDB Reader 机型: 64C256G
HBase 机型: 8C16G
*
5
#### 4.1.3 DataX jvm 参数
"-Xms4096m -Xmx4096m"
### 4.2 测试报告
| 通道数| DataX 速度 (Rec/s) |DataX 流量 (MB/s)|
|--------| --------|--------|
|1| 215428 | 25.65 |
|2| 424994 | 50.60 |
|3| 603132 | 71.81 |
## 5 约束限制
### 5.1 需要确保与 OpenTSDB 底层存储的网络是连通的
具体缘由详见 6.1
### 5.2 如果存在某一个 Metric 下在一个小时范围内的数据量过大,可能需要通过 `-j` 参数调整 JVM 内存大小
考虑到下游 Writer 如果写入速度不及 OpenTSDB reader 的查询数据,可能会存在积压的情况,因此需要适当地调整 JVM 参数。以"从 OpenTSDB 数据库同步抽取数据到本地的作业"为例,启动命令如下:
```
bash
python datax/bin/datax.py opentsdb2stream.json
-j
"-Xms4096m -Xmx4096m"
```
### 5.3 指定起止时间会自动被转为整点时刻
指定起止时间会自动被转为整点时刻,例如 2019-4-18 的
`[3:35, 3:55)`
会被转为
`[3:00, 4:00)`
### 5.4 目前只支持兼容 OpenTSDB 2.3.x
其他版本暂不保证兼容
## 6 FAQ
***
**Q:为什么需要连接 OpenTSDB 的底层存储,为什么不直接使用 `/api/query` 查询获取数据点?**
A:因为通过 OpenTSDB 的 HTTP 接口(
`/api/query`
)来读取数据的话,经内部压测发现,在大数据量的情况下,会导致 OpenTSDB 的异步框架会报 CallBack 过多的问题;所以,采用了直连底层 HBase 存储,通过 Scan 的方式来扫描数据点,来避免这个问题。另外,还考虑到,可以通过指定 metric 和时间范围,可以顺序地 Scan HBase 表,提高查询效率。
opentsdbreader/pom.xml
0 → 100644
View file @
4d70b5ab
<?xml version="1.0" encoding="UTF-8"?>
<project
xmlns=
"http://maven.apache.org/POM/4.0.0"
xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation=
"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
>
<modelVersion>
4.0.0
</modelVersion>
<parent>
<groupId>
com.alibaba.datax
</groupId>
<artifactId>
datax-all
</artifactId>
<version>
0.0.1-SNAPSHOT
</version>
</parent>
<artifactId>
opentsdbreader
</artifactId>
<name>
opentsdbreader
</name>
<packaging>
jar
</packaging>
<properties>
<project.build.sourceEncoding>
UTF-8
</project.build.sourceEncoding>
<!-- common -->
<commons-lang3.version>
3.3.2
</commons-lang3.version>
<!-- http -->
<httpclient.version>
4.4
</httpclient.version>
<commons-io.version>
2.4
</commons-io.version>
<!-- json -->
<fastjson.version>
1.2.28
</fastjson.version>
<!-- opentsdb -->
<opentsdb.version>
2.3.2
</opentsdb.version>
<!-- test -->
<junit4.version>
4.12
</junit4.version>
<!-- time -->
<joda-time.version>
2.9.9
</joda-time.version>
</properties>
<dependencies>
<dependency>
<groupId>
com.alibaba.datax
</groupId>
<artifactId>
datax-common
</artifactId>
<version>
${datax-project-version}
</version>
<exclusions>
<exclusion>
<artifactId>
slf4j-log4j12
</artifactId>
<groupId>
org.slf4j
</groupId>
</exclusion>
<exclusion>
<artifactId>
fastjson
</artifactId>
<groupId>
com.alibaba
</groupId>
</exclusion>
<exclusion>
<artifactId>
commons-math3
</artifactId>
<groupId>
org.apache.commons
</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>
org.slf4j
</groupId>
<artifactId>
slf4j-api
</artifactId>
</dependency>
<dependency>
<groupId>
ch.qos.logback
</groupId>
<artifactId>
logback-classic
</artifactId>
</dependency>
<!-- common -->
<dependency>
<groupId>
org.apache.commons
</groupId>
<artifactId>
commons-lang3
</artifactId>
<version>
${commons-lang3.version}
</version>
</dependency>
<!-- http -->
<dependency>
<groupId>
org.apache.httpcomponents
</groupId>
<artifactId>
httpclient
</artifactId>
<version>
${httpclient.version}
</version>
</dependency>
<dependency>
<groupId>
commons-io
</groupId>
<artifactId>
commons-io
</artifactId>
<version>
${commons-io.version}
</version>
</dependency>
<dependency>
<groupId>
org.apache.httpcomponents
</groupId>
<artifactId>
fluent-hc
</artifactId>
<version>
${httpclient.version}
</version>
</dependency>
<!-- json -->
<dependency>
<groupId>
com.alibaba
</groupId>
<artifactId>
fastjson
</artifactId>
<version>
${fastjson.version}
</version>
</dependency>
<!-- opentsdb -->
<dependency>
<groupId>
net.opentsdb
</groupId>
<artifactId>
opentsdb
</artifactId>
<version>
${opentsdb.version}
</version>
</dependency>
<!-- time -->
<dependency>
<groupId>
joda-time
</groupId>
<artifactId>
joda-time
</artifactId>
<version>
${joda-time.version}
</version>
</dependency>
<!-- test -->
<dependency>
<groupId>
junit
</groupId>
<artifactId>
junit
</artifactId>
<version>
${junit4.version}
</version>
<scope>
test
</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>
maven-compiler-plugin
</artifactId>
<configuration>
<source>
1.6
</source>
<target>
1.6
</target>
<encoding>
${project-sourceEncoding}
</encoding>
</configuration>
</plugin>
<!-- assembly plugin -->
<plugin>
<artifactId>
maven-assembly-plugin
</artifactId>
<configuration>
<descriptors>
<descriptor>
src/main/assembly/package.xml
</descriptor>
</descriptors>
<finalName>
datax
</finalName>
</configuration>
<executions>
<execution>
<id>
dwzip
</id>
<phase>
package
</phase>
<goals>
<goal>
single
</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
opentsdbreader/src/main/assembly/package.xml
0 → 100755
View file @
4d70b5ab
<assembly
xmlns=
"http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation=
"http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd"
>
<id></id>
<formats>
<format>
dir
</format>
</formats>
<includeBaseDirectory>
false
</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>
src/main/resources
</directory>
<includes>
<include>
plugin.json
</include>
<include>
plugin_job_template.json
</include>
</includes>
<outputDirectory>
plugin/reader/opentsdbreader
</outputDirectory>
</fileSet>
<fileSet>
<directory>
target/
</directory>
<includes>
<include>
opentsdbreader-0.0.1-SNAPSHOT.jar
</include>
</includes>
<outputDirectory>
plugin/reader/opentsdbreader
</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>
false
</useProjectArtifact>
<outputDirectory>
plugin/reader/opentsdbreader/libs
</outputDirectory>
<scope>
runtime
</scope>
</dependencySet>
</dependencySets>
</assembly>
opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/conn/CliQuery.java
0 → 100644
View file @
4d70b5ab
package
com
.
alibaba
.
datax
.
plugin
.
reader
.
conn
;
import
net.opentsdb.core.*
;
import
net.opentsdb.utils.DateTime
;
import
java.util.ArrayList
;
import
java.util.HashMap
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:CliQuery
*
* @author Benedict Jin
* @since 2019-04-17
*/
final
class
CliQuery
{
/**
* Parses the query from the command lines.
*
* @param args The command line arguments.
* @param tsdb The TSDB to use.
* @param queries The list in which {@link Query}s will be appended.
*/
static
void
parseCommandLineQuery
(
final
String
[]
args
,
final
TSDB
tsdb
,
final
ArrayList
<
Query
>
queries
)
{
long
start_ts
=
DateTime
.
parseDateTimeString
(
args
[
0
],
null
);
if
(
start_ts
>=
0
)
{
start_ts
/=
1000
;
}
long
end_ts
=
-
1
;
if
(
args
.
length
>
3
)
{
// see if we can detect an end time
try
{
if
(
args
[
1
].
charAt
(
0
)
!=
'+'
&&
(
args
[
1
].
indexOf
(
':'
)
>=
0
||
args
[
1
].
indexOf
(
'/'
)
>=
0
||
args
[
1
].
indexOf
(
'-'
)
>=
0
||
Long
.
parseLong
(
args
[
1
])
>
0
))
{
end_ts
=
DateTime
.
parseDateTimeString
(
args
[
1
],
null
);
}
}
catch
(
NumberFormatException
ignore
)
{
// ignore it as it means the third parameter is likely the aggregator
}
}
// temp fixup to seconds from ms until the rest of TSDB supports ms
// Note you can't append this to the DateTime.parseDateTimeString() call as
// it clobbers -1 results
if
(
end_ts
>=
0
)
{
end_ts
/=
1000
;
}
int
i
=
end_ts
<
0
?
1
:
2
;
while
(
i
<
args
.
length
&&
args
[
i
].
charAt
(
0
)
==
'+'
)
{
i
++;
}
while
(
i
<
args
.
length
)
{
final
Aggregator
agg
=
Aggregators
.
get
(
args
[
i
++]);
final
boolean
rate
=
"rate"
.
equals
(
args
[
i
]);
RateOptions
rate_options
=
new
RateOptions
(
false
,
Long
.
MAX_VALUE
,
RateOptions
.
DEFAULT_RESET_VALUE
);
if
(
rate
)
{
i
++;
long
counterMax
=
Long
.
MAX_VALUE
;
long
resetValue
=
RateOptions
.
DEFAULT_RESET_VALUE
;
if
(
args
[
i
].
startsWith
(
"counter"
))
{
String
[]
parts
=
Tags
.
splitString
(
args
[
i
],
','
);
if
(
parts
.
length
>=
2
&&
parts
[
1
].
length
()
>
0
)
{
counterMax
=
Long
.
parseLong
(
parts
[
1
]);
}
if
(
parts
.
length
>=
3
&&
parts
[
2
].
length
()
>
0
)
{
resetValue
=
Long
.
parseLong
(
parts
[
2
]);
}
rate_options
=
new
RateOptions
(
true
,
counterMax
,
resetValue
);
i
++;
}
}
final
boolean
downsample
=
"downsample"
.
equals
(
args
[
i
]);
if
(
downsample
)
{
i
++;
}
final
long
interval
=
downsample
?
Long
.
parseLong
(
args
[
i
++])
:
0
;
final
Aggregator
sampler
=
downsample
?
Aggregators
.
get
(
args
[
i
++])
:
null
;
final
String
metric
=
args
[
i
++];
final
HashMap
<
String
,
String
>
tags
=
new
HashMap
<
String
,
String
>();
while
(
i
<
args
.
length
&&
args
[
i
].
indexOf
(
' '
,
1
)
<
0
&&
args
[
i
].
indexOf
(
'='
,
1
)
>
0
)
{
Tags
.
parse
(
tags
,
args
[
i
++]);
}
final
Query
query
=
tsdb
.
newQuery
();
query
.
setStartTime
(
start_ts
);
if
(
end_ts
>
0
)
{
query
.
setEndTime
(
end_ts
);
}
query
.
setTimeSeries
(
metric
,
tags
,
agg
,
rate
,
rate_options
);
if
(
downsample
)
{
query
.
downsample
(
interval
,
sampler
);
}
queries
.
add
(
query
);
}
}
}
opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/conn/Connection4TSDB.java
0 → 100644
View file @
4d70b5ab
package
com
.
alibaba
.
datax
.
plugin
.
reader
.
conn
;
import
com.alibaba.datax.common.plugin.RecordSender
;
import
java.util.List
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:Connection for TSDB-like databases
*
* @author Benedict Jin
* @since 2019-03-29
*/
public
interface
Connection4TSDB
{
/**
* Get the address of Database.
*
* @return host+ip
*/
String
address
();
/**
* Get the version of Database.
*
* @return version
*/
String
version
();
/**
* Get these configurations.
*
* @return configs
*/
String
config
();
/**
* Get the list of supported version.
*
* @return version list
*/
String
[]
getSupportVersionPrefix
();
/**
* Send data points by metric & start time & end time.
*
* @param metric metric
* @param start startTime
* @param end endTime
* @param recordSender sender
*/
void
sendDPs
(
String
metric
,
Long
start
,
Long
end
,
RecordSender
recordSender
)
throws
Exception
;
/**
* Put data point.
*
* @param dp data point
* @return whether the data point is written successfully
*/
boolean
put
(
DataPoint4TSDB
dp
);
/**
* Put data points.
*
* @param dps data points
* @return whether the data point is written successfully
*/
boolean
put
(
List
<
DataPoint4TSDB
>
dps
);
/**
* Whether current version is supported.
*
* @return true: supported; false: not yet!
*/
boolean
isSupported
();
}
opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/conn/DataPoint4TSDB.java
0 → 100644
View file @
4d70b5ab
package
com
.
alibaba
.
datax
.
plugin
.
reader
.
conn
;
import
com.alibaba.fastjson.JSON
;
import
java.util.Map
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:DataPoint for TSDB
*
* @author Benedict Jin
* @since 2019-04-10
*/
public
class
DataPoint4TSDB
{
private
long
timestamp
;
private
String
metric
;
private
Map
<
String
,
String
>
tags
;
private
Object
value
;
public
DataPoint4TSDB
()
{
}
public
DataPoint4TSDB
(
long
timestamp
,
String
metric
,
Map
<
String
,
String
>
tags
,
Object
value
)
{
this
.
timestamp
=
timestamp
;
this
.
metric
=
metric
;
this
.
tags
=
tags
;
this
.
value
=
value
;
}
public
long
getTimestamp
()
{
return
timestamp
;
}
public
void
setTimestamp
(
long
timestamp
)
{
this
.
timestamp
=
timestamp
;
}
public
String
getMetric
()
{
return
metric
;
}
public
void
setMetric
(
String
metric
)
{
this
.
metric
=
metric
;
}
public
Map
<
String
,
String
>
getTags
()
{
return
tags
;
}
public
void
setTags
(
Map
<
String
,
String
>
tags
)
{
this
.
tags
=
tags
;
}
public
Object
getValue
()
{
return
value
;
}
public
void
setValue
(
Object
value
)
{
this
.
value
=
value
;
}
@Override
public
String
toString
()
{
return
JSON
.
toJSONString
(
this
);
}
}
opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/conn/DumpSeries.java
0 → 100644
View file @
4d70b5ab
package
com
.
alibaba
.
datax
.
plugin
.
reader
.
conn
;
import
com.alibaba.datax.common.element.Record
;
import
com.alibaba.datax.common.element.StringColumn
;
import
com.alibaba.datax.common.plugin.RecordSender
;
import
net.opentsdb.core.*
;
import
net.opentsdb.core.Internal.Cell
;
import
org.hbase.async.KeyValue
;
import
org.hbase.async.Scanner
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
java.util.*
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:Tool to dump the data straight from HBase
*
* @author Benedict Jin
* @since 2019-04-17
*/
final
class
DumpSeries
{
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
DumpSeries
.
class
);
/**
* Dump all data points with special metric and time range, then send them all by {@link RecordSender}.
*/
static
void
doDump
(
TSDB
tsdb
,
String
[]
args
,
RecordSender
sender
)
throws
Exception
{
final
ArrayList
<
Query
>
queries
=
new
ArrayList
<
Query
>();
CliQuery
.
parseCommandLineQuery
(
args
,
tsdb
,
queries
);
List
<
DataPoint4TSDB
>
dps
=
new
LinkedList
<
DataPoint4TSDB
>();
for
(
final
Query
query
:
queries
)
{
final
List
<
Scanner
>
scanners
=
Internal
.
getScanners
(
query
);
for
(
Scanner
scanner
:
scanners
)
{
ArrayList
<
ArrayList
<
KeyValue
>>
rows
;
while
((
rows
=
scanner
.
nextRows
().
join
())
!=
null
)
{
for
(
final
ArrayList
<
KeyValue
>
row
:
rows
)
{
final
byte
[]
key
=
row
.
get
(
0
).
key
();
final
long
baseTime
=
Internal
.
baseTime
(
tsdb
,
key
);
final
String
metric
=
Internal
.
metricName
(
tsdb
,
key
);
for
(
final
KeyValue
kv
:
row
)
{
formatKeyValue
(
dps
,
tsdb
,
kv
,
baseTime
,
metric
);
for
(
DataPoint4TSDB
dp
:
dps
)
{
StringColumn
tsdbColumn
=
new
StringColumn
(
dp
.
toString
());
Record
record
=
sender
.
createRecord
();
record
.
addColumn
(
tsdbColumn
);
sender
.
sendToWriter
(
record
);
}
dps
.
clear
();
}
}
}
}
}
}
/**
* Parse KeyValue into data points.
*/
private
static
void
formatKeyValue
(
final
List
<
DataPoint4TSDB
>
dps
,
final
TSDB
tsdb
,
final
KeyValue
kv
,
final
long
baseTime
,
final
String
metric
)
{
Map
<
String
,
String
>
tagKVs
=
Internal
.
getTags
(
tsdb
,
kv
.
key
());
final
byte
[]
qualifier
=
kv
.
qualifier
();
final
int
q_len
=
qualifier
.
length
;
if
(!
AppendDataPoints
.
isAppendDataPoints
(
qualifier
)
&&
q_len
%
2
!=
0
)
{
// custom data object, not a data point
if
(
LOG
.
isDebugEnabled
())
{
LOG
.
debug
(
"Not a data point"
);
}
}
else
if
(
q_len
==
2
||
q_len
==
4
&&
Internal
.
inMilliseconds
(
qualifier
))
{
// regular data point
final
Cell
cell
=
Internal
.
parseSingleValue
(
kv
);
if
(
cell
==
null
)
{
throw
new
IllegalDataException
(
"Unable to parse row: "
+
kv
);
}
dps
.
add
(
new
DataPoint4TSDB
(
cell
.
absoluteTimestamp
(
baseTime
),
metric
,
tagKVs
,
cell
.
parseValue
()));
}
else
{
final
Collection
<
Cell
>
cells
;
if
(
q_len
==
3
)
{
// append data points
cells
=
new
AppendDataPoints
().
parseKeyValue
(
tsdb
,
kv
);
}
else
{
// compacted column
cells
=
Internal
.
extractDataPoints
(
kv
);
}
for
(
Cell
cell
:
cells
)
{
dps
.
add
(
new
DataPoint4TSDB
(
cell
.
absoluteTimestamp
(
baseTime
),
metric
,
tagKVs
,
cell
.
parseValue
()));
}
}
}
}
opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/conn/OpenTSDBConnection.java
0 → 100644
View file @
4d70b5ab
package
com
.
alibaba
.
datax
.
plugin
.
reader
.
conn
;
import
com.alibaba.datax.common.plugin.RecordSender
;
import
com.alibaba.datax.plugin.reader.util.TSDBUtils
;
import
com.alibaba.fastjson.JSON
;
import
org.apache.commons.lang3.StringUtils
;
import
java.util.List
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:OpenTSDB Connection
*
* @author Benedict Jin
* @since 2019-03-29
*/
public
class
OpenTSDBConnection
implements
Connection4TSDB
{
private
String
address
;
public
OpenTSDBConnection
(
String
address
)
{
this
.
address
=
address
;
}
@Override
public
String
address
()
{
return
address
;
}
@Override
public
String
version
()
{
return
TSDBUtils
.
version
(
address
);
}
@Override
public
String
config
()
{
return
TSDBUtils
.
config
(
address
);
}
@Override
public
String
[]
getSupportVersionPrefix
()
{
return
new
String
[]{
"2.3"
};
}
@Override
public
void
sendDPs
(
String
metric
,
Long
start
,
Long
end
,
RecordSender
recordSender
)
throws
Exception
{
OpenTSDBDump
.
dump
(
this
,
metric
,
start
,
end
,
recordSender
);
}
@Override
public
boolean
put
(
DataPoint4TSDB
dp
)
{
return
false
;
}
@Override
public
boolean
put
(
List
<
DataPoint4TSDB
>
dps
)
{
return
false
;
}
@Override
public
boolean
isSupported
()
{
String
versionJson
=
version
();
if
(
StringUtils
.
isBlank
(
versionJson
))
{
throw
new
RuntimeException
(
"Cannot get the version!"
);
}
String
version
=
JSON
.
parseObject
(
versionJson
).
getString
(
"version"
);
if
(
StringUtils
.
isBlank
(
version
))
{
return
false
;
}
for
(
String
prefix
:
getSupportVersionPrefix
())
{
if
(
version
.
startsWith
(
prefix
))
{
return
true
;
}
}
return
false
;
}
}
opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/conn/OpenTSDBDump.java
0 → 100644
View file @
4d70b5ab
package
com
.
alibaba
.
datax
.
plugin
.
reader
.
conn
;
import
com.alibaba.datax.common.plugin.RecordSender
;
import
com.alibaba.fastjson.JSON
;
import
net.opentsdb.core.TSDB
;
import
net.opentsdb.utils.Config
;
import
java.util.Map
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:OpenTSDB Dump
*
* @author Benedict Jin
* @since 2019-04-15
*/
final
class
OpenTSDBDump
{
private
static
TSDB
TSDB_INSTANCE
;
private
OpenTSDBDump
()
{
}
static
void
dump
(
OpenTSDBConnection
conn
,
String
metric
,
Long
start
,
Long
end
,
RecordSender
sender
)
throws
Exception
{
DumpSeries
.
doDump
(
getTSDB
(
conn
),
new
String
[]{
start
+
""
,
end
+
""
,
"none"
,
metric
},
sender
);
}
private
static
TSDB
getTSDB
(
OpenTSDBConnection
conn
)
{
if
(
TSDB_INSTANCE
==
null
)
{
synchronized
(
TSDB
.
class
)
{
if
(
TSDB_INSTANCE
==
null
)
{
try
{
Config
config
=
new
Config
(
false
);
Map
configurations
=
JSON
.
parseObject
(
conn
.
config
(),
Map
.
class
);
for
(
Object
key
:
configurations
.
keySet
())
{
config
.
overrideConfig
(
key
.
toString
(),
configurations
.
get
(
key
.
toString
()).
toString
());
}
TSDB_INSTANCE
=
new
TSDB
(
config
);
}
catch
(
Exception
e
)
{
throw
new
RuntimeException
(
"Cannot init OpenTSDB connection!"
);
}
}
}
}
return
TSDB_INSTANCE
;
}
}
opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/opentsdbreader/Constant.java
0 → 100644
View file @
4d70b5ab
package
com
.
alibaba
.
datax
.
plugin
.
reader
.
opentsdbreader
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:Key
*
* @author Benedict Jin
* @since 2019-04-18
*/
public
final
class
Constant
{
static
final
String
DEFAULT_DATA_FORMAT
=
"yyyy-MM-dd HH:mm:ss"
;
}
opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/opentsdbreader/Key.java
0 → 100644
View file @
4d70b5ab
package
com
.
alibaba
.
datax
.
plugin
.
reader
.
opentsdbreader
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:Key
*
* @author Benedict Jin
* @since 2019-04-18
*/
public
class
Key
{
static
final
String
ENDPOINT
=
"endpoint"
;
static
final
String
COLUMN
=
"column"
;
static
final
String
BEGIN_DATE_TIME
=
"beginDateTime"
;
static
final
String
END_DATE_TIME
=
"endDateTime"
;
}
opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/opentsdbreader/OpenTSDBReader.java
0 → 100755
View file @
4d70b5ab
package
com
.
alibaba
.
datax
.
plugin
.
reader
.
opentsdbreader
;
import
com.alibaba.datax.common.exception.DataXException
;
import
com.alibaba.datax.common.plugin.RecordSender
;
import
com.alibaba.datax.common.spi.Reader
;
import
com.alibaba.datax.common.util.Configuration
;
import
com.alibaba.datax.plugin.reader.conn.OpenTSDBConnection
;
import
com.alibaba.datax.plugin.reader.util.TimeUtils
;
import
com.alibaba.fastjson.JSON
;
import
org.apache.commons.lang3.StringUtils
;
import
org.joda.time.DateTime
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
java.text.ParseException
;
import
java.text.SimpleDateFormat
;
import
java.util.ArrayList
;
import
java.util.Collections
;
import
java.util.List
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:Key
*
* @author Benedict Jin
* @since 2019-04-18
*/
@SuppressWarnings
(
"unused"
)
public
class
OpenTSDBReader
extends
Reader
{
public
static
class
Job
extends
Reader
.
Job
{
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
Job
.
class
);
private
Configuration
originalConfig
;
@Override
public
void
init
()
{
this
.
originalConfig
=
super
.
getPluginJobConf
();
String
address
=
originalConfig
.
getString
(
Key
.
ENDPOINT
);
if
(
StringUtils
.
isBlank
(
address
))
{
throw
DataXException
.
asDataXException
(
OpenTSDBReaderErrorCode
.
REQUIRED_VALUE
,
"The parameter ["
+
Key
.
ENDPOINT
+
"] is not set."
);
}
List
<
String
>
columns
=
originalConfig
.
getList
(
Key
.
COLUMN
,
String
.
class
);
if
(
columns
==
null
||
columns
.
isEmpty
())
{
throw
DataXException
.
asDataXException
(
OpenTSDBReaderErrorCode
.
REQUIRED_VALUE
,
"The parameter ["
+
Key
.
COLUMN
+
"] is not set."
);
}
SimpleDateFormat
format
=
new
SimpleDateFormat
(
Constant
.
DEFAULT_DATA_FORMAT
);
String
startTime
=
originalConfig
.
getString
(
Key
.
BEGIN_DATE_TIME
);
Long
startDate
;
if
(
startTime
==
null
||
startTime
.
trim
().
length
()
==
0
)
{
throw
DataXException
.
asDataXException
(
OpenTSDBReaderErrorCode
.
REQUIRED_VALUE
,
"The parameter ["
+
Key
.
BEGIN_DATE_TIME
+
"] is not set."
);
}
else
{
try
{
startDate
=
format
.
parse
(
startTime
).
getTime
();
}
catch
(
ParseException
e
)
{
throw
DataXException
.
asDataXException
(
OpenTSDBReaderErrorCode
.
ILLEGAL_VALUE
,
"The parameter ["
+
Key
.
BEGIN_DATE_TIME
+
"] needs to conform to the ["
+
Constant
.
DEFAULT_DATA_FORMAT
+
"] format."
);
}
}
String
endTime
=
originalConfig
.
getString
(
Key
.
END_DATE_TIME
);
Long
endDate
;
if
(
endTime
==
null
||
endTime
.
trim
().
length
()
==
0
)
{
throw
DataXException
.
asDataXException
(
OpenTSDBReaderErrorCode
.
REQUIRED_VALUE
,
"The parameter ["
+
Key
.
END_DATE_TIME
+
"] is not set."
);
}
else
{
try
{
endDate
=
format
.
parse
(
endTime
).
getTime
();
}
catch
(
ParseException
e
)
{
throw
DataXException
.
asDataXException
(
OpenTSDBReaderErrorCode
.
ILLEGAL_VALUE
,
"The parameter ["
+
Key
.
END_DATE_TIME
+
"] needs to conform to the ["
+
Constant
.
DEFAULT_DATA_FORMAT
+
"] format."
);
}
}
if
(
startDate
>=
endDate
)
{
throw
DataXException
.
asDataXException
(
OpenTSDBReaderErrorCode
.
ILLEGAL_VALUE
,
"The parameter ["
+
Key
.
BEGIN_DATE_TIME
+
"] should be less than the parameter ["
+
Key
.
END_DATE_TIME
+
"]."
);
}
}
@Override
public
void
prepare
()
{
}
@Override
public
List
<
Configuration
>
split
(
int
adviceNumber
)
{
List
<
Configuration
>
configurations
=
new
ArrayList
<
Configuration
>();
// get metrics
List
<
String
>
columns
=
originalConfig
.
getList
(
Key
.
COLUMN
,
String
.
class
);
// get time range
SimpleDateFormat
format
=
new
SimpleDateFormat
(
Constant
.
DEFAULT_DATA_FORMAT
);
long
startTime
;
try
{
startTime
=
format
.
parse
(
originalConfig
.
getString
(
Key
.
BEGIN_DATE_TIME
)).
getTime
();
}
catch
(
ParseException
e
)
{
throw
DataXException
.
asDataXException
(
OpenTSDBReaderErrorCode
.
ILLEGAL_VALUE
,
"解析["
+
Key
.
BEGIN_DATE_TIME
+
"]失败."
,
e
);
}
long
endTime
;
try
{
endTime
=
format
.
parse
(
originalConfig
.
getString
(
Key
.
END_DATE_TIME
)).
getTime
();
}
catch
(
ParseException
e
)
{
throw
DataXException
.
asDataXException
(
OpenTSDBReaderErrorCode
.
ILLEGAL_VALUE
,
"解析["
+
Key
.
END_DATE_TIME
+
"]失败."
,
e
);
}
if
(
TimeUtils
.
isSecond
(
startTime
))
{
startTime
*=
1000
;
}
if
(
TimeUtils
.
isSecond
(
endTime
))
{
endTime
*=
1000
;
}
DateTime
startDateTime
=
new
DateTime
(
TimeUtils
.
getTimeInHour
(
startTime
));
DateTime
endDateTime
=
new
DateTime
(
TimeUtils
.
getTimeInHour
(
endTime
));
// split by metric
for
(
String
column
:
columns
)
{
// split by time in hour
while
(
startDateTime
.
isBefore
(
endDateTime
))
{
Configuration
clone
=
this
.
originalConfig
.
clone
();
clone
.
set
(
Key
.
COLUMN
,
Collections
.
singletonList
(
column
));
clone
.
set
(
Key
.
BEGIN_DATE_TIME
,
startDateTime
.
getMillis
());
startDateTime
=
startDateTime
.
plusHours
(
1
);
// Make sure the time interval is [start, end).
// Because net.opentsdb.core.Query.setEndTime means less than or equal to the end time.
clone
.
set
(
Key
.
END_DATE_TIME
,
startDateTime
.
getMillis
()
-
1
);
configurations
.
add
(
clone
);
LOG
.
info
(
"Configuration: {}"
,
JSON
.
toJSONString
(
clone
));
}
}
return
configurations
;
}
@Override
public
void
post
()
{
}
@Override
public
void
destroy
()
{
}
}
public
static
class
Task
extends
Reader
.
Task
{
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
Task
.
class
);
private
List
<
String
>
columns
;
private
OpenTSDBConnection
conn
;
private
Long
startTime
;
private
Long
endTime
;
@Override
public
void
init
()
{
Configuration
readerSliceConfig
=
super
.
getPluginJobConf
();
LOG
.
info
(
"getPluginJobConf: {}"
,
JSON
.
toJSONString
(
readerSliceConfig
));
this
.
columns
=
readerSliceConfig
.
getList
(
Key
.
COLUMN
,
String
.
class
);
String
address
=
readerSliceConfig
.
getString
(
Key
.
ENDPOINT
);
conn
=
new
OpenTSDBConnection
(
address
);
this
.
startTime
=
readerSliceConfig
.
getLong
(
Key
.
BEGIN_DATE_TIME
);
this
.
endTime
=
readerSliceConfig
.
getLong
(
Key
.
END_DATE_TIME
);
}
@Override
public
void
prepare
()
{
}
@Override
public
void
startRead
(
RecordSender
recordSender
)
{
try
{
for
(
String
column
:
columns
)
{
conn
.
sendDPs
(
column
,
this
.
startTime
,
this
.
endTime
,
recordSender
);
}
}
catch
(
Exception
e
)
{
throw
DataXException
.
asDataXException
(
OpenTSDBReaderErrorCode
.
ILLEGAL_VALUE
,
"获取或发送数据点的过程中出错!"
,
e
);
}
}
@Override
public
void
post
()
{
}
@Override
public
void
destroy
()
{
}
}
}
opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/opentsdbreader/OpenTSDBReaderErrorCode.java
0 → 100755
View file @
4d70b5ab
package
com
.
alibaba
.
datax
.
plugin
.
reader
.
opentsdbreader
;
import
com.alibaba.datax.common.spi.ErrorCode
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:OpenTSDB Reader Error Code
*
* @author Benedict Jin
* @since 2019-04-18
*/
public
enum
OpenTSDBReaderErrorCode
implements
ErrorCode
{
REQUIRED_VALUE
(
"OpenTSDBReader-00"
,
"缺失必要的值"
),
ILLEGAL_VALUE
(
"OpenTSDBReader-01"
,
"值非法"
);
private
final
String
code
;
private
final
String
description
;
OpenTSDBReaderErrorCode
(
String
code
,
String
description
)
{
this
.
code
=
code
;
this
.
description
=
description
;
}
@Override
public
String
getCode
()
{
return
this
.
code
;
}
@Override
public
String
getDescription
()
{
return
this
.
description
;
}
@Override
public
String
toString
()
{
return
String
.
format
(
"Code:[%s], Description:[%s]. "
,
this
.
code
,
this
.
description
);
}
}
opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/util/HttpUtils.java
0 → 100644
View file @
4d70b5ab
package
com
.
alibaba
.
datax
.
plugin
.
reader
.
util
;
import
com.alibaba.fastjson.JSON
;
import
org.apache.http.client.fluent.Content
;
import
org.apache.http.client.fluent.Request
;
import
org.apache.http.entity.ContentType
;
import
java.nio.charset.Charset
;
import
java.util.Map
;
import
java.util.concurrent.TimeUnit
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:HttpUtils
*
* @author Benedict Jin
* @since 2019-03-29
*/
public
final
class
HttpUtils
{
public
final
static
Charset
UTF_8
=
Charset
.
forName
(
"UTF-8"
);
public
final
static
int
CONNECT_TIMEOUT_DEFAULT_IN_MILL
=
(
int
)
TimeUnit
.
SECONDS
.
toMillis
(
60
);
public
final
static
int
SOCKET_TIMEOUT_DEFAULT_IN_MILL
=
(
int
)
TimeUnit
.
SECONDS
.
toMillis
(
60
);
private
HttpUtils
()
{
}
public
static
String
get
(
String
url
)
throws
Exception
{
Content
content
=
Request
.
Get
(
url
)
.
connectTimeout
(
CONNECT_TIMEOUT_DEFAULT_IN_MILL
)
.
socketTimeout
(
SOCKET_TIMEOUT_DEFAULT_IN_MILL
)
.
execute
()
.
returnContent
();
if
(
content
==
null
)
{
return
null
;
}
return
content
.
asString
(
UTF_8
);
}
public
static
String
post
(
String
url
,
Map
<
String
,
Object
>
params
)
throws
Exception
{
return
post
(
url
,
JSON
.
toJSONString
(
params
),
CONNECT_TIMEOUT_DEFAULT_IN_MILL
,
SOCKET_TIMEOUT_DEFAULT_IN_MILL
);
}
public
static
String
post
(
String
url
,
String
params
)
throws
Exception
{
return
post
(
url
,
params
,
CONNECT_TIMEOUT_DEFAULT_IN_MILL
,
SOCKET_TIMEOUT_DEFAULT_IN_MILL
);
}
public
static
String
post
(
String
url
,
Map
<
String
,
Object
>
params
,
int
connectTimeoutInMill
,
int
socketTimeoutInMill
)
throws
Exception
{
return
post
(
url
,
JSON
.
toJSONString
(
params
),
connectTimeoutInMill
,
socketTimeoutInMill
);
}
public
static
String
post
(
String
url
,
String
params
,
int
connectTimeoutInMill
,
int
socketTimeoutInMill
)
throws
Exception
{
Content
content
=
Request
.
Post
(
url
)
.
connectTimeout
(
connectTimeoutInMill
)
.
socketTimeout
(
socketTimeoutInMill
)
.
addHeader
(
"Content-Type"
,
"application/json"
)
.
bodyString
(
params
,
ContentType
.
APPLICATION_JSON
)
.
execute
()
.
returnContent
();
if
(
content
==
null
)
{
return
null
;
}
return
content
.
asString
(
UTF_8
);
}
}
opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/util/TSDBUtils.java
0 → 100644
View file @
4d70b5ab
package
com
.
alibaba
.
datax
.
plugin
.
reader
.
util
;
import
com.alibaba.datax.plugin.reader.conn.DataPoint4TSDB
;
import
com.alibaba.fastjson.JSON
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
java.util.List
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:TSDB Utils
*
* @author Benedict Jin
* @since 2019-03-29
*/
public
final
class
TSDBUtils
{
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
TSDBUtils
.
class
);
private
TSDBUtils
()
{
}
public
static
String
version
(
String
address
)
{
String
url
=
String
.
format
(
"%s/api/version"
,
address
);
String
rsp
;
try
{
rsp
=
HttpUtils
.
get
(
url
);
}
catch
(
Exception
e
)
{
throw
new
RuntimeException
(
e
);
}
return
rsp
;
}
public
static
String
config
(
String
address
)
{
String
url
=
String
.
format
(
"%s/api/config"
,
address
);
String
rsp
;
try
{
rsp
=
HttpUtils
.
get
(
url
);
}
catch
(
Exception
e
)
{
throw
new
RuntimeException
(
e
);
}
return
rsp
;
}
public
static
boolean
put
(
String
address
,
List
<
DataPoint4TSDB
>
dps
)
{
return
put
(
address
,
JSON
.
toJSON
(
dps
));
}
public
static
boolean
put
(
String
address
,
DataPoint4TSDB
dp
)
{
return
put
(
address
,
JSON
.
toJSON
(
dp
));
}
private
static
boolean
put
(
String
address
,
Object
o
)
{
String
url
=
String
.
format
(
"%s/api/put"
,
address
);
String
rsp
;
try
{
rsp
=
HttpUtils
.
post
(
url
,
o
.
toString
());
// If successful, the returned content should be null.
assert
rsp
==
null
;
}
catch
(
Exception
e
)
{
LOG
.
error
(
"Address: {}, DataPoints: {}"
,
url
,
o
);
throw
new
RuntimeException
(
e
);
}
return
true
;
}
}
opentsdbreader/src/main/java/com/alibaba/datax/plugin/reader/util/TimeUtils.java
0 → 100644
View file @
4d70b5ab
package
com
.
alibaba
.
datax
.
plugin
.
reader
.
util
;
import
java.util.concurrent.TimeUnit
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:TimeUtils
*
* @author Benedict Jin
* @since 2019-04-22
*/
public
final
class
TimeUtils
{
private
TimeUtils
()
{
}
private
static
final
long
SECOND_MASK
=
0xFFFFFFFF00000000
L
;
private
static
final
long
HOUR_IN_MILL
=
TimeUnit
.
HOURS
.
toMillis
(
1
);
/**
* Weather the timestamp is second.
*
* @param ts timestamp
*/
public
static
boolean
isSecond
(
long
ts
)
{
return
(
ts
&
SECOND_MASK
)
==
0
;
}
/**
* Get the hour.
*
* @param ms time in millisecond
*/
public
static
long
getTimeInHour
(
long
ms
)
{
return
ms
-
ms
%
HOUR_IN_MILL
;
}
}
opentsdbreader/src/main/resources/plugin.json
0 → 100755
View file @
4d70b5ab
{
"name"
:
"opentsdbreader"
,
"class"
:
"com.alibaba.datax.plugin.reader.opentsdbreader.OpenTSDBReader"
,
"description"
:
{
"useScene"
:
"从 OpenTSDB 中摄取数据点"
,
"mechanism"
:
"根据时间和 metric 直连底层 HBase 存储,从而 Scan 出符合条件的数据点"
,
"warn"
:
"指定起止时间会自动忽略分钟和秒,转为整点时刻,例如 2019-4-18 的 [3:35, 4:55) 会被转为 [3:00, 4:00)"
},
"developer"
:
"Benedict Jin"
}
opentsdbreader/src/main/resources/plugin_job_template.json
0 → 100644
View file @
4d70b5ab
{
"name"
:
"opentsdbreader"
,
"parameter"
:
{
"endpoint"
:
"http://localhost:8242"
,
"column"
:
[
"m"
],
"startTime"
:
"2019-01-01 00:00:00"
,
"endTime"
:
"2019-01-01 01:00:00"
}
}
opentsdbreader/src/test/java/com/alibaba/datax/plugin/reader/conn/OpenTSDBConnectionTest.java
0 → 100644
View file @
4d70b5ab
package
com
.
alibaba
.
datax
.
plugin
.
reader
.
conn
;
import
org.junit.Assert
;
import
org.junit.Ignore
;
import
org.junit.Test
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:OpenTSDB Connection4TSDB Test
*
* @author Benedict Jin
* @since 2019-03-29
*/
@Ignore
public
class
OpenTSDBConnectionTest
{
private
static
final
String
OPENTSDB_ADDRESS
=
"http://localhost:8242"
;
@Test
public
void
testVersion
()
{
String
version
=
new
OpenTSDBConnection
(
OPENTSDB_ADDRESS
).
version
();
Assert
.
assertNotNull
(
version
);
}
@Test
public
void
testIsSupported
()
{
Assert
.
assertTrue
(
new
OpenTSDBConnection
(
OPENTSDB_ADDRESS
).
isSupported
());
}
}
opentsdbreader/src/test/java/com/alibaba/datax/plugin/reader/util/Const.java
0 → 100644
View file @
4d70b5ab
package
com
.
alibaba
.
datax
.
plugin
.
reader
.
util
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:Const
*
* @author Benedict Jin
* @since 2019-03-29
*/
final
class
Const
{
private
Const
()
{
}
static
final
String
OPENTSDB_ADDRESS
=
"http://localhost:8242"
;
static
final
String
TSDB_ADDRESS
=
"http://localhost:8240"
;
}
opentsdbreader/src/test/java/com/alibaba/datax/plugin/reader/util/HttpUtilsTest.java
0 → 100644
View file @
4d70b5ab
package
com
.
alibaba
.
datax
.
plugin
.
reader
.
util
;
import
org.junit.Assert
;
import
org.junit.Ignore
;
import
org.junit.Test
;
import
java.util.HashMap
;
import
java.util.Map
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:HttpUtils Test
*
* @author Benedict Jin
* @since 2019-03-29
*/
@Ignore
public
class
HttpUtilsTest
{
@Test
public
void
testSimpleCase
()
throws
Exception
{
String
url
=
"https://httpbin.org/post"
;
Map
<
String
,
Object
>
params
=
new
HashMap
<
String
,
Object
>();
params
.
put
(
"foo"
,
"bar"
);
String
rsp
=
HttpUtils
.
post
(
url
,
params
);
System
.
out
.
println
(
rsp
);
Assert
.
assertNotNull
(
rsp
);
}
@Test
public
void
testGet
()
throws
Exception
{
String
url
=
String
.
format
(
"%s/api/version"
,
Const
.
OPENTSDB_ADDRESS
);
String
rsp
=
HttpUtils
.
get
(
url
);
System
.
out
.
println
(
rsp
);
Assert
.
assertNotNull
(
rsp
);
}
}
opentsdbreader/src/test/java/com/alibaba/datax/plugin/reader/util/TSDBTest.java
0 → 100644
View file @
4d70b5ab
package
com
.
alibaba
.
datax
.
plugin
.
reader
.
util
;
import
org.junit.Assert
;
import
org.junit.Ignore
;
import
org.junit.Test
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:TSDB Test
*
* @author Benedict Jin
* @since 2019-04-11
*/
@Ignore
public
class
TSDBTest
{
@Test
public
void
testVersion
()
{
String
version
=
TSDBUtils
.
version
(
Const
.
TSDB_ADDRESS
);
Assert
.
assertNotNull
(
version
);
System
.
out
.
println
(
version
);
version
=
TSDBUtils
.
version
(
Const
.
OPENTSDB_ADDRESS
);
Assert
.
assertNotNull
(
version
);
System
.
out
.
println
(
version
);
}
}
opentsdbreader/src/test/java/com/alibaba/datax/plugin/reader/util/TimeUtilsTest.java
0 → 100644
View file @
4d70b5ab
package
com
.
alibaba
.
datax
.
plugin
.
reader
.
util
;
import
org.junit.Assert
;
import
org.junit.Test
;
import
java.text.ParseException
;
import
java.text.SimpleDateFormat
;
import
java.util.Date
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:com.alibaba.datax.common.util
*
* @author Benedict Jin
* @since 2019-04-22
*/
public
class
TimeUtilsTest
{
@Test
public
void
testIsSecond
()
{
Assert
.
assertFalse
(
TimeUtils
.
isSecond
(
System
.
currentTimeMillis
()));
Assert
.
assertTrue
(
TimeUtils
.
isSecond
(
System
.
currentTimeMillis
()
/
1000
));
}
@Test
public
void
testGetTimeInHour
()
throws
ParseException
{
SimpleDateFormat
sdf
=
new
SimpleDateFormat
(
"yyyy-MM-dd HH:mm:ss"
);
Date
date
=
sdf
.
parse
(
"2019-04-18 15:32:33"
);
long
timeInHour
=
TimeUtils
.
getTimeInHour
(
date
.
getTime
());
Assert
.
assertEquals
(
"2019-04-18 15:00:00"
,
sdf
.
format
(
timeInHour
));
}
}
package.xml
View file @
4d70b5ab
...
...
@@ -159,6 +159,13 @@
</includes>
<outputDirectory>
datax
</outputDirectory>
</fileSet>
<fileSet>
<directory>
opentsdbreader/target/datax/
</directory>
<includes>
<include>
**/*.*
</include>
</includes>
<outputDirectory>
datax
</outputDirectory>
</fileSet>
<!-- writer -->
<fileSet>
...
...
@@ -322,5 +329,12 @@
</includes>
<outputDirectory>
datax
</outputDirectory>
</fileSet>
<fileSet>
<directory>
tsdbwriter/target/datax/
</directory>
<includes>
<include>
**/*.*
</include>
</includes>
<outputDirectory>
datax
</outputDirectory>
</fileSet>
</fileSets>
</assembly>
pom.xml
View file @
4d70b5ab
...
...
@@ -62,6 +62,7 @@
<module>
rdbmsreader
</module>
<module>
hbase11xreader
</module>
<module>
hbase094xreader
</module>
<module>
opentsdbreader
</module>
<!-- writer -->
<module>
mysqlwriter
</module>
...
...
@@ -85,6 +86,7 @@
<module>
hbase11xsqlwriter
</module>
<module>
hbase11xsqlreader
</module>
<module>
elasticsearchwriter
</module>
<module>
tsdbwriter
</module>
<!-- common support module -->
<module>
plugin-rdbms-util
</module>
...
...
tsdbwriter/doc/tsdbhttpwriter.md
0 → 100644
View file @
4d70b5ab
# TSDBWriter 插件文档
__
_
## 1 快速介绍
TSDBWriter 插件实现了将数据点写入到阿里巴巴自研 TSDB 数据库中(后续简称 TSDB)。
时间序列数据库(Time Series Database , 简称 TSDB)是一种高性能,低成本,稳定可靠的在线时序数据库服务;提供高效读写,高压缩比存储、时序数据插值及聚合计算,广泛应用于物联网(IoT)设备监控系统 ,企业能源管理系统(EMS),生产安全监控系统,电力检测系统等行业场景。 TSDB 提供百万级时序数据秒级写入,高压缩比低成本存储、预降采样、插值、多维聚合计算,查询结果可视化功能;解决由于设备采集点数量巨大,数据采集频率高,造成的存储成本高,写入和查询分析效率低的问题。更多关于 TSDB 的介绍,详见
[
阿里云 TSDB 官网
](
https://help.aliyun.com/product/54825.html
)
。
## 2 实现原理
通过 HTTP 连接 TSDB 实例,并通过
`/api/put`
接口将数据点写入。关于写入接口详见 TSDB 的
[
接口说明文档
](
https://help.aliyun.com/document_detail/59939.html
)
。
## 3 功能说明
### 3.1 配置样例
*
配置一个从 OpenTSDB 数据库同步抽取数据到 TSDB:
```
json
{
"job"
:
{
"content"
:
[
{
"reader"
:
{
"name"
:
"opentsdbreader"
,
"parameter"
:
{
"endpoint"
:
"http://localhost:4242"
,
"column"
:
[
"m"
],
"startTime"
:
"2019-01-01 00:00:00"
,
"endTime"
:
"2019-01-01 03:00:00"
}
},
"writer"
:
{
"name"
:
"tsdbhttpwriter"
,
"parameter"
:
{
"endpoint"
:
"http://localhost:8242"
}
}
}
],
"setting"
:
{
"speed"
:
{
"channel"
:
1
}
}
}
}
```
### 3.2 参数说明
*
**name**
*
描述:本插件的名称
*
必选:是
*
默认值:tsdbhttpwriter
*
**parameter**
*
**endpoint**
*
描述:TSDB 的 HTTP 连接地址
*
必选:是
*
格式:http://IP:Port
*
默认值:无
*
**batchSize**
*
描述:每次批量数据的条数
*
必选:否
*
格式:int,需要保证大于 0
*
默认值:100
*
**maxRetryTime**
*
描述:失败后重试的次数
*
必选:否
*
格式:int,需要保证大于 1
*
默认值:3
*
**ignoreWriteError**
*
描述:如果设置为 true,则忽略写入错误,继续写入;否则,多次重试后仍写入失败的话,则会终止写入任务
*
必选:否
*
格式:bool
*
默认值:false
### 3.3 类型转换
| DataX 内部类型 | TSDB 数据类型 |
| -------------- | ------------------------------------------------------------ |
| String | TSDB 数据点序列化字符串,包括 timestamp、metric、tags 和 value |
## 4 性能报告
### 4.1 环境准备
#### 4.1.1 数据特征
从 Metric、时间线、Value 和 采集周期 四个方面来描述:
##### metric
固定指定一个 metric 为
`m`
。
##### tagkv
前四个 tagkv 全排列,形成
`10 * 20 * 100 * 100 = 2000000`
条时间线,最后 IP 对应 2000000 条时间线从 1 开始自增。
|
**tag_k**
|
**tag_v**
|
| --------- | ------------- |
| zone | z1~z10 |
| cluster | c1~c20 |
| group | g1~100 |
| app | a1~a100 |
| ip | ip1~ip2000000 |
##### value
度量值为
[
1, 100
]
区间内的随机值
##### interval
采集周期为 10 秒,持续摄入 3 小时,总数据量为
`3 * 60 * 60 / 10 * 2000000 = 2,160,000,000`
个数据点。
#### 4.1.2 机器参数
TSDB Writer 机型: 64C256G
HBase 机型: 8C16G
*
5
#### 4.1.3 DataX jvm 参数
"-Xms4096m -Xmx4096m"
### 4.2 测试报告
| 通道数 | DataX 速度 (Rec/s) | DataX 流量 (MB/s) |
| ------ | ------------------ | ----------------- |
| 1 | 129753 | 15.45 |
| 2 | 284953 | 33.70 |
| 3 | 385868 | 45.71 |
## 5 约束限制
### 5.1 目前只支持兼容 TSDB 2.4.x 及以上版本
其他版本暂不保证兼容
## 6 FAQ
tsdbwriter/pom.xml
0 → 100644
View file @
4d70b5ab
<?xml version="1.0" encoding="UTF-8"?>
<project
xmlns=
"http://maven.apache.org/POM/4.0.0"
xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation=
"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
>
<modelVersion>
4.0.0
</modelVersion>
<parent>
<groupId>
com.alibaba.datax
</groupId>
<artifactId>
datax-all
</artifactId>
<version>
0.0.1-SNAPSHOT
</version>
</parent>
<artifactId>
tsdbwriter
</artifactId>
<name>
tsdbwriter
</name>
<packaging>
jar
</packaging>
<properties>
<project.build.sourceEncoding>
UTF-8
</project.build.sourceEncoding>
<!-- common -->
<commons-lang3.version>
3.3.2
</commons-lang3.version>
<!-- http -->
<httpclient.version>
4.4
</httpclient.version>
<commons-io.version>
2.4
</commons-io.version>
<!-- json -->
<fastjson.version>
1.2.28
</fastjson.version>
<!-- test -->
<junit4.version>
4.12
</junit4.version>
</properties>
<dependencies>
<dependency>
<groupId>
com.alibaba.datax
</groupId>
<artifactId>
datax-common
</artifactId>
<version>
${datax-project-version}
</version>
<exclusions>
<exclusion>
<artifactId>
slf4j-log4j12
</artifactId>
<groupId>
org.slf4j
</groupId>
</exclusion>
<exclusion>
<artifactId>
fastjson
</artifactId>
<groupId>
com.alibaba
</groupId>
</exclusion>
<exclusion>
<artifactId>
commons-math3
</artifactId>
<groupId>
org.apache.commons
</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>
org.slf4j
</groupId>
<artifactId>
slf4j-api
</artifactId>
</dependency>
<dependency>
<groupId>
ch.qos.logback
</groupId>
<artifactId>
logback-classic
</artifactId>
</dependency>
<!-- common -->
<dependency>
<groupId>
org.apache.commons
</groupId>
<artifactId>
commons-lang3
</artifactId>
<version>
${commons-lang3.version}
</version>
</dependency>
<!-- http -->
<dependency>
<groupId>
org.apache.httpcomponents
</groupId>
<artifactId>
httpclient
</artifactId>
<version>
${httpclient.version}
</version>
</dependency>
<dependency>
<groupId>
commons-io
</groupId>
<artifactId>
commons-io
</artifactId>
<version>
${commons-io.version}
</version>
</dependency>
<dependency>
<groupId>
org.apache.httpcomponents
</groupId>
<artifactId>
fluent-hc
</artifactId>
<version>
${httpclient.version}
</version>
</dependency>
<!-- json -->
<dependency>
<groupId>
com.alibaba
</groupId>
<artifactId>
fastjson
</artifactId>
<version>
${fastjson.version}
</version>
</dependency>
<!-- test -->
<dependency>
<groupId>
junit
</groupId>
<artifactId>
junit
</artifactId>
<version>
${junit4.version}
</version>
<scope>
test
</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>
maven-compiler-plugin
</artifactId>
<configuration>
<source>
1.6
</source>
<target>
1.6
</target>
<encoding>
${project-sourceEncoding}
</encoding>
</configuration>
</plugin>
<!-- assembly plugin -->
<plugin>
<artifactId>
maven-assembly-plugin
</artifactId>
<configuration>
<descriptors>
<descriptor>
src/main/assembly/package.xml
</descriptor>
</descriptors>
<finalName>
datax
</finalName>
</configuration>
<executions>
<execution>
<id>
dwzip
</id>
<phase>
package
</phase>
<goals>
<goal>
single
</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
tsdbwriter/src/main/assembly/package.xml
0 → 100755
View file @
4d70b5ab
<assembly
xmlns=
"http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation=
"http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd"
>
<id></id>
<formats>
<format>
dir
</format>
</formats>
<includeBaseDirectory>
false
</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>
src/main/resources
</directory>
<includes>
<include>
plugin.json
</include>
<include>
plugin_job_template.json
</include>
</includes>
<outputDirectory>
plugin/writer/tsdbwriter
</outputDirectory>
</fileSet>
<fileSet>
<directory>
target/
</directory>
<includes>
<include>
tsdbwriter-0.0.1-SNAPSHOT.jar
</include>
</includes>
<outputDirectory>
plugin/writer/tsdbwriter
</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>
false
</useProjectArtifact>
<outputDirectory>
plugin/writer/tsdbwriter/libs
</outputDirectory>
<scope>
runtime
</scope>
</dependencySet>
</dependencySets>
</assembly>
tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/conn/Connection4TSDB.java
0 → 100644
View file @
4d70b5ab
package
com
.
alibaba
.
datax
.
plugin
.
writer
.
conn
;
import
com.alibaba.datax.common.plugin.RecordSender
;
import
java.util.List
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:Connection for TSDB-like databases
*
* @author Benedict Jin
* @since 2019-03-29
*/
public
interface
Connection4TSDB
{
/**
* Get the address of Database.
*
* @return host+ip
*/
String
address
();
/**
* Get the version of Database.
*
* @return version
*/
String
version
();
/**
* Get these configurations.
*
* @return configs
*/
String
config
();
/**
* Get the list of supported version.
*
* @return version list
*/
String
[]
getSupportVersionPrefix
();
/**
* Send data points by metric & start time & end time.
*
* @param metric metric
* @param start startTime
* @param end endTime
* @param recordSender sender
*/
void
sendDPs
(
String
metric
,
Long
start
,
Long
end
,
RecordSender
recordSender
)
throws
Exception
;
/**
* Put data point.
*
* @param dp data point
* @return whether the data point is written successfully
*/
boolean
put
(
DataPoint4TSDB
dp
);
/**
* Put data points.
*
* @param dps data points
* @return whether the data point is written successfully
*/
boolean
put
(
List
<
DataPoint4TSDB
>
dps
);
/**
* Put data points.
*
* @param dps data points
* @return whether the data point is written successfully
*/
boolean
put
(
String
dps
);
/**
* Whether current version is supported.
*
* @return true: supported; false: not yet!
*/
boolean
isSupported
();
}
tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/conn/DataPoint4TSDB.java
0 → 100644
View file @
4d70b5ab
package
com
.
alibaba
.
datax
.
plugin
.
writer
.
conn
;
import
com.alibaba.fastjson.JSON
;
import
java.util.Map
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:DataPoint for TSDB
*
* @author Benedict Jin
* @since 2019-04-10
*/
public
class
DataPoint4TSDB
{
private
long
timestamp
;
private
String
metric
;
private
Map
<
String
,
String
>
tags
;
private
Object
value
;
public
DataPoint4TSDB
()
{
}
public
DataPoint4TSDB
(
long
timestamp
,
String
metric
,
Map
<
String
,
String
>
tags
,
Object
value
)
{
this
.
timestamp
=
timestamp
;
this
.
metric
=
metric
;
this
.
tags
=
tags
;
this
.
value
=
value
;
}
public
long
getTimestamp
()
{
return
timestamp
;
}
public
void
setTimestamp
(
long
timestamp
)
{
this
.
timestamp
=
timestamp
;
}
public
String
getMetric
()
{
return
metric
;
}
public
void
setMetric
(
String
metric
)
{
this
.
metric
=
metric
;
}
public
Map
<
String
,
String
>
getTags
()
{
return
tags
;
}
public
void
setTags
(
Map
<
String
,
String
>
tags
)
{
this
.
tags
=
tags
;
}
public
Object
getValue
()
{
return
value
;
}
public
void
setValue
(
Object
value
)
{
this
.
value
=
value
;
}
@Override
public
String
toString
()
{
return
JSON
.
toJSONString
(
this
);
}
}
tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/conn/TSDBConnection.java
0 → 100644
View file @
4d70b5ab
package
com
.
alibaba
.
datax
.
plugin
.
writer
.
conn
;
import
com.alibaba.datax.common.plugin.RecordSender
;
import
com.alibaba.datax.plugin.writer.util.TSDBUtils
;
import
com.alibaba.fastjson.JSON
;
import
org.apache.commons.lang3.StringUtils
;
import
java.util.List
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:TSDB Connection
*
* @author Benedict Jin
* @since 2019-03-29
*/
public
class
TSDBConnection
implements
Connection4TSDB
{
private
String
address
;
public
TSDBConnection
(
String
address
)
{
if
(
StringUtils
.
isBlank
(
address
))
{
throw
new
RuntimeException
(
"TSDBConnection init failed because address is blank!"
);
}
this
.
address
=
address
;
}
@Override
public
String
address
()
{
return
address
;
}
@Override
public
String
version
()
{
return
TSDBUtils
.
version
(
address
);
}
@Override
public
String
config
()
{
return
TSDBUtils
.
config
(
address
);
}
@Override
public
String
[]
getSupportVersionPrefix
()
{
return
new
String
[]{
"2.4.1"
,
"2.4.2"
};
}
@Override
public
void
sendDPs
(
String
metric
,
Long
start
,
Long
end
,
RecordSender
recordSender
)
{
throw
new
RuntimeException
(
"Not support yet!"
);
}
@Override
public
boolean
put
(
DataPoint4TSDB
dp
)
{
return
TSDBUtils
.
put
(
address
,
dp
);
}
@Override
public
boolean
put
(
List
<
DataPoint4TSDB
>
dps
)
{
return
TSDBUtils
.
put
(
address
,
dps
);
}
@Override
public
boolean
put
(
String
dps
)
{
return
TSDBUtils
.
put
(
address
,
dps
);
}
@Override
public
boolean
isSupported
()
{
String
versionJson
=
version
();
if
(
StringUtils
.
isBlank
(
versionJson
))
{
throw
new
RuntimeException
(
"Cannot get the version!"
);
}
String
version
=
JSON
.
parseObject
(
versionJson
).
getString
(
"version"
);
if
(
StringUtils
.
isBlank
(
version
))
{
return
false
;
}
for
(
String
prefix
:
getSupportVersionPrefix
())
{
if
(
version
.
startsWith
(
prefix
))
{
return
true
;
}
}
return
false
;
}
}
tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/Constant.java
0 → 100644
View file @
4d70b5ab
package
com
.
alibaba
.
datax
.
plugin
.
writer
.
tsdbwriter
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:Key
*
* @author Benedict Jin
* @since 2019-04-18
*/
public
final
class
Constant
{
static
final
int
DEFAULT_BATCH_SIZE
=
100
;
static
final
int
DEFAULT_TRY_SIZE
=
3
;
static
final
boolean
DEFAULT_IGNORE_WRITE_ERROR
=
false
;
}
tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/Key.java
0 → 100755
View file @
4d70b5ab
package
com
.
alibaba
.
datax
.
plugin
.
writer
.
tsdbwriter
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:Key
*
* @author Benedict Jin
* @since 2019-04-18
*/
public
class
Key
{
static
final
String
ENDPOINT
=
"endpoint"
;
static
final
String
BATCH_SIZE
=
"batchSize"
;
static
final
String
MAX_RETRY_TIME
=
"maxRetryTime"
;
static
final
String
IGNORE_WRITE_ERROR
=
"ignoreWriteError"
;
}
tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/TSDBWriter.java
0 → 100755
View file @
4d70b5ab
package
com
.
alibaba
.
datax
.
plugin
.
writer
.
tsdbwriter
;
import
com.alibaba.datax.common.element.Record
;
import
com.alibaba.datax.common.exception.DataXException
;
import
com.alibaba.datax.common.plugin.RecordReceiver
;
import
com.alibaba.datax.common.spi.Writer
;
import
com.alibaba.datax.common.util.Configuration
;
import
com.alibaba.datax.common.util.RetryUtil
;
import
com.alibaba.datax.plugin.writer.conn.TSDBConnection
;
import
org.apache.commons.lang3.StringUtils
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
java.util.ArrayList
;
import
java.util.List
;
import
java.util.concurrent.Callable
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:TSDB Http Writer
*
* @author Benedict Jin
* @since 2019-04-18
*/
@SuppressWarnings
(
"unused"
)
public
class
TSDBWriter
extends
Writer
{
public
static
class
Job
extends
Writer
.
Job
{
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
Job
.
class
);
private
Configuration
originalConfig
;
@Override
public
void
init
()
{
this
.
originalConfig
=
super
.
getPluginJobConf
();
String
address
=
this
.
originalConfig
.
getString
(
Key
.
ENDPOINT
);
if
(
StringUtils
.
isBlank
(
address
))
{
throw
DataXException
.
asDataXException
(
TSDBWriterErrorCode
.
REQUIRED_VALUE
,
"The parameter ["
+
Key
.
ENDPOINT
+
"] is not set."
);
}
Integer
batchSize
=
this
.
originalConfig
.
getInt
(
Key
.
BATCH_SIZE
);
if
(
batchSize
==
null
||
batchSize
<
1
)
{
originalConfig
.
set
(
Key
.
BATCH_SIZE
,
Constant
.
DEFAULT_BATCH_SIZE
);
LOG
.
info
(
"The parameter ["
+
Key
.
BATCH_SIZE
+
"] will be default value: "
+
Constant
.
DEFAULT_BATCH_SIZE
);
}
Integer
retrySize
=
this
.
originalConfig
.
getInt
(
Key
.
MAX_RETRY_TIME
);
if
(
retrySize
==
null
||
retrySize
<
0
)
{
originalConfig
.
set
(
Key
.
MAX_RETRY_TIME
,
Constant
.
DEFAULT_TRY_SIZE
);
LOG
.
info
(
"The parameter ["
+
Key
.
MAX_RETRY_TIME
+
"] will be default value: "
+
Constant
.
DEFAULT_TRY_SIZE
);
}
Boolean
ignoreWriteError
=
this
.
originalConfig
.
getBool
(
Key
.
IGNORE_WRITE_ERROR
);
if
(
ignoreWriteError
==
null
)
{
originalConfig
.
set
(
Key
.
IGNORE_WRITE_ERROR
,
Constant
.
DEFAULT_IGNORE_WRITE_ERROR
);
LOG
.
info
(
"The parameter ["
+
Key
.
IGNORE_WRITE_ERROR
+
"] will be default value: "
+
Constant
.
DEFAULT_IGNORE_WRITE_ERROR
);
}
}
@Override
public
void
prepare
()
{
}
@Override
public
List
<
Configuration
>
split
(
int
mandatoryNumber
)
{
ArrayList
<
Configuration
>
configurations
=
new
ArrayList
<
Configuration
>(
mandatoryNumber
);
for
(
int
i
=
0
;
i
<
mandatoryNumber
;
i
++)
{
configurations
.
add
(
this
.
originalConfig
.
clone
());
}
return
configurations
;
}
@Override
public
void
post
()
{
}
@Override
public
void
destroy
()
{
}
}
public
static
class
Task
extends
Writer
.
Task
{
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
Task
.
class
);
private
TSDBConnection
conn
;
private
int
batchSize
;
private
int
retrySize
;
private
boolean
ignoreWriteError
;
@Override
public
void
init
()
{
Configuration
writerSliceConfig
=
getPluginJobConf
();
String
address
=
writerSliceConfig
.
getString
(
Key
.
ENDPOINT
);
this
.
conn
=
new
TSDBConnection
(
address
);
this
.
batchSize
=
writerSliceConfig
.
getInt
(
Key
.
BATCH_SIZE
);
this
.
retrySize
=
writerSliceConfig
.
getInt
(
Key
.
MAX_RETRY_TIME
);
this
.
ignoreWriteError
=
writerSliceConfig
.
getBool
(
Key
.
IGNORE_WRITE_ERROR
);
}
@Override
public
void
prepare
()
{
}
@Override
public
void
startWrite
(
RecordReceiver
recordReceiver
)
{
try
{
Record
lastRecord
=
null
;
Record
record
;
int
count
=
0
;
StringBuilder
dps
=
new
StringBuilder
();
while
((
record
=
recordReceiver
.
getFromReader
())
!=
null
)
{
final
int
recordLength
=
record
.
getColumnNumber
();
for
(
int
i
=
0
;
i
<
recordLength
;
i
++)
{
dps
.
append
(
record
.
getColumn
(
i
).
asString
());
dps
.
append
(
","
);
count
++;
if
(
count
==
batchSize
)
{
count
=
0
;
batchPut
(
record
,
"["
+
dps
.
substring
(
0
,
dps
.
length
()
-
1
)
+
"]"
);
dps
=
new
StringBuilder
();
}
}
lastRecord
=
record
;
}
if
(
StringUtils
.
isNotBlank
(
dps
.
toString
()))
{
batchPut
(
lastRecord
,
"["
+
dps
.
substring
(
0
,
dps
.
length
()
-
1
)
+
"]"
);
}
}
catch
(
Exception
e
)
{
throw
DataXException
.
asDataXException
(
TSDBWriterErrorCode
.
RUNTIME_EXCEPTION
,
e
);
}
}
private
void
batchPut
(
final
Record
record
,
final
String
dps
)
{
try
{
RetryUtil
.
executeWithRetry
(
new
Callable
<
Integer
>()
{
@Override
public
Integer
call
()
{
if
(!
conn
.
put
(
dps
))
{
getTaskPluginCollector
().
collectDirtyRecord
(
record
,
"Put data points failed!"
);
throw
DataXException
.
asDataXException
(
TSDBWriterErrorCode
.
RUNTIME_EXCEPTION
,
"Put data points failed!"
);
}
return
0
;
}
},
retrySize
,
60000L
,
true
);
}
catch
(
Exception
e
)
{
if
(
ignoreWriteError
)
{
LOG
.
warn
(
"Ignore write exceptions and continue writing."
);
}
else
{
throw
DataXException
.
asDataXException
(
TSDBWriterErrorCode
.
RETRY_WRITER_EXCEPTION
,
e
);
}
}
}
@Override
public
void
post
()
{
}
@Override
public
void
destroy
()
{
}
}
}
tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/TSDBWriterErrorCode.java
0 → 100755
View file @
4d70b5ab
package
com
.
alibaba
.
datax
.
plugin
.
writer
.
tsdbwriter
;
import
com.alibaba.datax.common.spi.ErrorCode
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:TSDB Http Writer Error Code
*
* @author Benedict Jin
* @since 2019-04-18
*/
public
enum
TSDBWriterErrorCode
implements
ErrorCode
{
REQUIRED_VALUE
(
"TSDBWriter-00"
,
"Missing the necessary value"
),
RUNTIME_EXCEPTION
(
"TSDBWriter-01"
,
"Runtime exception"
),
RETRY_WRITER_EXCEPTION
(
"TSDBWriter-02"
,
"After repeated attempts, the write still fails"
);
private
final
String
code
;
private
final
String
description
;
TSDBWriterErrorCode
(
String
code
,
String
description
)
{
this
.
code
=
code
;
this
.
description
=
description
;
}
@Override
public
String
getCode
()
{
return
this
.
code
;
}
@Override
public
String
getDescription
()
{
return
this
.
description
;
}
@Override
public
String
toString
()
{
return
String
.
format
(
"Code:[%s], Description:[%s]. "
,
this
.
code
,
this
.
description
);
}
}
tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/util/HttpUtils.java
0 → 100644
View file @
4d70b5ab
package
com
.
alibaba
.
datax
.
plugin
.
writer
.
util
;
import
com.alibaba.fastjson.JSON
;
import
org.apache.http.client.fluent.Content
;
import
org.apache.http.client.fluent.Request
;
import
org.apache.http.entity.ContentType
;
import
java.nio.charset.Charset
;
import
java.util.Map
;
import
java.util.concurrent.TimeUnit
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:HttpUtils
*
* @author Benedict Jin
* @since 2019-03-29
*/
public
final
class
HttpUtils
{
public
final
static
Charset
UTF_8
=
Charset
.
forName
(
"UTF-8"
);
public
final
static
int
CONNECT_TIMEOUT_DEFAULT_IN_MILL
=
(
int
)
TimeUnit
.
SECONDS
.
toMillis
(
60
);
public
final
static
int
SOCKET_TIMEOUT_DEFAULT_IN_MILL
=
(
int
)
TimeUnit
.
SECONDS
.
toMillis
(
60
);
private
HttpUtils
()
{
}
public
static
String
get
(
String
url
)
throws
Exception
{
Content
content
=
Request
.
Get
(
url
)
.
connectTimeout
(
CONNECT_TIMEOUT_DEFAULT_IN_MILL
)
.
socketTimeout
(
SOCKET_TIMEOUT_DEFAULT_IN_MILL
)
.
execute
()
.
returnContent
();
if
(
content
==
null
)
{
return
null
;
}
return
content
.
asString
(
UTF_8
);
}
public
static
String
post
(
String
url
,
Map
<
String
,
Object
>
params
)
throws
Exception
{
return
post
(
url
,
JSON
.
toJSONString
(
params
),
CONNECT_TIMEOUT_DEFAULT_IN_MILL
,
SOCKET_TIMEOUT_DEFAULT_IN_MILL
);
}
public
static
String
post
(
String
url
,
String
params
)
throws
Exception
{
return
post
(
url
,
params
,
CONNECT_TIMEOUT_DEFAULT_IN_MILL
,
SOCKET_TIMEOUT_DEFAULT_IN_MILL
);
}
public
static
String
post
(
String
url
,
Map
<
String
,
Object
>
params
,
int
connectTimeoutInMill
,
int
socketTimeoutInMill
)
throws
Exception
{
return
post
(
url
,
JSON
.
toJSONString
(
params
),
connectTimeoutInMill
,
socketTimeoutInMill
);
}
public
static
String
post
(
String
url
,
String
params
,
int
connectTimeoutInMill
,
int
socketTimeoutInMill
)
throws
Exception
{
Content
content
=
Request
.
Post
(
url
)
.
connectTimeout
(
connectTimeoutInMill
)
.
socketTimeout
(
socketTimeoutInMill
)
.
addHeader
(
"Content-Type"
,
"application/json"
)
.
bodyString
(
params
,
ContentType
.
APPLICATION_JSON
)
.
execute
()
.
returnContent
();
if
(
content
==
null
)
{
return
null
;
}
return
content
.
asString
(
UTF_8
);
}
}
tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/util/TSDBUtils.java
0 → 100644
View file @
4d70b5ab
package
com
.
alibaba
.
datax
.
plugin
.
writer
.
util
;
import
com.alibaba.datax.plugin.writer.conn.DataPoint4TSDB
;
import
com.alibaba.fastjson.JSON
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
java.util.List
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:TSDB Utils
*
* @author Benedict Jin
* @since 2019-03-29
*/
public
final
class
TSDBUtils
{
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
TSDBUtils
.
class
);
private
TSDBUtils
()
{
}
public
static
String
version
(
String
address
)
{
String
url
=
String
.
format
(
"%s/api/version"
,
address
);
String
rsp
;
try
{
rsp
=
HttpUtils
.
get
(
url
);
}
catch
(
Exception
e
)
{
throw
new
RuntimeException
(
e
);
}
return
rsp
;
}
public
static
String
config
(
String
address
)
{
String
url
=
String
.
format
(
"%s/api/config"
,
address
);
String
rsp
;
try
{
rsp
=
HttpUtils
.
get
(
url
);
}
catch
(
Exception
e
)
{
throw
new
RuntimeException
(
e
);
}
return
rsp
;
}
public
static
boolean
put
(
String
address
,
List
<
DataPoint4TSDB
>
dps
)
{
return
put
(
address
,
JSON
.
toJSON
(
dps
));
}
public
static
boolean
put
(
String
address
,
DataPoint4TSDB
dp
)
{
return
put
(
address
,
JSON
.
toJSON
(
dp
));
}
private
static
boolean
put
(
String
address
,
Object
o
)
{
return
put
(
address
,
o
.
toString
());
}
public
static
boolean
put
(
String
address
,
String
s
)
{
String
url
=
String
.
format
(
"%s/api/put"
,
address
);
String
rsp
;
try
{
rsp
=
HttpUtils
.
post
(
url
,
s
);
// If successful, the returned content should be null.
assert
rsp
==
null
;
}
catch
(
Exception
e
)
{
LOG
.
error
(
"Address: {}, DataPoints: {}"
,
url
,
s
);
throw
new
RuntimeException
(
e
);
}
return
true
;
}
}
tsdbwriter/src/main/resources/plugin.json
0 → 100755
View file @
4d70b5ab
{
"name"
:
"tsdbwriter"
,
"class"
:
"com.alibaba.datax.plugin.writer.tsdbwriter.TSDBWriter"
,
"description"
:
{
"useScene"
:
"往 TSDB 中摄入数据点"
,
"mechanism"
:
"调用 TSDB 的 /api/put 接口,实现数据点的写入"
,
"warn"
:
""
},
"developer"
:
"Benedict Jin"
}
tsdbwriter/src/main/resources/plugin_job_template.json
0 → 100644
View file @
4d70b5ab
{
"name"
:
"tsdbwriter"
,
"parameter"
:
{
"endpoint"
:
"http://localhost:8242"
}
}
tsdbwriter/src/test/java/com/alibaba/datax/plugin/writer/conn/TSDBConnectionTest.java
0 → 100644
View file @
4d70b5ab
package
com
.
alibaba
.
datax
.
plugin
.
writer
.
conn
;
import
org.junit.Assert
;
import
org.junit.Ignore
;
import
org.junit.Test
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:TSDBConnection Test
*
* @author Benedict Jin
* @since 2019-03-29
*/
@Ignore
public
class
TSDBConnectionTest
{
private
static
final
String
TSDB_ADDRESS
=
"http://localhost:8240"
;
@Test
public
void
testVersion
()
{
String
version
=
new
TSDBConnection
(
TSDB_ADDRESS
).
version
();
Assert
.
assertNotNull
(
version
);
}
@Test
public
void
testIsSupported
()
{
Assert
.
assertTrue
(
new
TSDBConnection
(
TSDB_ADDRESS
).
isSupported
());
}
}
tsdbwriter/src/test/java/com/alibaba/datax/plugin/writer/util/Const.java
0 → 100644
View file @
4d70b5ab
package
com
.
alibaba
.
datax
.
plugin
.
writer
.
util
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:Const
*
* @author Benedict Jin
* @since 2019-03-29
*/
final
class
Const
{
private
Const
()
{
}
static
final
String
OPENTSDB_ADDRESS
=
"http://localhost:8242"
;
static
final
String
TSDB_ADDRESS
=
"http://localhost:8240"
;
}
tsdbwriter/src/test/java/com/alibaba/datax/plugin/writer/util/HttpUtilsTest.java
0 → 100644
View file @
4d70b5ab
package
com
.
alibaba
.
datax
.
plugin
.
writer
.
util
;
import
org.junit.Assert
;
import
org.junit.Ignore
;
import
org.junit.Test
;
import
java.util.HashMap
;
import
java.util.Map
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:HttpUtils Test
*
* @author Benedict Jin
* @since 2019-03-29
*/
@Ignore
public
class
HttpUtilsTest
{
@Test
public
void
testSimpleCase
()
throws
Exception
{
String
url
=
"https://httpbin.org/post"
;
Map
<
String
,
Object
>
params
=
new
HashMap
<
String
,
Object
>();
params
.
put
(
"foo"
,
"bar"
);
String
rsp
=
HttpUtils
.
post
(
url
,
params
);
System
.
out
.
println
(
rsp
);
Assert
.
assertNotNull
(
rsp
);
}
@Test
public
void
testGet
()
throws
Exception
{
String
url
=
String
.
format
(
"%s/api/version"
,
Const
.
OPENTSDB_ADDRESS
);
String
rsp
=
HttpUtils
.
get
(
url
);
System
.
out
.
println
(
rsp
);
Assert
.
assertNotNull
(
rsp
);
}
}
tsdbwriter/src/test/java/com/alibaba/datax/plugin/writer/util/TSDBTest.java
0 → 100644
View file @
4d70b5ab
package
com
.
alibaba
.
datax
.
plugin
.
writer
.
util
;
import
org.junit.Assert
;
import
org.junit.Ignore
;
import
org.junit.Test
;
/**
* Copyright @ 2019 alibaba.com
* All right reserved.
* Function:TSDB Test
*
* @author Benedict Jin
* @since 2019-04-11
*/
@Ignore
public
class
TSDBTest
{
@Test
public
void
testVersion
()
{
String
version
=
TSDBUtils
.
version
(
Const
.
TSDB_ADDRESS
);
Assert
.
assertNotNull
(
version
);
System
.
out
.
println
(
version
);
version
=
TSDBUtils
.
version
(
Const
.
OPENTSDB_ADDRESS
);
Assert
.
assertNotNull
(
version
);
System
.
out
.
println
(
version
);
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment