Commit 5ff6550d authored by 刘洪

bug: Windows line-break character ^M (carriage return)

parent 046a31e1
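
The fix strips Windows-style line breaks (\r\n, shown as ^M) from string columns before a record is written as a single text-file row. Below is a minimal, self-contained sketch of that normalization, mirroring the trimNewLine helper this commit adds to HdfsHelper; the class name is hypothetical and not part of the commit:

// Sketch only: demonstrates the line-ending normalization introduced by this
// commit, not the committed HdfsHelper class itself.
public class NewlineNormalizationSketch {

    // Mirrors HdfsHelper.trimNewLine: collapse \r\n, \n and \r into spaces
    // so that one record always stays on one output line.
    private static String trimNewLine(String value) {
        return value.replaceAll("\r\n", " ")
                    .replaceAll("\n", " ")
                    .replaceAll("\r", " ");
    }

    public static void main(String[] args) {
        String windowsField = "hello\r\nworld";        // value captured on Windows, carries ^M
        System.out.println(trimNewLine(windowsField)); // prints: hello world
    }
}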
......@@ -29,7 +29,7 @@ import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.*;
public class HdfsHelper {
public class HdfsHelper {
public static final Logger LOG = LoggerFactory.getLogger(HdfsWriter.Job.class);
public FileSystem fileSystem = null;
public JobConf conf = null;
......@@ -39,10 +39,10 @@ public class HdfsHelper {
// Kerberos
private Boolean haveKerberos = false;
private String kerberosKeytabFilePath;
private String kerberosPrincipal;
private String kerberosKeytabFilePath;
private String kerberosPrincipal;
public void getFileSystem(String defaultFS, Configuration taskConfig){
public void getFileSystem(String defaultFS, Configuration taskConfig) {
hadoopConf = new org.apache.hadoop.conf.Configuration();
Configuration hadoopSiteParams = taskConfig.getConfiguration(Key.HADOOP_CONFIG);
......@@ -57,7 +57,7 @@ public class HdfsHelper {
//是否有Kerberos认证
this.haveKerberos = taskConfig.getBool(Key.HAVE_KERBEROS, false);
if(haveKerberos){
if (haveKerberos) {
this.kerberosKeytabFilePath = taskConfig.getString(Key.KERBEROS_KEYTAB_FILE_PATH);
this.kerberosPrincipal = taskConfig.getString(Key.KERBEROS_PRINCIPAL);
hadoopConf.set(HADOOP_SECURITY_AUTHENTICATION_KEY, "kerberos");
......@@ -71,14 +71,14 @@ public class HdfsHelper {
"message:defaultFS =" + defaultFS);
LOG.error(message);
throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);
}catch (Exception e) {
} catch (Exception e) {
String message = String.format("获取FileSystem失败,请检查HDFS地址是否正确: [%s]",
"message:defaultFS =" + defaultFS);
LOG.error(message);
throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);
}
if(null == fileSystem || null == conf){
if (null == fileSystem || null == conf) {
String message = String.format("获取FileSystem失败,请检查HDFS地址是否正确: [%s]",
"message:defaultFS =" + defaultFS);
LOG.error(message);
......@@ -86,8 +86,8 @@ public class HdfsHelper {
}
}
private void kerberosAuthentication(String kerberosPrincipal, String kerberosKeytabFilePath){
if(haveKerberos && StringUtils.isNotBlank(this.kerberosPrincipal) && StringUtils.isNotBlank(this.kerberosKeytabFilePath)){
private void kerberosAuthentication(String kerberosPrincipal, String kerberosKeytabFilePath) {
if (haveKerberos && StringUtils.isNotBlank(this.kerberosPrincipal) && StringUtils.isNotBlank(this.kerberosKeytabFilePath)) {
UserGroupInformation.setConfiguration(this.hadoopConf);
try {
UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath);
......@@ -101,19 +101,19 @@ public class HdfsHelper {
}
/**
*获取指定目录先的文件列表
* 获取指定目录先的文件列表
*
* @param dir
* @return
* 拿到的是文件全路径,
* @return 拿到的是文件全路径,
* eg:hdfs://10.101.204.12:9000/user/hive/warehouse/writer.db/text/test.textfile
*/
public String[] hdfsDirList(String dir){
public String[] hdfsDirList(String dir) {
Path path = new Path(dir);
String[] files = null;
try {
FileStatus[] status = fileSystem.listStatus(path);
files = new String[status.length];
for(int i=0;i<status.length;i++){
for (int i = 0; i < status.length; i++) {
files[i] = status[i].getPath().toString();
}
} catch (IOException e) {
......@@ -126,24 +126,25 @@ public class HdfsHelper {
/**
* 获取以fileName__ 开头的文件列表
*
* @param dir
* @param fileName
* @return
*/
public Path[] hdfsDirList(String dir,String fileName){
public Path[] hdfsDirList(String dir, String fileName) {
Path path = new Path(dir);
Path[] files = null;
String filterFileName = fileName + "__*";
try {
PathFilter pathFilter = new GlobFilter(filterFileName);
FileStatus[] status = fileSystem.listStatus(path,pathFilter);
FileStatus[] status = fileSystem.listStatus(path, pathFilter);
files = new Path[status.length];
for(int i=0;i<status.length;i++){
for (int i = 0; i < status.length; i++) {
files[i] = status[i].getPath();
}
} catch (IOException e) {
String message = String.format("获取目录[%s]下文件名以[%s]开头的文件列表时发生网络IO异常,请检查您的网络是否正常!",
dir,fileName);
dir, fileName);
LOG.error(message);
throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);
}
......@@ -177,11 +178,11 @@ public class HdfsHelper {
return isDir;
}
public void deleteFiles(Path[] paths){
for(int i=0;i<paths.length;i++){
public void deleteFiles(Path[] paths) {
for (int i = 0; i < paths.length; i++) {
LOG.info(String.format("delete file [%s].", paths[i].toString()));
try {
fileSystem.delete(paths[i],true);
fileSystem.delete(paths[i], true);
} catch (IOException e) {
String message = String.format("删除文件[%s]时发生IO异常,请检查您的网络是否正常!",
paths[i].toString());
......@@ -191,10 +192,10 @@ public class HdfsHelper {
}
}
public void deleteDir(Path path){
LOG.info(String.format("start delete tmp dir [%s] .",path.toString()));
public void deleteDir(Path path) {
LOG.info(String.format("start delete tmp dir [%s] .", path.toString()));
try {
if(isPathexists(path.toString())) {
if (isPathexists(path.toString())) {
fileSystem.delete(path, true);
}
} catch (Exception e) {
......@@ -202,52 +203,52 @@ public class HdfsHelper {
LOG.error(message);
throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);
}
LOG.info(String.format("finish delete tmp dir [%s] .",path.toString()));
LOG.info(String.format("finish delete tmp dir [%s] .", path.toString()));
}
public void renameFile(HashSet<String> tmpFiles, HashSet<String> endFiles){
public void renameFile(HashSet<String> tmpFiles, HashSet<String> endFiles) {
Path tmpFilesParent = null;
if(tmpFiles.size() != endFiles.size()){
if (tmpFiles.size() != endFiles.size()) {
String message = String.format("临时目录下文件名个数与目标文件名个数不一致!");
LOG.error(message);
throw DataXException.asDataXException(HdfsWriterErrorCode.HDFS_RENAME_FILE_ERROR, message);
}else{
try{
for (Iterator it1=tmpFiles.iterator(),it2=endFiles.iterator();it1.hasNext()&&it2.hasNext();){
} else {
try {
for (Iterator it1 = tmpFiles.iterator(), it2 = endFiles.iterator(); it1.hasNext() && it2.hasNext(); ) {
String srcFile = it1.next().toString();
String dstFile = it2.next().toString();
Path srcFilePah = new Path(srcFile);
Path dstFilePah = new Path(dstFile);
if(tmpFilesParent == null){
if (tmpFilesParent == null) {
tmpFilesParent = srcFilePah.getParent();
}
LOG.info(String.format("start rename file [%s] to file [%s].", srcFile,dstFile));
LOG.info(String.format("start rename file [%s] to file [%s].", srcFile, dstFile));
boolean renameTag = false;
long fileLen = fileSystem.getFileStatus(srcFilePah).getLen();
if(fileLen>0){
renameTag = fileSystem.rename(srcFilePah,dstFilePah);
if(!renameTag){
if (fileLen > 0) {
renameTag = fileSystem.rename(srcFilePah, dstFilePah);
if (!renameTag) {
String message = String.format("重命名文件[%s]失败,请检查您的网络是否正常!", srcFile);
LOG.error(message);
throw DataXException.asDataXException(HdfsWriterErrorCode.HDFS_RENAME_FILE_ERROR, message);
}
LOG.info(String.format("finish rename file [%s] to file [%s].", srcFile,dstFile));
}else{
LOG.info(String.format("finish rename file [%s] to file [%s].", srcFile, dstFile));
} else {
LOG.info(String.format("文件[%s]内容为空,请检查写入是否正常!", srcFile));
}
}
}catch (Exception e) {
} catch (Exception e) {
String message = String.format("重命名文件时发生异常,请检查您的网络是否正常!");
LOG.error(message);
throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);
}finally {
} finally {
deleteDir(tmpFilesParent);
}
}
}
//关闭FileSystem
public void closeFileSystem(){
public void closeFileSystem() {
try {
fileSystem.close();
} catch (IOException e) {
......@@ -259,7 +260,7 @@ public class HdfsHelper {
//textfile格式文件
public FSDataOutputStream getOutputStream(String path){
public FSDataOutputStream getOutputStream(String path) {
Path storePath = new Path(path);
FSDataOutputStream fSDataOutputStream = null;
try {
......@@ -275,26 +276,27 @@ public class HdfsHelper {
/**
* 写textfile类型文件
*
* @param lineReceiver
* @param config
* @param fileName
* @param taskPluginCollector
*/
public void textFileStartWrite(RecordReceiver lineReceiver, Configuration config, String fileName,
TaskPluginCollector taskPluginCollector){
TaskPluginCollector taskPluginCollector) {
char fieldDelimiter = config.getChar(Key.FIELD_DELIMITER);
List<Configuration> columns = config.getListConfiguration(Key.COLUMN);
String compress = config.getString(Key.COMPRESS,null);
List<Configuration> columns = config.getListConfiguration(Key.COLUMN);
String compress = config.getString(Key.COMPRESS, null);
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmm");
String attempt = "attempt_"+dateFormat.format(new Date())+"_0001_m_000000_0";
String attempt = "attempt_" + dateFormat.format(new Date()) + "_0001_m_000000_0";
Path outputPath = new Path(fileName);
//todo 需要进一步确定TASK_ATTEMPT_ID
conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
FileOutputFormat outFormat = new TextOutputFormat();
outFormat.setOutputPath(conf, outputPath);
outFormat.setWorkOutputPath(conf, outputPath);
if(null != compress) {
if (null != compress) {
Class<? extends CompressionCodec> codecClass = getCompressCodec(compress);
if (null != codecClass) {
outFormat.setOutputCompressorClass(conf, codecClass);
......@@ -306,7 +308,7 @@ public class HdfsHelper {
while ((record = lineReceiver.getFromReader()) != null) {
MutablePair<Text, Boolean> transportResult = transportOneRecord(record, fieldDelimiter, columns, taskPluginCollector);
if (!transportResult.getRight()) {
writer.write(NullWritable.get(),transportResult.getLeft());
writer.write(NullWritable.get(), transportResult.getLeft());
}
}
writer.close(Reporter.NULL);
......@@ -321,11 +323,11 @@ public class HdfsHelper {
public static MutablePair<Text, Boolean> transportOneRecord(
Record record, char fieldDelimiter, List<Configuration> columnsConfiguration, TaskPluginCollector taskPluginCollector) {
MutablePair<List<Object>, Boolean> transportResultList = transportOneRecord(record,columnsConfiguration,taskPluginCollector);
MutablePair<List<Object>, Boolean> transportResultList = transportOneRecord(record, columnsConfiguration, taskPluginCollector);
//保存<转换后的数据,是否是脏数据>
MutablePair<Text, Boolean> transportResult = new MutablePair<Text, Boolean>();
transportResult.setRight(false);
if(null != transportResultList){
if (null != transportResultList) {
Text recordResult = new Text(StringUtils.join(transportResultList.getLeft(), fieldDelimiter));
transportResult.setRight(transportResultList.getRight());
transportResult.setLeft(recordResult);
......@@ -333,20 +335,20 @@ public class HdfsHelper {
return transportResult;
}
public Class<? extends CompressionCodec> getCompressCodec(String compress){
public Class<? extends CompressionCodec> getCompressCodec(String compress) {
Class<? extends CompressionCodec> codecClass = null;
if(null == compress){
if (null == compress) {
codecClass = null;
}else if("GZIP".equalsIgnoreCase(compress)){
} else if ("GZIP".equalsIgnoreCase(compress)) {
codecClass = org.apache.hadoop.io.compress.GzipCodec.class;
}else if ("BZIP2".equalsIgnoreCase(compress)) {
} else if ("BZIP2".equalsIgnoreCase(compress)) {
codecClass = org.apache.hadoop.io.compress.BZip2Codec.class;
}else if("SNAPPY".equalsIgnoreCase(compress)){
} else if ("SNAPPY".equalsIgnoreCase(compress)) {
//todo 等需求明确后支持 需要用户安装SnappyCodec
codecClass = org.apache.hadoop.io.compress.SnappyCodec.class;
// org.apache.hadoop.hive.ql.io.orc.ZlibCodec.class not public
//codecClass = org.apache.hadoop.hive.ql.io.orc.ZlibCodec.class;
}else {
} else {
throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE,
String.format("目前不支持您配置的 compress 模式 : [%s]", compress));
}
......@@ -355,24 +357,25 @@ public class HdfsHelper {
/**
* 写orcfile类型文件
*
* @param lineReceiver
* @param config
* @param fileName
* @param taskPluginCollector
*/
public void orcFileStartWrite(RecordReceiver lineReceiver, Configuration config, String fileName,
TaskPluginCollector taskPluginCollector){
List<Configuration> columns = config.getListConfiguration(Key.COLUMN);
TaskPluginCollector taskPluginCollector) {
List<Configuration> columns = config.getListConfiguration(Key.COLUMN);
String compress = config.getString(Key.COMPRESS, null);
List<String> columnNames = getColumnNames(columns);
List<ObjectInspector> columnTypeInspectors = getColumnTypeInspectors(columns);
StructObjectInspector inspector = (StructObjectInspector)ObjectInspectorFactory
StructObjectInspector inspector = (StructObjectInspector) ObjectInspectorFactory
.getStandardStructObjectInspector(columnNames, columnTypeInspectors);
OrcSerde orcSerde = new OrcSerde();
FileOutputFormat outFormat = new OrcOutputFormat();
if(!"NONE".equalsIgnoreCase(compress) && null != compress ) {
if (!"NONE".equalsIgnoreCase(compress) && null != compress) {
Class<? extends CompressionCodec> codecClass = getCompressCodec(compress);
if (null != codecClass) {
outFormat.setOutputCompressorClass(conf, codecClass);
......@@ -382,7 +385,7 @@ public class HdfsHelper {
RecordWriter writer = outFormat.getRecordWriter(fileSystem, conf, fileName, Reporter.NULL);
Record record = null;
while ((record = lineReceiver.getFromReader()) != null) {
MutablePair<List<Object>, Boolean> transportResult = transportOneRecord(record,columns,taskPluginCollector);
MutablePair<List<Object>, Boolean> transportResult = transportOneRecord(record, columns, taskPluginCollector);
if (!transportResult.getRight()) {
writer.write(NullWritable.get(), orcSerde.serialize(transportResult.getLeft(), inspector));
}
......@@ -397,7 +400,7 @@ public class HdfsHelper {
}
}
public List<String> getColumnNames(List<Configuration> columns){
public List<String> getColumnNames(List<Configuration> columns) {
List<String> columnNames = Lists.newArrayList();
for (Configuration eachColumnConf : columns) {
columnNames.add(eachColumnConf.getString(Key.NAME));
......@@ -407,11 +410,12 @@ public class HdfsHelper {
/**
* 根据writer配置的字段类型,构建inspector
*
* @param columns
* @return
*/
public List<ObjectInspector> getColumnTypeInspectors(List<Configuration> columns){
List<ObjectInspector> columnTypeInspectors = Lists.newArrayList();
public List<ObjectInspector> getColumnTypeInspectors(List<Configuration> columns) {
List<ObjectInspector> columnTypeInspectors = Lists.newArrayList();
for (Configuration eachColumnConf : columns) {
SupportHiveDataType columnType = SupportHiveDataType.valueOf(eachColumnConf.getString(Key.TYPE).toUpperCase());
ObjectInspector objectInspector = null;
......@@ -463,7 +467,7 @@ public class HdfsHelper {
return columnTypeInspectors;
}
public OrcSerde getOrcSerde(Configuration config){
public OrcSerde getOrcSerde(Configuration config) {
String fieldDelimiter = config.getString(Key.FIELD_DELIMITER);
String compress = config.getString(Key.COMPRESS);
String encoding = config.getString(Key.ENCODING);
......@@ -479,8 +483,8 @@ public class HdfsHelper {
}
public static MutablePair<List<Object>, Boolean> transportOneRecord(
Record record,List<Configuration> columnsConfiguration,
TaskPluginCollector taskPluginCollector){
Record record, List<Configuration> columnsConfiguration,
TaskPluginCollector taskPluginCollector) {
MutablePair<List<Object>, Boolean> transportResult = new MutablePair<List<Object>, Boolean>();
transportResult.setRight(false);
......@@ -519,7 +523,7 @@ public class HdfsHelper {
case STRING:
case VARCHAR:
case CHAR:
recordList.add(column.asString().replaceAll("\n", " "));
recordList.add(trimNewLine(column.asString()));
break;
case BOOLEAN:
recordList.add(column.asBoolean());
......@@ -548,7 +552,7 @@ public class HdfsHelper {
transportResult.setRight(true);
break;
}
}else {
} else {
// warn: it's all ok if nullFormat is null
recordList.add(null);
}
......@@ -557,4 +561,10 @@ public class HdfsHelper {
transportResult.setLeft(recordList);
return transportResult;
}
private static String trimNewLine(String var) {
return var.replaceAll("\r\n", " ")
.replaceAll("\n", " ")
.replaceAll("\r", " ");
}
}
......@@ -313,7 +313,7 @@ public class UnstructuredStorageWriterUtil {
if (StringUtils.isBlank(data)) {
splitedRows.add(data);
} else {
splitedRows.add(data.replaceAll("\n", "\\n").replaceAll("\t", "\\t"));
splitedRows.add(escape(data));
}
} else {
if (null != dateParse) {
......@@ -336,4 +336,11 @@ public class UnstructuredStorageWriterUtil {
taskPluginCollector.collectDirtyRecord(record, e);
}
}
private static String escape(String var) {
return var
.replaceAll("\n", "\\n")
.replaceAll("\r", "\\r")
.replaceAll("\t", "\\t");
}
}
......@@ -74,7 +74,7 @@
<!-- <module>odpswriter</module>-->
<module>txtfilewriter</module>
<!-- <module>ftpwriter</module>-->
<!-- <module>hdfswriter</module>-->
<module>hdfswriter</module>
<!-- <module>streamwriter</module>-->
<!-- <module>otswriter</module>-->
<!-- <module>oraclewriter</module>-->
......
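
One detail worth keeping in mind when reading the escape helper added to UnstructuredStorageWriterUtil above: String.replaceAll also interprets backslashes in the replacement string, so a replacement written as "\\n" in source (backslash followed by n) is parsed by the regex engine as an escaped literal n. A small self-contained check of that behavior (class name hypothetical, standard java.util.regex semantics assumed):

public class ReplaceAllEscapeCheck {
    public static void main(String[] args) {
        String data = "a\nb\tc";
        // Same style of call as the escape helper above: the replacement "\\n"
        // is backslash + n, which Matcher treats as an escaped literal 'n'.
        System.out.println(data.replaceAll("\n", "\\n").replaceAll("\t", "\\t")); // prints: anbtc
        // Emitting a literal backslash-n would require "\\\\n" as the
        // replacement, or java.util.regex.Matcher.quoteReplacement("\\n").
    }
}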