Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
D
DataX
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
risk-feature
DataX
Commits
ca32cbd4
Commit
ca32cbd4
authored
May 29, 2019
by
qingdao.gqs
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
hbase11xsqlwriter支持phoenix thinclient
parent
20471dd4
Changes
8
Hide whitespace changes
Inline
Side-by-side
Showing
8 changed files
with
644 additions
and
390 deletions
+644
-390
pom.xml
hbase11xsqlwriter/pom.xml
+22
-0
Constant.java
...ibaba/datax/plugin/writer/hbase11xsqlwriter/Constant.java
+1
-0
HbaseSQLHelper.java
...datax/plugin/writer/hbase11xsqlwriter/HbaseSQLHelper.java
+127
-3
HbaseSQLWriterConfig.java
...plugin/writer/hbase11xsqlwriter/HbaseSQLWriterConfig.java
+62
-18
HbaseSQLWriterTask.java
...x/plugin/writer/hbase11xsqlwriter/HbaseSQLWriterTask.java
+23
-10
Key.java
...om/alibaba/datax/plugin/writer/hbase11xsqlwriter/Key.java
+7
-0
ThinClientPTable.java
...tax/plugin/writer/hbase11xsqlwriter/ThinClientPTable.java
+402
-0
SqlFormatUtil.java
...va/com/alibaba/datax/plugin/rdbms/util/SqlFormatUtil.java
+0
-359
No files found.
hbase11xsqlwriter/pom.xml
View file @
ca32cbd4
...
@@ -18,6 +18,8 @@
...
@@ -18,6 +18,8 @@
<phoenix.version>
4.11.0-HBase-1.1
</phoenix.version>
<phoenix.version>
4.11.0-HBase-1.1
</phoenix.version>
<hadoop.version>
2.7.1
</hadoop.version>
<hadoop.version>
2.7.1
</hadoop.version>
<commons-codec.version>
1.8
</commons-codec.version>
<commons-codec.version>
1.8
</commons-codec.version>
<protobuf.version>
3.2.0
</protobuf.version>
<httpclient.version>
4.4.1
</httpclient.version>
</properties>
</properties>
<dependencies>
<dependencies>
...
@@ -47,6 +49,11 @@
...
@@ -47,6 +49,11 @@
<artifactId>
phoenix-core
</artifactId>
<artifactId>
phoenix-core
</artifactId>
<version>
${phoenix.version}
</version>
<version>
${phoenix.version}
</version>
</dependency>
</dependency>
<dependency>
<groupId>
org.apache.phoenix
</groupId>
<artifactId>
phoenix-queryserver-client
</artifactId>
<version>
${phoenix.version}
</version>
</dependency>
<dependency>
<dependency>
<groupId>
com.google.guava
</groupId>
<groupId>
com.google.guava
</groupId>
<artifactId>
guava
</artifactId>
<artifactId>
guava
</artifactId>
...
@@ -58,6 +65,21 @@
...
@@ -58,6 +65,21 @@
<version>
${commons-codec.version}
</version>
<version>
${commons-codec.version}
</version>
</dependency>
</dependency>
<!-- httpclient begin -->
<dependency>
<groupId>
org.apache.httpcomponents
</groupId>
<artifactId>
httpclient
</artifactId>
<version>
${httpclient.version}
</version>
</dependency>
<!-- httpclient end -->
<dependency>
<groupId>
com.google.protobuf
</groupId>
<artifactId>
protobuf-java
</artifactId>
<version>
${protobuf.version}
</version>
</dependency>
<!-- for test -->
<!-- for test -->
<dependency>
<dependency>
<groupId>
junit
</groupId>
<groupId>
junit
</groupId>
...
...
hbase11xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase11xsqlwriter/Constant.java
View file @
ca32cbd4
...
@@ -8,6 +8,7 @@ public final class Constant {
...
@@ -8,6 +8,7 @@ public final class Constant {
public
static
final
boolean
DEFAULT_LAST_COLUMN_IS_VERSION
=
false
;
// 默认最后一列不是version列
public
static
final
boolean
DEFAULT_LAST_COLUMN_IS_VERSION
=
false
;
// 默认最后一列不是version列
public
static
final
int
DEFAULT_BATCH_ROW_COUNT
=
256
;
// 默认一次写256行
public
static
final
int
DEFAULT_BATCH_ROW_COUNT
=
256
;
// 默认一次写256行
public
static
final
boolean
DEFAULT_TRUNCATE
=
false
;
// 默认开始的时候不清空表
public
static
final
boolean
DEFAULT_TRUNCATE
=
false
;
// 默认开始的时候不清空表
public
static
final
boolean
DEFAULT_USE_THIN_CLIENT
=
false
;
// 默认不用thin客户端
public
static
final
int
TYPE_UNSIGNED_TINYINT
=
11
;
public
static
final
int
TYPE_UNSIGNED_TINYINT
=
11
;
public
static
final
int
TYPE_UNSIGNED_SMALLINT
=
13
;
public
static
final
int
TYPE_UNSIGNED_SMALLINT
=
13
;
...
...
hbase11xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase11xsqlwriter/HbaseSQLHelper.java
View file @
ca32cbd4
...
@@ -11,6 +11,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
...
@@ -11,6 +11,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
import
org.apache.phoenix.schema.ColumnNotFoundException
;
import
org.apache.phoenix.schema.ColumnNotFoundException
;
import
org.apache.phoenix.schema.MetaDataClient
;
import
org.apache.phoenix.schema.MetaDataClient
;
import
org.apache.phoenix.schema.PTable
;
import
org.apache.phoenix.schema.PTable
;
import
org.apache.phoenix.schema.types.PDataType
;
import
org.apache.phoenix.util.SchemaUtil
;
import
org.apache.phoenix.util.SchemaUtil
;
import
org.slf4j.Logger
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.slf4j.LoggerFactory
;
...
@@ -18,7 +19,11 @@ import org.slf4j.LoggerFactory;
...
@@ -18,7 +19,11 @@ import org.slf4j.LoggerFactory;
import
java.io.IOException
;
import
java.io.IOException
;
import
java.sql.Connection
;
import
java.sql.Connection
;
import
java.sql.DriverManager
;
import
java.sql.DriverManager
;
import
java.sql.ResultSet
;
import
java.sql.ResultSetMetaData
;
import
java.sql.SQLException
;
import
java.sql.SQLException
;
import
java.sql.Statement
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Map
;
...
@@ -28,6 +33,8 @@ import java.util.Map;
...
@@ -28,6 +33,8 @@ import java.util.Map;
public
class
HbaseSQLHelper
{
public
class
HbaseSQLHelper
{
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
HbaseSQLHelper
.
class
);
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
HbaseSQLHelper
.
class
);
public
static
ThinClientPTable
ptable
;
/**
/**
* 将datax的配置解析成sql writer的配置
* 将datax的配置解析成sql writer的配置
*/
*/
...
@@ -53,6 +60,11 @@ public class HbaseSQLHelper {
...
@@ -53,6 +60,11 @@ public class HbaseSQLHelper {
return
new
Pair
<
String
,
String
>(
zkQuorum
,
znode
);
return
new
Pair
<
String
,
String
>(
zkQuorum
,
znode
);
}
}
public
static
Map
<
String
,
String
>
getThinConnectConfig
(
String
hbaseCfgString
)
{
assert
hbaseCfgString
!=
null
;
return
JSON
.
parseObject
(
hbaseCfgString
,
new
TypeReference
<
Map
<
String
,
String
>>()
{});
}
/**
/**
* 校验配置
* 校验配置
*/
*/
...
@@ -61,12 +73,12 @@ public class HbaseSQLHelper {
...
@@ -61,12 +73,12 @@ public class HbaseSQLHelper {
Connection
conn
=
getJdbcConnection
(
cfg
);
Connection
conn
=
getJdbcConnection
(
cfg
);
// 检查表:存在,可用
// 检查表:存在,可用
checkTable
(
conn
,
cfg
.
get
TableName
());
checkTable
(
conn
,
cfg
.
get
Namespace
(),
cfg
.
getTableName
(),
cfg
.
isThinClient
());
// 校验元数据:配置中给出的列必须是目的表中已经存在的列
// 校验元数据:配置中给出的列必须是目的表中已经存在的列
PTable
schema
=
null
;
PTable
schema
=
null
;
try
{
try
{
schema
=
getTableSchema
(
conn
,
cfg
.
get
TableName
());
schema
=
getTableSchema
(
conn
,
cfg
.
get
Namespace
(),
cfg
.
getTableName
(),
cfg
.
isThinClient
());
}
catch
(
SQLException
e
)
{
}
catch
(
SQLException
e
)
{
throw
DataXException
.
asDataXException
(
HbaseSQLWriterErrorCode
.
GET_HBASE_CONNECTION_ERROR
,
throw
DataXException
.
asDataXException
(
HbaseSQLWriterErrorCode
.
GET_HBASE_CONNECTION_ERROR
,
"无法获取目的表"
+
cfg
.
getTableName
()
+
"的元数据信息,表可能不是SQL表或表名配置错误,请检查您的配置 或者 联系 HBase 管理员."
,
e
);
"无法获取目的表"
+
cfg
.
getTableName
()
+
"的元数据信息,表可能不是SQL表或表名配置错误,请检查您的配置 或者 联系 HBase 管理员."
,
e
);
...
@@ -97,7 +109,11 @@ public class HbaseSQLHelper {
...
@@ -97,7 +109,11 @@ public class HbaseSQLHelper {
Connection
conn
;
Connection
conn
;
try
{
try
{
Class
.
forName
(
"org.apache.phoenix.jdbc.PhoenixDriver"
);
Class
.
forName
(
"org.apache.phoenix.jdbc.PhoenixDriver"
);
conn
=
DriverManager
.
getConnection
(
connStr
);
if
(
cfg
.
isThinClient
())
{
conn
=
getThinClientJdbcConnection
(
cfg
);
}
else
{
conn
=
DriverManager
.
getConnection
(
connStr
);
}
conn
.
setAutoCommit
(
false
);
conn
.
setAutoCommit
(
false
);
}
catch
(
Throwable
e
)
{
}
catch
(
Throwable
e
)
{
throw
DataXException
.
asDataXException
(
HbaseSQLWriterErrorCode
.
GET_HBASE_CONNECTION_ERROR
,
throw
DataXException
.
asDataXException
(
HbaseSQLWriterErrorCode
.
GET_HBASE_CONNECTION_ERROR
,
...
@@ -107,6 +123,32 @@ public class HbaseSQLHelper {
...
@@ -107,6 +123,32 @@ public class HbaseSQLHelper {
return
conn
;
return
conn
;
}
}
/**
* 创建 thin client jdbc连接
* @param cfg
* @return
* @throws SQLException
*/
public
static
Connection
getThinClientJdbcConnection
(
HbaseSQLWriterConfig
cfg
)
throws
SQLException
{
String
connStr
=
cfg
.
getConnectionString
();
LOG
.
info
(
"Connecting to HBase cluster ["
+
connStr
+
"] use thin client ..."
);
Connection
conn
=
DriverManager
.
getConnection
(
connStr
,
cfg
.
getUsername
(),
cfg
.
getPassword
());
String
userNamespaceQuery
=
"use "
+
cfg
.
getNamespace
();
Statement
statement
=
null
;
try
{
statement
=
conn
.
createStatement
();
statement
.
executeUpdate
(
userNamespaceQuery
);
return
conn
;
}
catch
(
Exception
e
)
{
throw
DataXException
.
asDataXException
(
HbaseSQLWriterErrorCode
.
GET_HBASE_CONNECTION_ERROR
,
"无法连接配置的namespace, 请检查配置 或者 联系 HBase 管理员."
,
e
);
}
finally
{
if
(
statement
!=
null
)
{
statement
.
close
();
}
}
}
/**
/**
* 获取一张表的元数据信息
* 获取一张表的元数据信息
* @param conn hbsae sql的jdbc连接
* @param conn hbsae sql的jdbc连接
...
@@ -121,6 +163,70 @@ public class HbaseSQLHelper {
...
@@ -121,6 +163,70 @@ public class HbaseSQLHelper {
return
mdc
.
updateCache
(
schemaName
,
tableName
).
getTable
();
return
mdc
.
updateCache
(
schemaName
,
tableName
).
getTable
();
}
}
/**
* 获取一张表的元数据信息
* @param conn
* @param namespace
* @param fullTableName
* @param isThinClient 是否使用thin client
* @return 表的元数据
* @throws SQLException
*/
public
static
PTable
getTableSchema
(
Connection
conn
,
String
namespace
,
String
fullTableName
,
boolean
isThinClient
)
throws
SQLException
{
LOG
.
info
(
"Start to get table schema of namespace="
+
namespace
+
" , fullTableName="
+
fullTableName
);
if
(!
isThinClient
)
{
return
getTableSchema
(
conn
,
fullTableName
);
}
else
{
if
(
ptable
==
null
)
{
ResultSet
result
=
conn
.
getMetaData
().
getColumns
(
null
,
namespace
,
fullTableName
,
null
);
try
{
ThinClientPTable
retTable
=
new
ThinClientPTable
();
retTable
.
setColTypeMap
(
parseColType
(
result
));
ptable
=
retTable
;
}
finally
{
if
(
result
!=
null
)
{
result
.
close
();
}
}
}
return
ptable
;
}
}
/**
* 解析字段
* @param rs
* @return
* @throws SQLException
*/
public
static
Map
<
String
,
ThinClientPTable
.
ThinClientPColumn
>
parseColType
(
ResultSet
rs
)
throws
SQLException
{
Map
<
String
,
ThinClientPTable
.
ThinClientPColumn
>
cols
=
new
HashMap
<
String
,
ThinClientPTable
.
ThinClientPColumn
>();
ResultSetMetaData
md
=
rs
.
getMetaData
();
int
columnCount
=
md
.
getColumnCount
();
while
(
rs
.
next
())
{
String
colName
=
null
;
PDataType
colType
=
null
;
for
(
int
i
=
1
;
i
<=
columnCount
;
i
++)
{
if
(
md
.
getColumnLabel
(
i
).
equals
(
"TYPE_NAME"
))
{
colType
=
PDataType
.
fromSqlTypeName
((
String
)
rs
.
getObject
(
i
));
}
else
if
(
md
.
getColumnLabel
(
i
).
equals
(
"COLUMN_NAME"
))
{
colName
=
(
String
)
rs
.
getObject
(
i
);
}
}
if
(
colType
==
null
||
colName
==
null
)
{
throw
new
SQLException
(
"ColType or colName is null, colType : "
+
colType
+
" , colName : "
+
colName
);
}
cols
.
put
(
colName
,
new
ThinClientPTable
.
ThinClientPColumn
(
colName
,
colType
));
}
return
cols
;
}
/**
/**
* 清空表
* 清空表
*/
*/
...
@@ -148,6 +254,24 @@ public class HbaseSQLHelper {
...
@@ -148,6 +254,24 @@ public class HbaseSQLHelper {
}
}
}
}
/**
* 检查表
* @param conn
* @param namespace
* @param tableName
* @param isThinClient
* @throws DataXException
*/
public
static
void
checkTable
(
Connection
conn
,
String
namespace
,
String
tableName
,
boolean
isThinClient
)
throws
DataXException
{
if
(!
isThinClient
)
{
checkTable
(
conn
,
tableName
);
}
else
{
//ignore check table when use thin client
}
}
/**
/**
* 检查表:表要存在,enabled
* 检查表:表要存在,enabled
*/
*/
...
...
hbase11xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase11xsqlwriter/HbaseSQLWriterConfig.java
View file @
ca32cbd4
...
@@ -2,6 +2,7 @@ package com.alibaba.datax.plugin.writer.hbase11xsqlwriter;
...
@@ -2,6 +2,7 @@ package com.alibaba.datax.plugin.writer.hbase11xsqlwriter;
import
com.alibaba.datax.common.exception.DataXException
;
import
com.alibaba.datax.common.exception.DataXException
;
import
com.alibaba.datax.common.util.Configuration
;
import
com.alibaba.datax.common.util.Configuration
;
import
com.google.common.base.Strings
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.hadoop.hbase.TableName
;
import
org.apache.hadoop.hbase.TableName
;
import
org.apache.hadoop.hbase.util.Pair
;
import
org.apache.hadoop.hbase.util.Pair
;
...
@@ -9,6 +10,7 @@ import org.slf4j.Logger;
...
@@ -9,6 +10,7 @@ import org.slf4j.Logger;
import
org.slf4j.LoggerFactory
;
import
org.slf4j.LoggerFactory
;
import
java.util.List
;
import
java.util.List
;
import
java.util.Map
;
/**
/**
* HBase SQL writer config
* HBase SQL writer config
...
@@ -30,6 +32,10 @@ public class HbaseSQLWriterConfig {
...
@@ -30,6 +32,10 @@ public class HbaseSQLWriterConfig {
private
NullModeType
nullMode
;
private
NullModeType
nullMode
;
private
int
batchSize
;
// 一次批量写入多少行
private
int
batchSize
;
// 一次批量写入多少行
private
boolean
truncate
;
// 导入开始前是否要清空目的表
private
boolean
truncate
;
// 导入开始前是否要清空目的表
private
boolean
isThinClient
;
private
String
namespace
;
private
String
username
;
private
String
password
;
/**
/**
* @return 获取原始的datax配置
* @return 获取原始的datax配置
...
@@ -81,6 +87,22 @@ public class HbaseSQLWriterConfig {
...
@@ -81,6 +87,22 @@ public class HbaseSQLWriterConfig {
return
truncate
;
return
truncate
;
}
}
public
boolean
isThinClient
()
{
return
isThinClient
;
}
public
String
getNamespace
()
{
return
namespace
;
}
public
String
getPassword
()
{
return
password
;
}
public
String
getUsername
()
{
return
username
;
}
/**
/**
* @param dataxCfg
* @param dataxCfg
* @return
* @return
...
@@ -100,6 +122,7 @@ public class HbaseSQLWriterConfig {
...
@@ -100,6 +122,7 @@ public class HbaseSQLWriterConfig {
cfg
.
nullMode
=
NullModeType
.
getByTypeName
(
dataxCfg
.
getString
(
Key
.
NULL_MODE
,
Constant
.
DEFAULT_NULL_MODE
));
cfg
.
nullMode
=
NullModeType
.
getByTypeName
(
dataxCfg
.
getString
(
Key
.
NULL_MODE
,
Constant
.
DEFAULT_NULL_MODE
));
cfg
.
batchSize
=
dataxCfg
.
getInt
(
Key
.
BATCH_SIZE
,
Constant
.
DEFAULT_BATCH_ROW_COUNT
);
cfg
.
batchSize
=
dataxCfg
.
getInt
(
Key
.
BATCH_SIZE
,
Constant
.
DEFAULT_BATCH_ROW_COUNT
);
cfg
.
truncate
=
dataxCfg
.
getBool
(
Key
.
TRUNCATE
,
Constant
.
DEFAULT_TRUNCATE
);
cfg
.
truncate
=
dataxCfg
.
getBool
(
Key
.
TRUNCATE
,
Constant
.
DEFAULT_TRUNCATE
);
cfg
.
isThinClient
=
dataxCfg
.
getBool
(
Key
.
THIN_CLIENT
,
Constant
.
DEFAULT_USE_THIN_CLIENT
);
// 4. 打印解析出来的配置
// 4. 打印解析出来的配置
LOG
.
info
(
"HBase SQL writer config parsed:"
+
cfg
.
toString
());
LOG
.
info
(
"HBase SQL writer config parsed:"
+
cfg
.
toString
());
...
@@ -117,31 +140,52 @@ public class HbaseSQLWriterConfig {
...
@@ -117,31 +140,52 @@ public class HbaseSQLWriterConfig {
"读 Hbase 时需要配置hbaseConfig,其内容为 Hbase 连接信息,请联系 Hbase PE 获取该信息."
);
"读 Hbase 时需要配置hbaseConfig,其内容为 Hbase 连接信息,请联系 Hbase PE 获取该信息."
);
}
}
// 解析zk服务器和znode信息
Pair
<
String
,
String
>
zkCfg
;
if
(
dataxCfg
.
getBool
(
Key
.
THIN_CLIENT
,
Constant
.
DEFAULT_USE_THIN_CLIENT
))
{
try
{
Map
<
String
,
String
>
thinConnectConfig
=
HbaseSQLHelper
.
getThinConnectConfig
(
hbaseCfg
);
zkCfg
=
HbaseSQLHelper
.
getHbaseConfig
(
hbaseCfg
);
String
thinConnectStr
=
thinConnectConfig
.
get
(
Key
.
HBASE_THIN_CONNECT_URL
);
}
catch
(
Throwable
t
)
{
cfg
.
namespace
=
thinConnectConfig
.
get
(
Key
.
HBASE_THIN_CONNECT_NAMESPACE
);
// 解析hbase配置错误
cfg
.
username
=
thinConnectConfig
.
get
(
Key
.
HBASE_THIN_CONNECT_USERNAME
);
throw
DataXException
.
asDataXException
(
cfg
.
password
=
thinConnectConfig
.
get
(
Key
.
HBASE_THIN_CONNECT_PASSWORD
);
if
(
Strings
.
isNullOrEmpty
(
thinConnectStr
))
{
throw
DataXException
.
asDataXException
(
HbaseSQLWriterErrorCode
.
ILLEGAL_VALUE
,
"thinClient=true的轻客户端模式下HBase的hbase.thin.connect.url配置不能为空,请联系HBase PE获取该信息."
);
}
if
(
Strings
.
isNullOrEmpty
(
cfg
.
namespace
)
||
Strings
.
isNullOrEmpty
(
cfg
.
username
)
||
Strings
.
isNullOrEmpty
(
cfg
.
password
))
{
throw
DataXException
.
asDataXException
(
HbaseSQLWriterErrorCode
.
ILLEGAL_VALUE
,
"thinClient=true的轻客户端模式下HBase的hbase.thin.connect.namespce|username|password配置不能为空,请联系HBase "
+
"PE获取该信息."
);
}
cfg
.
connectionString
=
thinConnectStr
;
}
else
{
// 解析zk服务器和znode信息
Pair
<
String
,
String
>
zkCfg
;
try
{
zkCfg
=
HbaseSQLHelper
.
getHbaseConfig
(
hbaseCfg
);
}
catch
(
Throwable
t
)
{
// 解析hbase配置错误
throw
DataXException
.
asDataXException
(
HbaseSQLWriterErrorCode
.
REQUIRED_VALUE
,
HbaseSQLWriterErrorCode
.
REQUIRED_VALUE
,
"解析hbaseConfig出错,请确认您配置的hbaseConfig为合法的json数据格式,内容正确."
);
"解析hbaseConfig出错,请确认您配置的hbaseConfig为合法的json数据格式,内容正确."
);
}
}
String
zkQuorum
=
zkCfg
.
getFirst
();
String
zkQuorum
=
zkCfg
.
getFirst
();
String
znode
=
zkCfg
.
getSecond
();
String
znode
=
zkCfg
.
getSecond
();
if
(
zkQuorum
==
null
||
zkQuorum
.
isEmpty
())
{
if
(
zkQuorum
==
null
||
zkQuorum
.
isEmpty
())
{
throw
DataXException
.
asDataXException
(
throw
DataXException
.
asDataXException
(
HbaseSQLWriterErrorCode
.
ILLEGAL_VALUE
,
HbaseSQLWriterErrorCode
.
ILLEGAL_VALUE
,
"HBase的hbase.zookeeper.quorum配置不能为空,请联系HBase PE获取该信息."
);
"HBase的hbase.zookeeper.quorum配置不能为空,请联系HBase PE获取该信息."
);
}
}
if
(
znode
==
null
||
znode
.
isEmpty
())
{
if
(
znode
==
null
||
znode
.
isEmpty
())
{
throw
DataXException
.
asDataXException
(
throw
DataXException
.
asDataXException
(
HbaseSQLWriterErrorCode
.
ILLEGAL_VALUE
,
HbaseSQLWriterErrorCode
.
ILLEGAL_VALUE
,
"HBase的zookeeper.znode.parent配置不能为空,请联系HBase PE获取该信息."
);
"HBase的zookeeper.znode.parent配置不能为空,请联系HBase PE获取该信息."
);
}
}
// 生成sql使用的连接字符串, 格式: jdbc:phoenix:zk_quorum:2181:/znode_parent
// 生成sql使用的连接字符串, 格式: jdbc:phoenix:zk_quorum:2181:/znode_parent
cfg
.
connectionString
=
"jdbc:phoenix:"
+
zkQuorum
+
":2181:"
+
znode
;
cfg
.
connectionString
=
"jdbc:phoenix:"
+
zkQuorum
+
":2181:"
+
znode
;
}
}
}
private
static
void
parseTableConfig
(
HbaseSQLWriterConfig
cfg
,
Configuration
dataxCfg
)
{
private
static
void
parseTableConfig
(
HbaseSQLWriterConfig
cfg
,
Configuration
dataxCfg
)
{
...
...
hbase11xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase11xsqlwriter/HbaseSQLWriterTask.java
View file @
ca32cbd4
...
@@ -157,12 +157,20 @@ public class HbaseSQLWriterTask {
...
@@ -157,12 +157,20 @@ public class HbaseSQLWriterTask {
private
PreparedStatement
createPreparedStatement
()
throws
SQLException
{
private
PreparedStatement
createPreparedStatement
()
throws
SQLException
{
// 生成列名集合,列之间用逗号分隔: col1,col2,col3,...
// 生成列名集合,列之间用逗号分隔: col1,col2,col3,...
StringBuilder
columnNamesBuilder
=
new
StringBuilder
();
StringBuilder
columnNamesBuilder
=
new
StringBuilder
();
for
(
String
col
:
cfg
.
getColumns
())
{
if
(
cfg
.
isThinClient
())
{
// 列名使用双引号,则不自动转换为全大写,而是保留用户配置的大小写
for
(
String
col
:
cfg
.
getColumns
())
{
columnNamesBuilder
.
append
(
"\""
);
// thin 客户端不使用双引号
columnNamesBuilder
.
append
(
col
);
columnNamesBuilder
.
append
(
col
);
columnNamesBuilder
.
append
(
"\""
);
columnNamesBuilder
.
append
(
","
);
columnNamesBuilder
.
append
(
","
);
}
}
else
{
for
(
String
col
:
cfg
.
getColumns
())
{
// 列名使用双引号,则不自动转换为全大写,而是保留用户配置的大小写
columnNamesBuilder
.
append
(
"\""
);
columnNamesBuilder
.
append
(
col
);
columnNamesBuilder
.
append
(
"\""
);
columnNamesBuilder
.
append
(
","
);
}
}
}
columnNamesBuilder
.
setLength
(
columnNamesBuilder
.
length
()
-
1
);
// 移除末尾多余的逗号
columnNamesBuilder
.
setLength
(
columnNamesBuilder
.
length
()
-
1
);
// 移除末尾多余的逗号
String
columnNames
=
columnNamesBuilder
.
toString
();
String
columnNames
=
columnNamesBuilder
.
toString
();
...
@@ -171,9 +179,13 @@ public class HbaseSQLWriterTask {
...
@@ -171,9 +179,13 @@ public class HbaseSQLWriterTask {
// 生成UPSERT模板
// 生成UPSERT模板
String
tableName
=
cfg
.
getTableName
();
String
tableName
=
cfg
.
getTableName
();
// 表名使用双引号,则不自动转换为全大写,而是保留用户配置的大小写
StringBuilder
upsertBuilder
=
null
;
StringBuilder
upsertBuilder
=
if
(
cfg
.
isThinClient
())
{
new
StringBuilder
(
"upsert into \""
+
tableName
+
"\" ("
+
columnNames
+
" ) values ("
);
upsertBuilder
=
new
StringBuilder
(
"upsert into "
+
tableName
+
" ("
+
columnNames
+
" ) values ("
);
}
else
{
// 表名使用双引号,则不自动转换为全大写,而是保留用户配置的大小写
upsertBuilder
=
new
StringBuilder
(
"upsert into \""
+
tableName
+
"\" ("
+
columnNames
+
" ) values ("
);
}
for
(
int
i
=
0
;
i
<
cfg
.
getColumns
().
size
();
i
++)
{
for
(
int
i
=
0
;
i
<
cfg
.
getColumns
().
size
();
i
++)
{
upsertBuilder
.
append
(
"?,"
);
upsertBuilder
.
append
(
"?,"
);
}
}
...
@@ -191,7 +203,8 @@ public class HbaseSQLWriterTask {
...
@@ -191,7 +203,8 @@ public class HbaseSQLWriterTask {
*/
*/
private
int
[]
getColumnSqlType
(
List
<
String
>
columnNames
)
throws
SQLException
{
private
int
[]
getColumnSqlType
(
List
<
String
>
columnNames
)
throws
SQLException
{
int
[]
types
=
new
int
[
numberOfColumnsToWrite
];
int
[]
types
=
new
int
[
numberOfColumnsToWrite
];
PTable
ptable
=
HbaseSQLHelper
.
getTableSchema
(
connection
,
cfg
.
getTableName
());
PTable
ptable
=
HbaseSQLHelper
.
getTableSchema
(
connection
,
cfg
.
getNamespace
(),
cfg
.
getTableName
(),
cfg
.
isThinClient
());
for
(
int
i
=
0
;
i
<
columnNames
.
size
();
i
++)
{
for
(
int
i
=
0
;
i
<
columnNames
.
size
();
i
++)
{
String
name
=
columnNames
.
get
(
i
);
String
name
=
columnNames
.
get
(
i
);
...
...
hbase11xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase11xsqlwriter/Key.java
View file @
ca32cbd4
...
@@ -10,6 +10,10 @@ public final class Key {
...
@@ -10,6 +10,10 @@ public final class Key {
public
final
static
String
HBASE_CONFIG
=
"hbaseConfig"
;
public
final
static
String
HBASE_CONFIG
=
"hbaseConfig"
;
public
final
static
String
HBASE_ZK_QUORUM
=
HConstants
.
ZOOKEEPER_QUORUM
;
public
final
static
String
HBASE_ZK_QUORUM
=
HConstants
.
ZOOKEEPER_QUORUM
;
public
final
static
String
HBASE_ZNODE_PARENT
=
HConstants
.
ZOOKEEPER_ZNODE_PARENT
;
public
final
static
String
HBASE_ZNODE_PARENT
=
HConstants
.
ZOOKEEPER_ZNODE_PARENT
;
public
final
static
String
HBASE_THIN_CONNECT_URL
=
"hbase.thin.connect.url"
;
public
final
static
String
HBASE_THIN_CONNECT_NAMESPACE
=
"hbase.thin.connect.namespace"
;
public
final
static
String
HBASE_THIN_CONNECT_USERNAME
=
"hbase.thin.connect.username"
;
public
final
static
String
HBASE_THIN_CONNECT_PASSWORD
=
"hbase.thin.connect.password"
;
/**
/**
* 【必选】writer要写入的表的表名
* 【必选】writer要写入的表的表名
...
@@ -34,6 +38,9 @@ public final class Key {
...
@@ -34,6 +38,9 @@ public final class Key {
*/
*/
public
static
final
String
TRUNCATE
=
"truncate"
;
public
static
final
String
TRUNCATE
=
"truncate"
;
public
static
final
String
THIN_CLIENT
=
"thinClient"
;
/**
/**
* 【可选】批量写入的最大行数,默认100行
* 【可选】批量写入的最大行数,默认100行
*/
*/
...
...
hbase11xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase11xsqlwriter/ThinClientPTable.java
0 → 100644
View file @
ca32cbd4
package
com
.
alibaba
.
datax
.
plugin
.
writer
.
hbase11xsqlwriter
;
import
org.apache.hadoop.hbase.io.ImmutableBytesWritable
;
import
org.apache.phoenix.hbase.index.util.KeyValueBuilder
;
import
org.apache.phoenix.index.IndexMaintainer
;
import
org.apache.phoenix.jdbc.PhoenixConnection
;
import
org.apache.phoenix.schema.AmbiguousColumnException
;
import
org.apache.phoenix.schema.ColumnFamilyNotFoundException
;
import
org.apache.phoenix.schema.ColumnNotFoundException
;
import
org.apache.phoenix.schema.PColumn
;
import
org.apache.phoenix.schema.PColumnFamily
;
import
org.apache.phoenix.schema.PIndexState
;
import
org.apache.phoenix.schema.PName
;
import
org.apache.phoenix.schema.PRow
;
import
org.apache.phoenix.schema.PTable
;
import
org.apache.phoenix.schema.PTableKey
;
import
org.apache.phoenix.schema.PTableType
;
import
org.apache.phoenix.schema.RowKeySchema
;
import
org.apache.phoenix.schema.SortOrder
;
import
org.apache.phoenix.schema.types.PDataType
;
import
java.util.List
;
import
java.util.Map
;
public
class
ThinClientPTable
implements
PTable
{
private
Map
<
String
,
ThinClientPColumn
>
colMap
;
public
void
setColTypeMap
(
Map
<
String
,
ThinClientPColumn
>
colMap
)
{
this
.
colMap
=
colMap
;
}
@Override
public
long
getTimeStamp
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
long
getSequenceNumber
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
long
getIndexDisableTimestamp
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
PName
getName
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
PName
getSchemaName
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
PName
getTableName
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
PName
getTenantId
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
PTableType
getType
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
PName
getPKName
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
List
<
PColumn
>
getPKColumns
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
List
<
PColumn
>
getColumns
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
List
<
PColumnFamily
>
getColumnFamilies
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
PColumnFamily
getColumnFamily
(
byte
[]
bytes
)
throws
ColumnFamilyNotFoundException
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
PColumnFamily
getColumnFamily
(
String
s
)
throws
ColumnFamilyNotFoundException
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
PColumn
getColumnForColumnName
(
String
colname
)
throws
ColumnNotFoundException
,
AmbiguousColumnException
{
if
(!
colMap
.
containsKey
(
colname
))
{
throw
new
ColumnNotFoundException
(
"Col "
+
colname
+
" not found"
);
}
return
colMap
.
get
(
colname
);
}
@Override
public
PColumn
getColumnForColumnQualifier
(
byte
[]
bytes
,
byte
[]
bytes1
)
throws
ColumnNotFoundException
,
AmbiguousColumnException
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
PColumn
getPKColumn
(
String
s
)
throws
ColumnNotFoundException
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
PRow
newRow
(
KeyValueBuilder
keyValueBuilder
,
long
l
,
ImmutableBytesWritable
immutableBytesWritable
,
boolean
b
,
byte
[]...
bytes
)
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
PRow
newRow
(
KeyValueBuilder
keyValueBuilder
,
ImmutableBytesWritable
immutableBytesWritable
,
boolean
b
,
byte
[]...
bytes
)
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
int
newKey
(
ImmutableBytesWritable
immutableBytesWritable
,
byte
[][]
bytes
)
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
RowKeySchema
getRowKeySchema
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
Integer
getBucketNum
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
List
<
PTable
>
getIndexes
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
PIndexState
getIndexState
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
PName
getParentName
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
PName
getParentTableName
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
PName
getParentSchemaName
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
List
<
PName
>
getPhysicalNames
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
PName
getPhysicalName
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
boolean
isImmutableRows
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
boolean
getIndexMaintainers
(
ImmutableBytesWritable
immutableBytesWritable
,
PhoenixConnection
phoenixConnection
)
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
IndexMaintainer
getIndexMaintainer
(
PTable
pTable
,
PhoenixConnection
phoenixConnection
)
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
PName
getDefaultFamilyName
()
{
return
null
;
}
@Override
public
boolean
isWALDisabled
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
boolean
isMultiTenant
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
boolean
getStoreNulls
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
boolean
isTransactional
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
ViewType
getViewType
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
String
getViewStatement
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
Short
getViewIndexId
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
PTableKey
getKey
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
IndexType
getIndexType
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
int
getBaseColumnCount
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
boolean
rowKeyOrderOptimizable
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
int
getRowTimestampColPos
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
long
getUpdateCacheFrequency
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
boolean
isNamespaceMapped
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
String
getAutoPartitionSeqName
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
boolean
isAppendOnlySchema
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
ImmutableStorageScheme
getImmutableStorageScheme
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
QualifierEncodingScheme
getEncodingScheme
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
EncodedCQCounter
getEncodedCQCounter
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
boolean
useStatsForParallelization
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
int
getEstimatedSize
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
public
static
class
ThinClientPColumn
implements
PColumn
{
private
String
colName
;
private
PDataType
pDataType
;
public
ThinClientPColumn
(
String
colName
,
PDataType
pDataType
)
{
this
.
colName
=
colName
;
this
.
pDataType
=
pDataType
;
}
@Override
public
PName
getName
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
PName
getFamilyName
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
int
getPosition
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
Integer
getArraySize
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
byte
[]
getViewConstant
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
boolean
isViewReferenced
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
int
getEstimatedSize
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
String
getExpressionStr
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
boolean
isRowTimestamp
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
boolean
isDynamic
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
byte
[]
getColumnQualifierBytes
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
boolean
isNullable
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
PDataType
getDataType
()
{
return
pDataType
;
}
@Override
public
Integer
getMaxLength
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
Integer
getScale
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
@Override
public
SortOrder
getSortOrder
()
{
throw
new
UnsupportedOperationException
(
"Not implement"
);
}
}
}
plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/SqlFormatUtil.java
deleted
100755 → 0
View file @
20471dd4
package
com
.
alibaba
.
datax
.
plugin
.
rdbms
.
util
;
import
java.util.HashSet
;
import
java.util.LinkedList
;
import
java.util.Set
;
import
java.util.StringTokenizer
;
// TODO delete it
public
class
SqlFormatUtil
{
private
static
final
Set
<
String
>
BEGIN_CLAUSES
=
new
HashSet
<
String
>();
private
static
final
Set
<
String
>
END_CLAUSES
=
new
HashSet
<
String
>();
private
static
final
Set
<
String
>
LOGICAL
=
new
HashSet
<
String
>();
private
static
final
Set
<
String
>
QUANTIFIERS
=
new
HashSet
<
String
>();
private
static
final
Set
<
String
>
DML
=
new
HashSet
<
String
>();
private
static
final
Set
<
String
>
MISC
=
new
HashSet
<
String
>();
private
static
final
String
WHITESPACE
=
" \n\r\f\t"
;
static
{
BEGIN_CLAUSES
.
add
(
"left"
);
BEGIN_CLAUSES
.
add
(
"right"
);
BEGIN_CLAUSES
.
add
(
"inner"
);
BEGIN_CLAUSES
.
add
(
"outer"
);
BEGIN_CLAUSES
.
add
(
"group"
);
BEGIN_CLAUSES
.
add
(
"order"
);
END_CLAUSES
.
add
(
"where"
);
END_CLAUSES
.
add
(
"set"
);
END_CLAUSES
.
add
(
"having"
);
END_CLAUSES
.
add
(
"join"
);
END_CLAUSES
.
add
(
"from"
);
END_CLAUSES
.
add
(
"by"
);
END_CLAUSES
.
add
(
"join"
);
END_CLAUSES
.
add
(
"into"
);
END_CLAUSES
.
add
(
"union"
);
LOGICAL
.
add
(
"and"
);
LOGICAL
.
add
(
"or"
);
LOGICAL
.
add
(
"when"
);
LOGICAL
.
add
(
"else"
);
LOGICAL
.
add
(
"end"
);
QUANTIFIERS
.
add
(
"in"
);
QUANTIFIERS
.
add
(
"all"
);
QUANTIFIERS
.
add
(
"exists"
);
QUANTIFIERS
.
add
(
"some"
);
QUANTIFIERS
.
add
(
"any"
);
DML
.
add
(
"insert"
);
DML
.
add
(
"update"
);
DML
.
add
(
"delete"
);
MISC
.
add
(
"select"
);
MISC
.
add
(
"on"
);
}
static
final
String
indentString
=
" "
;
static
final
String
initial
=
"\n "
;
public
static
String
format
(
String
source
)
{
return
new
FormatProcess
(
source
).
perform
();
}
private
static
class
FormatProcess
{
boolean
beginLine
=
true
;
boolean
afterBeginBeforeEnd
=
false
;
boolean
afterByOrSetOrFromOrSelect
=
false
;
boolean
afterValues
=
false
;
boolean
afterOn
=
false
;
boolean
afterBetween
=
false
;
boolean
afterInsert
=
false
;
int
inFunction
=
0
;
int
parensSinceSelect
=
0
;
private
LinkedList
<
Integer
>
parenCounts
=
new
LinkedList
<
Integer
>();
private
LinkedList
<
Boolean
>
afterByOrFromOrSelects
=
new
LinkedList
<
Boolean
>();
int
indent
=
1
;
StringBuilder
result
=
new
StringBuilder
();
StringTokenizer
tokens
;
String
lastToken
;
String
token
;
String
lcToken
;
public
FormatProcess
(
String
sql
)
{
tokens
=
new
StringTokenizer
(
sql
,
"()+*/-=<>'`\"[],"
+
WHITESPACE
,
true
);
}
public
String
perform
()
{
result
.
append
(
initial
);
while
(
tokens
.
hasMoreTokens
())
{
token
=
tokens
.
nextToken
();
lcToken
=
token
.
toLowerCase
();
if
(
"'"
.
equals
(
token
))
{
String
t
;
do
{
t
=
tokens
.
nextToken
();
token
+=
t
;
}
while
(!
"'"
.
equals
(
t
)
&&
tokens
.
hasMoreTokens
());
// cannot
// handle
// single
// quotes
}
else
if
(
"\""
.
equals
(
token
))
{
String
t
;
do
{
t
=
tokens
.
nextToken
();
token
+=
t
;
}
while
(!
"\""
.
equals
(
t
));
}
if
(
afterByOrSetOrFromOrSelect
&&
","
.
equals
(
token
))
{
commaAfterByOrFromOrSelect
();
}
else
if
(
afterOn
&&
","
.
equals
(
token
))
{
commaAfterOn
();
}
else
if
(
"("
.
equals
(
token
))
{
openParen
();
}
else
if
(
")"
.
equals
(
token
))
{
closeParen
();
}
else
if
(
BEGIN_CLAUSES
.
contains
(
lcToken
))
{
beginNewClause
();
}
else
if
(
END_CLAUSES
.
contains
(
lcToken
))
{
endNewClause
();
}
else
if
(
"select"
.
equals
(
lcToken
))
{
select
();
}
else
if
(
DML
.
contains
(
lcToken
))
{
updateOrInsertOrDelete
();
}
else
if
(
"values"
.
equals
(
lcToken
))
{
values
();
}
else
if
(
"on"
.
equals
(
lcToken
))
{
on
();
}
else
if
(
afterBetween
&&
lcToken
.
equals
(
"and"
))
{
misc
();
afterBetween
=
false
;
}
else
if
(
LOGICAL
.
contains
(
lcToken
))
{
logical
();
}
else
if
(
isWhitespace
(
token
))
{
white
();
}
else
{
misc
();
}
if
(!
isWhitespace
(
token
))
{
lastToken
=
lcToken
;
}
}
return
result
.
toString
();
}
private
void
commaAfterOn
()
{
out
();
indent
--;
newline
();
afterOn
=
false
;
afterByOrSetOrFromOrSelect
=
true
;
}
private
void
commaAfterByOrFromOrSelect
()
{
out
();
newline
();
}
private
void
logical
()
{
if
(
"end"
.
equals
(
lcToken
))
{
indent
--;
}
newline
();
out
();
beginLine
=
false
;
}
private
void
on
()
{
indent
++;
afterOn
=
true
;
newline
();
out
();
beginLine
=
false
;
}
private
void
misc
()
{
out
();
if
(
"between"
.
equals
(
lcToken
))
{
afterBetween
=
true
;
}
if
(
afterInsert
)
{
newline
();
afterInsert
=
false
;
}
else
{
beginLine
=
false
;
if
(
"case"
.
equals
(
lcToken
))
{
indent
++;
}
}
}
private
void
white
()
{
if
(!
beginLine
)
{
result
.
append
(
" "
);
}
}
private
void
updateOrInsertOrDelete
()
{
out
();
indent
++;
beginLine
=
false
;
if
(
"update"
.
equals
(
lcToken
))
{
newline
();
}
if
(
"insert"
.
equals
(
lcToken
))
{
afterInsert
=
true
;
}
}
private
void
select
()
{
out
();
indent
++;
newline
();
parenCounts
.
addLast
(
Integer
.
valueOf
(
parensSinceSelect
));
afterByOrFromOrSelects
.
addLast
(
Boolean
.
valueOf
(
afterByOrSetOrFromOrSelect
));
parensSinceSelect
=
0
;
afterByOrSetOrFromOrSelect
=
true
;
}
private
void
out
()
{
result
.
append
(
token
);
}
private
void
endNewClause
()
{
if
(!
afterBeginBeforeEnd
)
{
indent
--;
if
(
afterOn
)
{
indent
--;
afterOn
=
false
;
}
newline
();
}
out
();
if
(!
"union"
.
equals
(
lcToken
))
{
indent
++;
}
newline
();
afterBeginBeforeEnd
=
false
;
afterByOrSetOrFromOrSelect
=
"by"
.
equals
(
lcToken
)
||
"set"
.
equals
(
lcToken
)
||
"from"
.
equals
(
lcToken
);
}
private
void
beginNewClause
()
{
if
(!
afterBeginBeforeEnd
)
{
if
(
afterOn
)
{
indent
--;
afterOn
=
false
;
}
indent
--;
newline
();
}
out
();
beginLine
=
false
;
afterBeginBeforeEnd
=
true
;
}
private
void
values
()
{
indent
--;
newline
();
out
();
indent
++;
newline
();
afterValues
=
true
;
}
private
void
closeParen
()
{
parensSinceSelect
--;
if
(
parensSinceSelect
<
0
)
{
indent
--;
parensSinceSelect
=
parenCounts
.
removeLast
().
intValue
();
afterByOrSetOrFromOrSelect
=
afterByOrFromOrSelects
.
removeLast
().
booleanValue
();
}
if
(
inFunction
>
0
)
{
inFunction
--;
out
();
}
else
{
if
(!
afterByOrSetOrFromOrSelect
)
{
indent
--;
newline
();
}
out
();
}
beginLine
=
false
;
}
private
void
openParen
()
{
if
(
isFunctionName
(
lastToken
)
||
inFunction
>
0
)
{
inFunction
++;
}
beginLine
=
false
;
if
(
inFunction
>
0
)
{
out
();
}
else
{
out
();
if
(!
afterByOrSetOrFromOrSelect
)
{
indent
++;
newline
();
beginLine
=
true
;
}
}
parensSinceSelect
++;
}
private
static
boolean
isFunctionName
(
String
tok
)
{
final
char
begin
=
tok
.
charAt
(
0
);
final
boolean
isIdentifier
=
Character
.
isJavaIdentifierStart
(
begin
)
||
'"'
==
begin
;
return
isIdentifier
&&
!
LOGICAL
.
contains
(
tok
)
&&
!
END_CLAUSES
.
contains
(
tok
)
&&
!
QUANTIFIERS
.
contains
(
tok
)
&&
!
DML
.
contains
(
tok
)
&&
!
MISC
.
contains
(
tok
);
}
private
static
boolean
isWhitespace
(
String
token
)
{
return
WHITESPACE
.
indexOf
(
token
)
>=
0
;
}
private
void
newline
()
{
result
.
append
(
"\n"
);
for
(
int
i
=
0
;
i
<
indent
;
i
++)
{
result
.
append
(
indentString
);
}
beginLine
=
true
;
}
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment