Commit 24714bc5 authored by 黎博's avatar 黎博

修复数据库同步整体逻辑判断

parent 9d773ade
...@@ -3,6 +3,7 @@ package cn.qg.holmes.service.effect.impl; ...@@ -3,6 +3,7 @@ package cn.qg.holmes.service.effect.impl;
import cn.qg.holmes.service.effect.DatabaseSyncService; import cn.qg.holmes.service.effect.DatabaseSyncService;
import cn.qg.holmes.utils.RedisUtils; import cn.qg.holmes.utils.RedisUtils;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
...@@ -369,12 +370,14 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService { ...@@ -369,12 +370,14 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService {
public boolean syncSingleTableToDest(String ip, String port, String username, String password, String dbName, String tableName, boolean businessData) { public boolean syncSingleTableToDest(String ip, String port, String username, String password, String dbName, String tableName, boolean businessData) {
String driver = "com.mysql.jdbc.Driver"; String driver = "com.mysql.jdbc.Driver";
String url = "jdbc:mysql://" + ip + ":" + port + "/" + dbName; String url = "jdbc:mysql://" + ip + ":" + port + "/" + dbName;
String urlWithoutDb = "jdbc:mysql://" + ip + ":" + port;
String createDbKey = dbSyncPrefix + dbName; String createDbKey = dbSyncPrefix + dbName;
String tableCreateKey = dbSyncPrefix + dbName + ":" + tableName + ":create"; String tableCreateKey = dbSyncPrefix + dbName + ":" + tableName + ":create";
String tableInsertKey = dbSyncPrefix + dbName + ":" + tableName + ":insert"; String tableInsertKey = dbSyncPrefix + dbName + ":" + tableName + ":insert";
Connection connection = null; Connection connection = null;
Connection connectionWithoutDb = null;
PreparedStatement preparedStatement = null; PreparedStatement preparedStatement = null;
Statement statement = null; Statement statement = null;
...@@ -383,26 +386,31 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService { ...@@ -383,26 +386,31 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService {
try { try {
Class.forName(driver); Class.forName(driver);
connection = DriverManager.getConnection(url, username, password); // connection = DriverManager.getConnection(url, username, password);
connection.setAutoCommit(false); // connection.setAutoCommit(false);
DatabaseMetaData databaseMetaData = connection.getMetaData(); // DatabaseMetaData databaseMetaData = connection.getMetaData();
String createDbSql = redisUtils.get(createDbKey).toString(); String createDbSql = redisUtils.get(createDbKey).toString();
String createTableSql = redisUtils.get(tableCreateKey).toString(); String createTableSql = redisUtils.get(tableCreateKey).toString();
String insertTableSql = redisUtils.get(tableInsertKey).toString(); String insertTableSql = redisUtils.get(tableInsertKey).toString();
String dropTableSql = String.format("DROP TABLE if exists %s", tableName);
// 暂存原有数据 // 暂存原有数据
StringBuilder backupDataSql = new StringBuilder(); StringBuilder backupDataSql = new StringBuilder();
List<String> dbList = getDatabaseList(ip, port, username, password); List<String> dbList = getDatabaseList(ip, port, username, password);
// 仅同步表结构,并保留业务数据
if (businessData) {
log.info("开始同步{}.{}表,保留业务数据", dbName, tableName);
// 如果已有该数据库
if (dbList.contains(dbName)) { if (dbList.contains(dbName)) {
// 环境中数据库已存在
// 创建数据库连接
connection = DriverManager.getConnection(url, username, password);
connection.setAutoCommit(false);
DatabaseMetaData databaseMetaData = connection.getMetaData();
// 获取表列表
List<String> tableList = getTableListByDb(ip, port, username, password, dbName); List<String> tableList = getTableListByDb(ip, port, username, password, dbName);
// 如果数据库中已存在该表
if (tableList.contains(tableName)) { if (tableList.contains(tableName)) {
// 如果表存在
if (businessData) {
// 需要保留业务数据
log.info("因需要保留业务数据,因此先备份环境中{}.{}表的数据", dbName, tableName); log.info("因需要保留业务数据,因此先备份环境中{}.{}表的数据", dbName, tableName);
ResultSet columnResultSet = databaseMetaData.getColumns(null, "%", tableName, "%"); ResultSet columnResultSet = databaseMetaData.getColumns(null, "%", tableName, "%");
while (columnResultSet.next()) { while (columnResultSet.next()) {
...@@ -447,7 +455,6 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService { ...@@ -447,7 +455,6 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService {
// 如果表存在,则首先删除表 // 如果表存在,则首先删除表
log.info("表数据备份完成,删除环境中的{}.{}表", dbName, tableName); log.info("表数据备份完成,删除环境中的{}.{}表", dbName, tableName);
String dropTableSql = String.format("DROP TABLE if exists %s", tableName);
preparedStatement = connection.prepareStatement(dropTableSql); preparedStatement = connection.prepareStatement(dropTableSql);
preparedStatement.execute(); preparedStatement.execute();
...@@ -456,92 +463,81 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService { ...@@ -456,92 +463,81 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService {
preparedStatement = connection.prepareStatement(createTableSql); preparedStatement = connection.prepareStatement(createTableSql);
preparedStatement.execute(); preparedStatement.execute();
// 插入环境备份的数据 // 环境备份数据不为空,插入环境备份的数据
log.info("开始插入备份的数据!"); if (backupDataSql.length() > 0) {
log.info("开始插入环境中备份的数据!");
statement = connection.createStatement(); statement = connection.createStatement();
for (String sql: backupDataSql.toString().split("\n")) { for (String sql: backupDataSql.toString().split("\n")) {
statement.addBatch(sql); statement.addBatch(sql);
} }
} else {
// 表不存在,直接创建表
log.info("表不存在,因此不需要备份环境中的数据,直接创建表:{}.{}", dbName, tableName);
preparedStatement = connection.prepareStatement(createTableSql);
preparedStatement.execute();
// 插入同步库里备份的数据
log.info("开始插入同步库中的数据.");
statement = connection.createStatement();
for (String sql: insertTableSql.split("\n")) {
statement.addBatch(sql);
}
} }
} else { } else {
// 没有数据库,首先创建数据库 // 不需要保留业务数据
log.info("环境中没有该数据库,因此不需要备份环境中的数据,直接创建数据库:{}", dbName); // 表存在,则首先删除表
preparedStatement = connection.prepareStatement(createDbSql); preparedStatement = connection.prepareStatement(dropTableSql);
preparedStatement.execute(); preparedStatement.execute();
// 然后创建表 // 重新创建表
log.info("创建表: {}.{}", dbName, tableName); log.info("使用同步库中的表结构重新创建表: {}.{}", dbName, tableName);
preparedStatement = connection.prepareStatement(createTableSql); preparedStatement = connection.prepareStatement(createTableSql);
preparedStatement.execute(); preparedStatement.execute();
// 插入同步库里备份的数据 // 同步库中数据不为空时,插入同步库中的数据
log.info("开始插入同步库中的数据."); if (!StringUtils.isEmpty(insertTableSql)) {
log.info("开始插入同步库中的数据!");
statement = connection.createStatement(); statement = connection.createStatement();
for (String sql: insertTableSql.split("\n")) { for (String sql: insertTableSql.split("\n")) {
statement.addBatch(sql); statement.addBatch(sql);
} }
} }
}
} else { } else {
// 不保留业务数据,使用同步库里的表结构和数据 // 表不存在,创建表
// 如果数据库存在 log.info("使用同步库中的表结构创建表: {}.{}", dbName, tableName);
log.info("本次同步{}.{}表,不保留业务数据", dbName, tableName);
if (dbList.contains(dbName)) {
// 如果表存在,则首先删除表
String dropTableSql = String.format("DROP TABLE if exists %s", tableName);
log.info("首先执行删除表的SQL: {}", dropTableSql);
preparedStatement = connection.prepareStatement(dropTableSql);
preparedStatement.execute();
// 重新创建表
log.info("重新创建表: {}", tableName);
preparedStatement = connection.prepareStatement(createTableSql); preparedStatement = connection.prepareStatement(createTableSql);
preparedStatement.execute(); preparedStatement.execute();
log.info("插入同步库里的数据到表:{}.{}", dbName, tableName); // 同步库中数据不为空时,插入同步库中的数据
if (!StringUtils.isEmpty(insertTableSql)) {
log.info("开始插入同步库中的数据!");
statement = connection.createStatement(); statement = connection.createStatement();
// 插入同步库里的数据
for (String sql: insertTableSql.split("\n")) { for (String sql: insertTableSql.split("\n")) {
statement.addBatch(sql); statement.addBatch(sql);
} }
}
}
} else { } else {
// 没有数据库,首先创建数据库 // 环境中数据库不存在, 此时无论是否保留业务数据
log.info("数据库不存在,首先创建数据库: {}", dbName); // 创建数据库
preparedStatement = connection.prepareStatement(createDbSql); log.info("环境中不存在数据库,首先创建数据库:{}", dbName);
connectionWithoutDb = DriverManager.getConnection(urlWithoutDb, username, password);
connectionWithoutDb.setAutoCommit(false);
preparedStatement = connectionWithoutDb.prepareStatement(createDbSql);
preparedStatement.execute(); preparedStatement.execute();
// 然后创建表 connection = DriverManager.getConnection(url, username, password);
log.info("表不存在,直接创建表: {}", tableName); connection.setAutoCommit(false);
// 创建表
log.info("使用同步库中的表结构创建表: {}.{}", dbName, tableName);
preparedStatement = connection.prepareStatement(createTableSql); preparedStatement = connection.prepareStatement(createTableSql);
preparedStatement.execute(); preparedStatement.execute();
// 插入同步库里备份的数据 // 同步库中数据不为空时,插入同步库中的数据
log.info("插入同步库里的数据到表:{}.{}", dbName, tableName); if (!StringUtils.isEmpty(insertTableSql)) {
log.info("开始插入同步库中的数据!");
statement = connection.createStatement(); statement = connection.createStatement();
for (String sql: insertTableSql.split("\n")) { for (String sql: insertTableSql.split("\n")) {
statement.addBatch(sql); statement.addBatch(sql);
} }
} }
} }
if (statement != null) { if (statement != null) {
statement.executeBatch(); statement.executeBatch();
statement.clearBatch(); statement.clearBatch();
} }
connection.commit(); connection.commit();
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
log.info("表{}.{}同步失败.", dbName, tableName); log.info("表{}.{}同步失败.", dbName, tableName);
...@@ -551,6 +547,9 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService { ...@@ -551,6 +547,9 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService {
if (connection != null) { if (connection != null) {
connection.close(); connection.close();
} }
if (connectionWithoutDb != null) {
connectionWithoutDb.close();
}
if (preparedStatement != null) { if (preparedStatement != null) {
preparedStatement.close(); preparedStatement.close();
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment