Commit 6db1ce63 authored by 黎博's avatar 黎博

优化同步数据库

parent 1058b9e3
...@@ -215,102 +215,17 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService { ...@@ -215,102 +215,17 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService {
*/ */
@Override @Override
public boolean syncDbToDest(String ip, String port, String username, String password, String dbName, String namespace, boolean businessData) { public boolean syncDbToDest(String ip, String port, String username, String password, String dbName, String namespace, boolean businessData) {
String driver = "com.mysql.jdbc.Driver";
String url = "jdbc:mysql://" + ip + ":" + port;
String dbRedisKey = dbSyncPrefix + dbName;
String tableListKey = dbSyncPrefix + dbName + ":tables";
Connection connection = null;
Connection newConnection = null;
PreparedStatement preparedStatement = null;
try { try {
Class.forName(driver); String tableListKey = dbSyncPrefix + dbName + ":tables";
// connection = DriverManager.getConnection(url, username, password);
// 删除原有数据库
// String dropDbSql = String.format("DROP DATABASE IF EXISTS %s", dbName);
// log.info("删除原有数据库:{}", dropDbSql);
// preparedStatement = connection.prepareStatement(dropDbSql);
// preparedStatement.execute();
// 重新创建数据库
// String createDbSql = redisUtils.get(dbRedisKey).toString();
// log.info("重新创建数据库:{}", createDbSql);
// preparedStatement = connection.prepareStatement(createDbSql);
// preparedStatement.execute();
// connection.close();
// 重新设置连接,并指定数据库
newConnection = DriverManager.getConnection(url + "/" + dbName, username, password);
newConnection.setAutoCommit(false);
List<String> tableList = Arrays.asList(redisUtils.get(tableListKey).toString().split("\n")); List<String> tableList = Arrays.asList(redisUtils.get(tableListKey).toString().split("\n"));
Statement statement = newConnection.createStatement(); if (tableList.size() > 0) {
// 循环处理每个表
for (String tableName: tableList) { for (String tableName: tableList) {
log.info("开始同步表:{}", tableName);
long dataStartTime = System.currentTimeMillis();
syncSingleTableToDest(ip, port, username, password, dbName, tableName, businessData); syncSingleTableToDest(ip, port, username, password, dbName, tableName, businessData);
// String createTableKey = dbSyncPrefix + dbName + ":" + tableName + ":create" ;
// String insertTableKey = dbSyncPrefix + dbName + ":" + tableName + ":insert" ;
// String createTableValue = redisUtils.get(createTableKey).toString();
// String insertTableValue = redisUtils.get(insertTableKey).toString();
//
// // 不为空时才执行建表语句
// if (!createTableValue.isEmpty()) {
// statement.execute(createTableValue);
// }
//
// // 不为空时才插入数据
// if (!insertTableValue.isEmpty()) {
// for (String insertSql: insertTableValue.split("\n")) {
// statement.addBatch(insertSql);
// }
// statement.executeBatch();
// statement.clearBatch();
// }
long dataEndTime = System.currentTimeMillis();
log.info("{}表同步完成,共花费{}秒", tableName, (dataEndTime - dataStartTime) / 1000);
}
// 判断是否需要update
if (dbName.equals("cash_loan_flow")) {
for (String sql: cashLoanFlowSql.split("\n")) {
log.info("执行update sql: {}", sql);
statement.addBatch(sql);
}
statement.executeBatch();
statement.clearBatch();
}
if (dbName.equals("contract")) {
for (String sql: contractSql.split("\n")) {
log.info("执行update sql: {}", sql);
statement.addBatch(sql);
}
statement.executeBatch();
statement.clearBatch();
}
if (dbName.equals("payment_center")) {
for (String sql: paymentCenterSql.split("\n")) {
log.info("执行update sql: {}", sql);
statement.addBatch(sql);
} }
statement.executeBatch();
statement.clearBatch();
}
newConnection.commit();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (newConnection != null) {
newConnection.close();
} }
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} return false;
} }
return true; return true;
} }
...@@ -482,7 +397,7 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService { ...@@ -482,7 +397,7 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService {
// 仅同步表结构,并保留业务数据 // 仅同步表结构,并保留业务数据
if (businessData) { if (businessData) {
log.info("本次同步{}.{}表,保留业务数据", dbName, tableName); log.info("开始同步{}.{}表,保留业务数据", dbName, tableName);
// 如果已有该数据库 // 如果已有该数据库
if (dbList.contains(dbName)) { if (dbList.contains(dbName)) {
List<String> tableList = getTableListByDb(ip, port, username, password, dbName); List<String> tableList = getTableListByDb(ip, port, username, password, dbName);
...@@ -629,6 +544,7 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService { ...@@ -629,6 +544,7 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService {
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
log.info("表{}.{}同步失败.", dbName, tableName);
return false; return false;
} finally { } finally {
try { try {
...@@ -645,6 +561,7 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService { ...@@ -645,6 +561,7 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService {
e.printStackTrace(); e.printStackTrace();
} }
} }
log.info("表{}.{}同步完成.", dbName, tableName);
return true; return true;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment