Commit 0cbde284 authored by 黎博's avatar 黎博

调试同步单张表

parent d875f7c3
...@@ -2,6 +2,7 @@ package cn.qg.holmes.controller.effect; ...@@ -2,6 +2,7 @@ package cn.qg.holmes.controller.effect;
import cn.qg.holmes.common.JsonResult; import cn.qg.holmes.common.JsonResult;
import cn.qg.holmes.service.effect.DatabaseSyncService; import cn.qg.holmes.service.effect.DatabaseSyncService;
import cn.qg.holmes.service.k8s.K8sService;
import cn.qg.holmes.utils.JenkinsService; import cn.qg.holmes.utils.JenkinsService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
...@@ -35,22 +36,33 @@ public class DbSyncController { ...@@ -35,22 +36,33 @@ public class DbSyncController {
@Value("${dbsync.mysql.password}") @Value("${dbsync.mysql.password}")
private String password; private String password;
@GetMapping("/one") @Autowired
public JsonResult syncSingleTable(@RequestParam String namespace, @RequestParam String dbName, @RequestParam String tableName) { K8sService k8sService;
/**
* 同步数据库
* @param namespace 环境
* @param dbName 数据库名称
* @param tableName 表名称
* @param businessData 是否保留业务数据
* @return
*/
@GetMapping("/new")
public JsonResult syncDatabase(@RequestParam String namespace, @RequestParam String dbName, @RequestParam String tableName, @RequestParam boolean businessData) {
try { try {
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
Map<String, String> map = databaseSyncService.getMysqlInfoByNamespace(namespace); Map<String, String> map = k8sService.getMysqlAddressByNamespace(namespace);
String destIp = map.get("ip"); String destIp = map.get("host");
String destPort = map.get("port"); String destPort = map.get("port");
log.info("获取到{}环境的Mysql地址为:{}", namespace, destIp + ":" + destPort); log.info("获取到{}环境的Mysql地址为:{}", namespace, destIp + ":" + destPort);
if (tableName.equalsIgnoreCase("all") || tableName.equals("")) { if (tableName.equalsIgnoreCase("all") || tableName.equals("")) {
log.info("开始同步{}库下所有的表", dbName); log.info("开始同步{}库下所有表到{}环境,保留业务数据:{}", dbName, namespace, businessData);
databaseSyncService.getDbInfoFromSource(ip, port, username, password, dbName); databaseSyncService.getDbInfoFromSource(ip, port, username, password, dbName);
databaseSyncService.syncDbToDest(destIp, destPort, "qa", "qatest", dbName, namespace); databaseSyncService.syncDbToDest(destIp, destPort, "qa", "qatest", dbName, namespace, businessData);
} else { } else {
log.info("开始同步{}库下{}表", dbName, tableName); log.info("开始同步{}库下{}表到{}环境,保留业务数据:{}", dbName, tableName, namespace, businessData);
databaseSyncService.getSingleTableFromSource(ip, port, username, password, dbName, tableName); databaseSyncService.getSingleTableFromSource(ip, port, username, password, dbName, tableName);
databaseSyncService.syncSingleTableToDest(destIp, destPort, "qa", "qatest", dbName, tableName); databaseSyncService.syncSingleTableToDest(destIp, destPort, "qa", "qatest", dbName, tableName, businessData);
} }
long endTime = System.currentTimeMillis(); long endTime = System.currentTimeMillis();
long elapsedTime = (endTime - startTime) / 1000; long elapsedTime = (endTime - startTime) / 1000;
...@@ -77,7 +89,7 @@ public class DbSyncController { ...@@ -77,7 +89,7 @@ public class DbSyncController {
*/ */
@GetMapping("/tables") @GetMapping("/tables")
public JsonResult getTableList(@RequestParam String dbName) { public JsonResult getTableList(@RequestParam String dbName) {
List<Object> tableList = databaseSyncService.getTableListByDb(ip, port, username, password, dbName); List<String> tableList = databaseSyncService.getTableListByDb(ip, port, username, password, dbName);
tableList.add(0, "all"); tableList.add(0, "all");
return JsonResult.buildSuccessResult(tableList); return JsonResult.buildSuccessResult(tableList);
} }
......
package cn.qg.holmes.service.effect; package cn.qg.holmes.service.effect;
import java.util.List; import java.util.List;
import java.util.Map;
public interface DatabaseSyncService { public interface DatabaseSyncService {
boolean getDbInfoFromSource(String ip, String port, String username, String password, String dbName); boolean getDbInfoFromSource(String ip, String port, String username, String password, String dbName);
boolean syncDbToDest(String ip, String port, String username, String password, String dbName, String namespace); boolean syncDbToDest(String ip, String port, String username, String password, String dbName, String namespace, boolean businessData);
boolean getSingleTableFromSource(String ip, String port, String username, String password, String dbName, String tableName); boolean getSingleTableFromSource(String ip, String port, String username, String password, String dbName, String tableName);
boolean syncSingleTableToDest(String ip, String port, String username, String password, String dbName, String tableName); boolean syncSingleTableToDest(String ip, String port, String username, String password, String dbName, String tableName, boolean businessData);
List<Object> getDatabaseList(String ip, String port, String username, String password); List<String> getDatabaseList(String ip, String port, String username, String password);
Map<String, String> getMysqlInfoByNamespace(String namespace); List<String> getTableListByDb(String ip, String port, String username, String password, String dbName);
List<Object> getTableListByDb(String ip, String port, String username, String password, String dbName);
} }
\ No newline at end of file
package cn.qg.holmes.service.effect.impl; package cn.qg.holmes.service.effect.impl;
import cn.qg.holmes.service.effect.DatabaseSyncService; import cn.qg.holmes.service.effect.DatabaseSyncService;
import cn.qg.holmes.utils.HttpClientUtils;
import cn.qg.holmes.utils.RedisUtils; import cn.qg.holmes.utils.RedisUtils;
import com.alibaba.fastjson.JSON;
import com.jayway.jsonpath.JsonPath;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
...@@ -12,7 +9,9 @@ import org.springframework.stereotype.Service; ...@@ -12,7 +9,9 @@ import org.springframework.stereotype.Service;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.sql.*; import java.sql.*;
import java.util.*; import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@Slf4j @Slf4j
@Service @Service
...@@ -21,9 +20,6 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService { ...@@ -21,9 +20,6 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService {
@Autowired @Autowired
RedisUtils redisUtils; RedisUtils redisUtils;
@Value("${tke.host}")
private String tkeHost;
@Value("${cash_loan_flow.sql}") @Value("${cash_loan_flow.sql}")
private String cashLoanFlowSql; private String cashLoanFlowSql;
...@@ -35,6 +31,29 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService { ...@@ -35,6 +31,29 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService {
private final String dbSyncPrefix = "dbsync:"; private final String dbSyncPrefix = "dbsync:";
/**
* 实现StringBuilder的replaceAll
*
* @param stb
* @param oldStr 被替换的字符串
* @param newStr 替换oldStr
* @return
*/
public StringBuilder replaceAll(StringBuilder stb, String oldStr, String newStr) {
if (stb == null || oldStr == null || newStr == null || stb.length() == 0 || oldStr.length() == 0)
return stb;
int index = stb.indexOf(oldStr);
if (index > -1 && !oldStr.equals(newStr)) {
int lastIndex = 0;
while (index > -1) {
stb.replace(index, index + oldStr.length(), newStr);
lastIndex = index + newStr.length();
index = stb.indexOf(oldStr, lastIndex);
}
}
return stb;
}
/** /**
* 从中间库获取数据库相关信息 * 从中间库获取数据库相关信息
* @param ip 中间库 ip * @param ip 中间库 ip
...@@ -56,11 +75,11 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService { ...@@ -56,11 +75,11 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService {
// 创建数据库的redis值 // 创建数据库的redis值
String dbRedisValue = ""; String dbRedisValue = "";
// 表数据redis值 // 表数据redis值
String insertRedisValue = ""; StringBuilder insertRedisValue = new StringBuilder();
// 建表语句redis值 // 建表语句redis值
String createTableRedisValue = ""; String createTableRedisValue = "";
// 表名列表redis值 // 表名列表redis值
String tableListStr = ""; StringBuilder tableListStr = new StringBuilder();
List<String> columnNameList = new ArrayList<>(); List<String> columnNameList = new ArrayList<>();
List<String> columnTypeList = new ArrayList<>(); List<String> columnTypeList = new ArrayList<>();
...@@ -80,20 +99,21 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService { ...@@ -80,20 +99,21 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService {
redisUtils.set(dbRedisKey, dbRedisValue, 86400); redisUtils.set(dbRedisKey, dbRedisValue, 86400);
} }
DatabaseMetaData databaseMetaData = connection.getMetaData(); DatabaseMetaData databaseMetaData = connection.getMetaData();
// 获取所有表名 // 获取所有表名
ResultSet tableResultSet = databaseMetaData.getTables(null, null, null, new String[]{"TABLE"}); ResultSet tableResultSet = databaseMetaData.getTables(null, null, null, new String[]{"TABLE"});
// 遍历表,一个表一个表处理
while (tableResultSet.next()) { while (tableResultSet.next()) {
String tableName = tableResultSet.getString("TABLE_NAME"); String tableName = tableResultSet.getString("TABLE_NAME");
tableListStr += tableName; tableListStr.append(tableName);
tableListStr += "\n"; tableListStr.append("\n");
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
log.info("开始获取表{}的数据", tableName); log.info("开始获取表{}的数据", tableName);
String createTableKey = dbSyncPrefix + dbName + ":" + tableName + ":create" ; String createTableKey = dbSyncPrefix + dbName + ":" + tableName + ":create" ;
String insertTableKey = dbSyncPrefix + dbName + ":" + tableName + ":insert" ; String insertTableKey = dbSyncPrefix + dbName + ":" + tableName + ":insert" ;
// 表结构
if (!redisUtils.hasKey(createTableKey)) { if (!redisUtils.hasKey(createTableKey)) {
// 获取所有建表语句 // 获取所有建表语句
String sql = String.format("SHOW CREATE TABLE %s", tableName); String sql = String.format("SHOW CREATE TABLE %s", tableName);
...@@ -106,6 +126,7 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService { ...@@ -106,6 +126,7 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService {
createTableRedisValue = ""; createTableRedisValue = "";
} }
// 表数据
if (!redisUtils.hasKey(insertTableKey)) { if (!redisUtils.hasKey(insertTableKey)) {
ResultSet columnResultSet = databaseMetaData.getColumns(null, "%", tableName, "%"); ResultSet columnResultSet = databaseMetaData.getColumns(null, "%", tableName, "%");
while (columnResultSet.next()) { while (columnResultSet.next()) {
...@@ -117,34 +138,55 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService { ...@@ -117,34 +138,55 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService {
columnNameList.add(columnName); columnNameList.add(columnName);
columnTypeList.add(columnType); columnTypeList.add(columnType);
} }
String columnArrayStr = null; // String columnArrayStr = null;
StringBuilder columnArrayStr = new StringBuilder();
for (String column : columnNameList) { for (String column : columnNameList) {
if (null == columnArrayStr) { if (columnArrayStr.length() == 0) {
columnArrayStr = "`" + column + "`"; columnArrayStr.append("`").append(column).append("`");
} else { } else {
columnArrayStr = columnArrayStr + "," + "`" + column + "`"; columnArrayStr.append(",`").append(columnArrayStr).append("`");
} }
// if (null == columnArrayStr) {
// columnArrayStr = "`" + column + "`";
// } else {
// columnArrayStr = columnArrayStr + "," + "`" + column + "`";
// }
} }
String selectSQL = String.format("select %s from %s", columnArrayStr, tableName); // String selectSQL = String.format("select %s from %s", columnArrayStr, tableName);
preparedStatement = connection.prepareStatement(selectSQL); preparedStatement = connection.prepareStatement("select " + columnArrayStr + " from " + tableName);
ResultSet selectResultSet = preparedStatement.executeQuery(); ResultSet selectResultSet = preparedStatement.executeQuery();
StringBuilder insertSql = new StringBuilder();
while (selectResultSet.next()) { while (selectResultSet.next()) {
String rowValues = getRowValues(selectResultSet, columnNameList.size(), columnTypeList); String rowValues = getRowValues(selectResultSet, columnNameList.size(), columnTypeList);
String insertSql = String.format("insert into %s (%s) values(%s);", tableName, columnArrayStr, rowValues); // String insertSql = String.format("insert into %s (%s) values(%s);", tableName, columnArrayStr, rowValues);
insertSql = insertSql.replaceAll("\n", "<br/>"); insertSql
insertSql = insertSql + "\n"; .append("insert into ")
insertRedisValue += insertSql; .append(tableName)
.append(" (")
.append(columnArrayStr)
.append(") ")
.append("values (")
.append(rowValues)
.append(");");
insertSql = replaceAll(insertSql, "\n", "<br/>");
// insertSql = insertSql.replaceAll("\n", "<br/>");
// insertSql = insertSql + "\n";
insertSql.append("\n");
insertRedisValue.append(insertSql);
// 清空insertSql
insertSql.delete(0, insertSql.length());
} }
columnNameList.clear(); columnNameList.clear();
columnTypeList.clear(); columnTypeList.clear();
redisUtils.set(insertTableKey, insertRedisValue, 86400); redisUtils.set(insertTableKey, insertRedisValue.toString(), 86400);
long endTime = System.currentTimeMillis(); long endTime = System.currentTimeMillis();
log.info("{}表数据获取完成,共花费{}秒", tableName, (endTime - startTime) / 1000); log.info("{}表数据获取完成,共花费{}秒", tableName, (endTime - startTime) / 1000);
insertRedisValue = ""; // 清空insertRedisValue
insertRedisValue.delete(0, insertRedisValue.length());
} }
} }
redisUtils.set(tableListKey, tableListStr, 86400); redisUtils.set(tableListKey, tableListStr.toString(), 86400);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} finally { } finally {
...@@ -172,7 +214,7 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService { ...@@ -172,7 +214,7 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService {
* @return * @return
*/ */
@Override @Override
public boolean syncDbToDest(String ip, String port, String username, String password, String dbName, String namespace) { public boolean syncDbToDest(String ip, String port, String username, String password, String dbName, String namespace, boolean businessData) {
String driver = "com.mysql.jdbc.Driver"; String driver = "com.mysql.jdbc.Driver";
String url = "jdbc:mysql://" + ip + ":" + port; String url = "jdbc:mysql://" + ip + ":" + port;
...@@ -185,20 +227,20 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService { ...@@ -185,20 +227,20 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService {
try { try {
Class.forName(driver); Class.forName(driver);
connection = DriverManager.getConnection(url, username, password); // connection = DriverManager.getConnection(url, username, password);
// 删除原有数据库 // 删除原有数据库
String dropDbSql = String.format("DROP DATABASE IF EXISTS %s", dbName); // String dropDbSql = String.format("DROP DATABASE IF EXISTS %s", dbName);
log.info("删除原有数据库:{}", dropDbSql); // log.info("删除原有数据库:{}", dropDbSql);
preparedStatement = connection.prepareStatement(dropDbSql); // preparedStatement = connection.prepareStatement(dropDbSql);
preparedStatement.execute(); // preparedStatement.execute();
// 重新创建数据库 // 重新创建数据库
String createDbSql = redisUtils.get(dbRedisKey).toString(); // String createDbSql = redisUtils.get(dbRedisKey).toString();
log.info("重新创建数据库:{}", createDbSql); // log.info("重新创建数据库:{}", createDbSql);
preparedStatement = connection.prepareStatement(createDbSql); // preparedStatement = connection.prepareStatement(createDbSql);
preparedStatement.execute(); // preparedStatement.execute();
connection.close(); // connection.close();
// 重新设置连接,并指定数据库 // 重新设置连接,并指定数据库
newConnection = DriverManager.getConnection(url + "/" + dbName, username, password); newConnection = DriverManager.getConnection(url + "/" + dbName, username, password);
...@@ -210,24 +252,25 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService { ...@@ -210,24 +252,25 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService {
for (String tableName: tableList) { for (String tableName: tableList) {
log.info("开始同步表:{}", tableName); log.info("开始同步表:{}", tableName);
long dataStartTime = System.currentTimeMillis(); long dataStartTime = System.currentTimeMillis();
String createTableKey = dbSyncPrefix + dbName + ":" + tableName + ":create" ; syncSingleTableToDest(ip, port, username, password, dbName, tableName, businessData);
String insertTableKey = dbSyncPrefix + dbName + ":" + tableName + ":insert" ; // String createTableKey = dbSyncPrefix + dbName + ":" + tableName + ":create" ;
String createTableValue = redisUtils.get(createTableKey).toString(); // String insertTableKey = dbSyncPrefix + dbName + ":" + tableName + ":insert" ;
String insertTableValue = redisUtils.get(insertTableKey).toString(); // String createTableValue = redisUtils.get(createTableKey).toString();
// String insertTableValue = redisUtils.get(insertTableKey).toString();
// 不为空时才执行建表语句 //
if (!createTableValue.isEmpty()) { // // 不为空时才执行建表语句
statement.execute(createTableValue); // if (!createTableValue.isEmpty()) {
} // statement.execute(createTableValue);
// }
// 不为空时才插入数据 //
if (!insertTableValue.isEmpty()) { // // 不为空时才插入数据
for (String insertSql: insertTableValue.split("\n")) { // if (!insertTableValue.isEmpty()) {
statement.addBatch(insertSql); // for (String insertSql: insertTableValue.split("\n")) {
} // statement.addBatch(insertSql);
statement.executeBatch(); // }
statement.clearBatch(); // statement.executeBatch();
} // statement.clearBatch();
// }
long dataEndTime = System.currentTimeMillis(); long dataEndTime = System.currentTimeMillis();
log.info("{}表同步完成,共花费{}秒", tableName, (dataEndTime - dataStartTime) / 1000); log.info("{}表同步完成,共花费{}秒", tableName, (dataEndTime - dataStartTime) / 1000);
} }
...@@ -262,15 +305,9 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService { ...@@ -262,15 +305,9 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService {
e.printStackTrace(); e.printStackTrace();
} finally { } finally {
try { try {
if (connection != null) {
connection.close();
}
if (newConnection != null) { if (newConnection != null) {
newConnection.close(); newConnection.close();
} }
if (preparedStatement != null) {
preparedStatement.close();
}
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
...@@ -278,14 +315,26 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService { ...@@ -278,14 +315,26 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService {
return true; return true;
} }
/**
* 获取单个表
* @param ip 同步库ip
* @param port 同步库端口
* @param username 同步库用户名
* @param password 同步库密码
* @param dbName 数据库名
* @param tableName 表名
* @return
*/
@Override @Override
public boolean getSingleTableFromSource(String ip, String port, String username, String password, String dbName, String tableName) { public boolean getSingleTableFromSource(String ip, String port, String username, String password, String dbName, String tableName) {
String driver = "com.mysql.jdbc.Driver"; String driver = "com.mysql.jdbc.Driver";
String url = "jdbc:mysql://" + ip + ":" + port + "/" + dbName; String url = "jdbc:mysql://" + ip + ":" + port + "/" + dbName;
String dbRedisKey = dbSyncPrefix + dbName;
String tableCreateKey = dbSyncPrefix + dbName + ":" + tableName + ":create"; String tableCreateKey = dbSyncPrefix + dbName + ":" + tableName + ":create";
String tableInsertKey = dbSyncPrefix + dbName + ":" + tableName + ":insert"; String tableInsertKey = dbSyncPrefix + dbName + ":" + tableName + ":insert";
String insertRedisValue = ""; String dbRedisValue = null;
StringBuilder insertRedisValue = new StringBuilder();
Connection connection = null; Connection connection = null;
PreparedStatement preparedStatement = null; PreparedStatement preparedStatement = null;
...@@ -298,6 +347,18 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService { ...@@ -298,6 +347,18 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService {
connection.setAutoCommit(true); connection.setAutoCommit(true);
DatabaseMetaData databaseMetaData = connection.getMetaData(); DatabaseMetaData databaseMetaData = connection.getMetaData();
// 判断是否有创建数据库的redis值
if (!redisUtils.hasKey(dbRedisKey)) {
String dbSql = String.format("SHOW CREATE DATABASE %s", dbName);
preparedStatement = connection.prepareStatement(dbSql);
ResultSet dbResultSet = preparedStatement.executeQuery();
while (dbResultSet.next()) {
dbRedisValue = dbResultSet.getString(2);
}
// 设置建库语句的redis值
redisUtils.set(dbRedisKey, dbRedisValue, 86400);
}
if (!redisUtils.hasKey(tableCreateKey)) { if (!redisUtils.hasKey(tableCreateKey)) {
String sql = String.format("SHOW CREATE TABLE %s", tableName); String sql = String.format("SHOW CREATE TABLE %s", tableName);
preparedStatement = connection.prepareStatement(sql); preparedStatement = connection.prepareStatement(sql);
...@@ -308,6 +369,7 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService { ...@@ -308,6 +369,7 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService {
} }
if (!redisUtils.hasKey(tableInsertKey)) { if (!redisUtils.hasKey(tableInsertKey)) {
log.info(("redis中没有{}表的数据,开始获取表数据。", tableName);
ResultSet columnResultSet = databaseMetaData.getColumns(null, "%", tableName, "%"); ResultSet columnResultSet = databaseMetaData.getColumns(null, "%", tableName, "%");
while (columnResultSet.next()) { while (columnResultSet.next()) {
// 字段名称 // 字段名称
...@@ -318,26 +380,46 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService { ...@@ -318,26 +380,46 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService {
columnNameList.add(columnName); columnNameList.add(columnName);
columnTypeList.add(columnType);; columnTypeList.add(columnType);;
} }
String columnArrayStr = null; // String columnArrayStr = null;
StringBuilder columnArrayStr = new StringBuilder();
for (String column : columnNameList) { for (String column : columnNameList) {
if (null == columnArrayStr) { // if (null == columnArrayStr) {
columnArrayStr = "`" + column + "`"; // columnArrayStr = "`" + column + "`";
// } else {
// columnArrayStr = columnArrayStr + "," + "`" + column + "`";
// }
if (columnArrayStr.length() == 0) {
columnArrayStr.append("`").append(column).append("`");
} else { } else {
columnArrayStr = columnArrayStr + "," + "`" + column + "`"; columnArrayStr.append(",`").append(column).append("`");
} }
} }
String selectSQL = String.format("select %s from %s", columnArrayStr, tableName); String selectSQL = String.format("select %s from %s", columnArrayStr, tableName);
preparedStatement = connection.prepareStatement(selectSQL); preparedStatement = connection.prepareStatement(selectSQL);
ResultSet selectResultSet = preparedStatement.executeQuery(); ResultSet selectResultSet = preparedStatement.executeQuery();
StringBuilder insertSql = new StringBuilder();
while (selectResultSet.next()) { while (selectResultSet.next()) {
String rowValues = getRowValues(selectResultSet, columnNameList.size(), columnTypeList); String rowValues = getRowValues(selectResultSet, columnNameList.size(), columnTypeList);
String insertSql = String.format("insert into %s (%s) values(%s);", tableName, columnArrayStr, rowValues); // String insertSql = String.format("insert into %s (%s) values(%s);", tableName, columnArrayStr, rowValues);
insertSql = insertSql.replaceAll("\n", "<br/>"); // insertSql = insertSql.replaceAll("\n", "<br/>");
insertSql = insertSql + "\n"; // insertSql = insertSql + "\n";
insertRedisValue += insertSql; insertSql
} .append("insert into ")
redisUtils.set(tableInsertKey, insertRedisValue, 600); .append(tableName)
.append(" (")
.append(columnArrayStr)
.append(") ")
.append("values (")
.append(rowValues)
.append(");");
insertSql = replaceAll(insertSql, "\n", "<br/>");
insertSql.append("\n");
insertRedisValue.append(insertSql);
insertSql.delete(0, insertSql.length());
}
redisUtils.set(tableInsertKey, insertRedisValue.toString(), 600);
insertRedisValue.delete(0, insertRedisValue.length());
} }
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
...@@ -357,11 +439,23 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService { ...@@ -357,11 +439,23 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService {
return true; return true;
} }
/**
* 同步单张表
* @param ip
* @param port
* @param username
* @param password
* @param dbName
* @param tableName
* @param businessData 是否同步业务数据
* @return
*/
@Override @Override
public boolean syncSingleTableToDest(String ip, String port, String username, String password, String dbName, String tableName) { public boolean syncSingleTableToDest(String ip, String port, String username, String password, String dbName, String tableName, boolean businessData) {
String driver = "com.mysql.jdbc.Driver"; String driver = "com.mysql.jdbc.Driver";
String url = "jdbc:mysql://" + ip + ":" + port + "/" + dbName; String url = "jdbc:mysql://" + ip + ":" + port + "/" + dbName;
String createDbKey = dbSyncPrefix + dbName;
String tableCreateKey = dbSyncPrefix + dbName + ":" + tableName + ":create"; String tableCreateKey = dbSyncPrefix + dbName + ":" + tableName + ":create";
String tableInsertKey = dbSyncPrefix + dbName + ":" + tableName + ":insert"; String tableInsertKey = dbSyncPrefix + dbName + ":" + tableName + ":insert";
...@@ -369,30 +463,170 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService { ...@@ -369,30 +463,170 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService {
PreparedStatement preparedStatement = null; PreparedStatement preparedStatement = null;
Statement statement = null; Statement statement = null;
List<String> columnNameList = new ArrayList<>();
List<String> columnTypeList = new ArrayList<>();
try { try {
Class.forName(driver); Class.forName(driver);
connection = DriverManager.getConnection(url, username, password); connection = DriverManager.getConnection(url, username, password);
connection.setAutoCommit(false); connection.setAutoCommit(false);
DatabaseMetaData databaseMetaData = connection.getMetaData();
String createDbSql = redisUtils.get(createDbKey).toString();
String createTableSql = redisUtils.get(tableCreateKey).toString(); String createTableSql = redisUtils.get(tableCreateKey).toString();
String insertTableSql = redisUtils.get(tableInsertKey).toString(); String insertTableSql = redisUtils.get(tableInsertKey).toString();
// 暂存原有数据
StringBuilder backupDataSql = new StringBuilder();
List<String> dbList = getDatabaseList(ip, port, username, password);
// 仅同步表结构,并保留业务数据
if (businessData) {
log.info("本次同步{}.{}表,保留业务数据", dbName, tableName);
// 如果已有该数据库
if (dbList.contains(dbName)) {
List<String> tableList = getTableListByDb(ip, port, username, password, dbName);
// 如果数据库中已存在该表
if (tableList.contains(tableName)) {
log.info("因需要保留业务数据,因此先备份环境中{}.{}表的数据", dbName, tableName);
ResultSet columnResultSet = databaseMetaData.getColumns(null, "%", tableName, "%");
while (columnResultSet.next()) {
// 字段名称
String columnName = columnResultSet.getString("COLUMN_NAME");
// 数据类型
String columnType = columnResultSet.getString("TYPE_NAME");
columnNameList.add(columnName);
columnTypeList.add(columnType);;
}
StringBuilder columnArrayStr = new StringBuilder();
for (String column : columnNameList) {
if (columnArrayStr.length() == 0) {
columnArrayStr.append("`").append(column).append("`");
} else {
columnArrayStr.append(",`").append(columnArrayStr).append("`");
}
}
String selectSQL = String.format("select %s from %s", columnArrayStr, tableName);
preparedStatement = connection.prepareStatement(selectSQL);
ResultSet selectResultSet = preparedStatement.executeQuery();
StringBuilder insertSql = new StringBuilder();
while (selectResultSet.next()) {
String rowValues = getRowValues(selectResultSet, columnNameList.size(), columnTypeList);
insertSql
.append("insert into ")
.append(tableName)
.append(" (")
.append(columnArrayStr)
.append(") ")
.append("values (")
.append(rowValues)
.append(");");
insertSql = replaceAll(insertSql, "\n", "<br/>");
insertSql.append("\n");
backupDataSql.append(insertSql);
insertSql.delete(0, insertSql.length());
}
// 如果表存在,则首先删除表 // 如果表存在,则首先删除表
log.info("表数据备份完成,删除环境中的{}.{}表", dbName, tableName);
String dropTableSql = String.format("DROP TABLE if exists %s", tableName); String dropTableSql = String.format("DROP TABLE if exists %s", tableName);
preparedStatement = connection.prepareStatement(dropTableSql); preparedStatement = connection.prepareStatement(dropTableSql);
preparedStatement.execute(); preparedStatement.execute();
// 重新创建表 // 重新创建表
log.info("使用同步库中的表结构重新创建表: {}.{}", dbName, tableName);
preparedStatement = connection.prepareStatement(createTableSql); preparedStatement = connection.prepareStatement(createTableSql);
preparedStatement.execute(); preparedStatement.execute();
// 插入环境备份的数据
log.info("开始插入备份的数据!");
statement = connection.createStatement(); statement = connection.createStatement();
for (String insertSql: insertTableSql.split("\n")) { for (String sql: backupDataSql.toString().split("\n")) {
statement.addBatch(insertSql); statement.addBatch(sql);
} }
} else {
// 表不存在,直接创建表
log.info("表不存在,因此不需要备份环境中的数据,直接创建表:{}.{}", dbName, tableName);
preparedStatement = connection.prepareStatement(createTableSql);
preparedStatement.execute();
// 插入同步库里备份的数据
log.info("开始插入同步库中的数据.");
statement = connection.createStatement();
for (String sql: insertTableSql.split("\n")) {
statement.addBatch(sql);
}
}
} else {
// 没有数据库,首先创建数据库
log.info("环境中没有该数据库,因此不需要备份环境中的数据,直接创建数据库:{}", dbName);
preparedStatement = connection.prepareStatement(createDbSql);
preparedStatement.execute();
// 然后创建表
log.info("创建表: {}.{}", dbName, tableName);
preparedStatement = connection.prepareStatement(createTableSql);
preparedStatement.execute();
// 插入同步库里备份的数据
log.info("开始插入同步库中的数据.");
statement = connection.createStatement();
for (String sql: insertTableSql.split("\n")) {
statement.addBatch(sql);
}
}
} else {
// 不保留业务数据,使用同步库里的表结构和数据
// 如果数据库存在
log.info("本次同步{}.{}表,不保留业务数据", dbName, tableName);
if (dbList.contains(dbName)) {
// 如果表存在,则首先删除表
log.info("表存在,首先删除表: {}", tableName);
String dropTableSql = String.format("DROP TABLE if exists %s", tableName);
preparedStatement = connection.prepareStatement(dropTableSql);
preparedStatement.execute();
// 重新创建表
log.info("重新创建表: {}", tableName);
preparedStatement = connection.prepareStatement(createTableSql);
preparedStatement.execute();
log.info("插入同步库里的数据到表:{}.{}", dbName, tableName);
statement = connection.createStatement();
// 插入同步库里的数据
for (String sql: insertTableSql.split("\n")) {
statement.addBatch(sql);
}
} else {
// 没有数据库,首先创建数据库
log.info("数据库不存在,首先创建数据库: {}", dbName);
preparedStatement = connection.prepareStatement(createDbSql);
preparedStatement.execute();
// 然后创建表
log.info("表不存在,直接创建表: {}", tableName);
preparedStatement = connection.prepareStatement(createTableSql);
preparedStatement.execute();
// 插入同步库里备份的数据
log.info("插入同步库里的数据到表:{}.{}", dbName, tableName);
statement = connection.createStatement();
for (String sql: insertTableSql.split("\n")) {
statement.addBatch(sql);
}
}
}
if (statement != null) {
statement.executeBatch(); statement.executeBatch();
statement.clearBatch(); statement.clearBatch();
}
connection.commit(); connection.commit();
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
return false; return false;
...@@ -423,17 +657,11 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService { ...@@ -423,17 +657,11 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService {
* @return * @return
*/ */
@Override @Override
public List<Object> getDatabaseList(String ip, String port, String username, String password) { public List<String> getDatabaseList(String ip, String port, String username, String password) {
String driver = "com.mysql.jdbc.Driver"; String driver = "com.mysql.jdbc.Driver";
String url = "jdbc:mysql://" + ip + ":" + port; String url = "jdbc:mysql://" + ip + ":" + port;
List<Object> databaseList = new ArrayList<>(); List<String> databaseList = new ArrayList<>();
String dbListRedisKey = dbSyncPrefix + ":db:list";
if (redisUtils.hasKey(dbListRedisKey)) {
databaseList = redisUtils.lGet(dbListRedisKey, 0, redisUtils.lGetListSize(dbListRedisKey));
log.info("从redis获取到的数据库列表为:{}", databaseList);
} else {
Connection connection = null; Connection connection = null;
try { try {
...@@ -448,9 +676,6 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService { ...@@ -448,9 +676,6 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService {
databaseList.add(dbListResultSet.getString(1)); databaseList.add(dbListResultSet.getString(1));
} }
preparedStatement.close(); preparedStatement.close();
// 设置数据库列表缓存,缓存24个小时
log.info("从同步库获取到的数据库列表为:{}", databaseList);
redisUtils.lSet(dbListRedisKey, databaseList, 86400);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} finally { } finally {
...@@ -462,32 +687,9 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService { ...@@ -462,32 +687,9 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService {
connection = null; connection = null;
} }
} }
}
return databaseList; return databaseList;
} }
/**
* 根据namespace获取对应的mysql ip和端口
* @param namespace
* @return
*/
@Override
public Map<String, String> getMysqlInfoByNamespace(String namespace) {
Map<String, String> headers = new HashMap<>();
Map<String, String> params = new HashMap<>();
headers.put("cluster", "qa");
params.put("namespace", namespace);
params.put("serviceName", "mysql");
params.put("type", "base");
String response = HttpClientUtils.doPostJson(tkeHost + "/service/details", headers, JSON.toJSONString(params));
String mysqlIp = JsonPath.read(response, "$.data.lanIp").toString();
String mysqlPort = JsonPath.read(response, "$.data.portMappings[0].nodePort").toString();
Map<String, String> result = new HashMap<>();
result.put("ip", mysqlIp);
result.put("port", mysqlPort);
return result;
}
/** /**
* 获取某个数据库下表列表 * 获取某个数据库下表列表
* @param ip mysql ip * @param ip mysql ip
...@@ -498,13 +700,13 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService { ...@@ -498,13 +700,13 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService {
* @return * @return
*/ */
@Override @Override
public List<Object> getTableListByDb(String ip, String port, String username, String password, String dbName) { public List<String> getTableListByDb(String ip, String port, String username, String password, String dbName) {
String driver = "com.mysql.jdbc.Driver"; String driver = "com.mysql.jdbc.Driver";
String url = "jdbc:mysql://" + ip + ":" + port + "/" + dbName; String url = "jdbc:mysql://" + ip + ":" + port + "/" + dbName;
Connection connection = null; Connection connection = null;
PreparedStatement preparedStatement = null; PreparedStatement preparedStatement = null;
List<Object> tableList = new ArrayList<>(); List<String> tableList = new ArrayList<>();
try { try {
Class.forName(driver); Class.forName(driver);
...@@ -545,7 +747,7 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService { ...@@ -545,7 +747,7 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService {
*/ */
private String getRowValues(ResultSet rs, int size, List<String> columnTypeList) { private String getRowValues(ResultSet rs, int size, List<String> columnTypeList) {
try { try {
String rowValues = null; StringBuilder rowValues = new StringBuilder();
for (int i = 1; i <= size; i++) { for (int i = 1; i <= size; i++) {
String columnValue = null; String columnValue = null;
...@@ -563,14 +765,14 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService { ...@@ -563,14 +765,14 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService {
columnValue = "'" + columnValue + "'"; columnValue = "'" + columnValue + "'";
} }
// 拼接字段值 // 拼接字段值
if (null == rowValues) { if (rowValues.length() == 0) {
rowValues = columnValue; rowValues.append(columnValue);
} else { } else {
rowValues = rowValues + "," + columnValue; rowValues.append(",").append(columnValue);
} }
} }
return rowValues; return rowValues.toString();
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
System.out.println("获取表数据一行的所有值异常"); System.out.println("获取表数据一行的所有值异常");
......
...@@ -227,88 +227,33 @@ public class K8sService { ...@@ -227,88 +227,33 @@ public class K8sService {
* @param namespace 环境 * @param namespace 环境
* @return * @return
*/ */
public List<Map<String, Object>> getPodList(String namespace) { public Map<String, String> getMysqlAddressByNamespace(String namespace) {
List<Pod> podList = kubernetesClient.pods().inNamespace(namespace).list().getItems(); // 获取Service
List<Map<String, Object>> result = new ArrayList<>(); Service service = kubernetesClient.services().inNamespace(namespace).withName("mysql").get();
for (Pod pod : podList) { Map<String, String> labels = new HashMap<>();
System.out.println(pod); labels.put("qcloud-app", "mysql");
if (pod.getStatus().getPhase().equals("Running") || pod.getStatus().getPhase().equals("Pending")) { labels.put("type", "base");
Map<String, Object> map = new HashMap<>(); List<Pod> podList = kubernetesClient.pods().inNamespace(namespace).withLabels(labels).list().getItems();
// 端口映射 String port = null;
List<Map<String, Object>> portMappingList = new ArrayList<>(); String host = null;
ObjectMeta podMetadata = pod.getMetadata();
String serviceName = podMetadata.getLabels().get("qcloud-app");
// 端口暴露在Service
Service service = kubernetesClient.services().inNamespace(namespace).withName(serviceName).get();
if (service != null) { if (service != null) {
map.put("serviceType", service.getSpec().getType()); port = String.valueOf(service.getSpec().getPorts().get(0).getNodePort());
List<ServicePort> servicePortList = service.getSpec().getPorts();
if (servicePortList.size() > 0) {
for (ServicePort servicePort : servicePortList) {
if (servicePort.getNodePort() != null) {
map.put("port_" + servicePort.getName(), servicePort.getNodePort());
}
Map<String, Object> portMap = new HashMap<>();
portMap.put("name", servicePort.getName());
portMap.put("nodePort", servicePort.getNodePort());
portMap.put("port", servicePort.getPort());
portMap.put("protocol", servicePort.getProtocol());
portMap.put("targetPort", servicePort.getTargetPort());
portMappingList.add(portMap);
}
}
if (portMappingList.size() > 0) {
map.put("portMappings", portMappingList);
}
}
// 格式化创建时间
try {
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
df.setTimeZone(TimeZone.getTimeZone("UTC"));
map.put("createdAt", df.parse(podMetadata.getCreationTimestamp()));
} catch (Exception e) {
e.printStackTrace();
log.info("时间转换异常!");
map.put("createdAt", podMetadata.getCreationTimestamp());
}
// 从Ingress里获取host
Ingress ingress = kubernetesClient.extensions().ingresses().inNamespace(namespace).withName(serviceName).get();
if (ingress != null) {
map.put("host", ingress.getSpec().getRules().get(0).getHost());
}
map.put("serviceName", serviceName);
map.put("podName", podMetadata.getName());
map.put("labels", podMetadata.getLabels());
map.put("image", pod.getStatus().getContainerStatuses().get(0).getImage());
map.put("imageId", pod.getStatus().getContainerStatuses().get(0).getImageID());
map.put("lanIp", pod.getStatus().getHostIP());
map.put("podIp", pod.getStatus().getPodIP());
map.put("startTime", pod.getStatus().getStartTime());
// 状态判断
List<ContainerStatus> containerStatuses = pod.getStatus().getContainerStatuses();
String status = "";
if (pod.getMetadata().getDeletionTimestamp() != null) {
status = "Terminating";
} else {
if (containerStatuses.size() == 0) {
status = "Pending";
} else {
if (containerStatuses.get(0).getState().getWaiting() != null) {
status = "ContainerCreating";
}
if (containerStatuses.get(0).getState().getRunning() != null && !containerStatuses.get(0).getReady()) {
status = "Waiting";
}
if (containerStatuses.get(0).getState().getRunning() != null && containerStatuses.get(0).getReady()) {
status = "Normal";
} }
if (podList.size() == 1) {
host = podList.get(0).getStatus().getHostIP();
} }
if (podList.size() >= 2) {
for (Pod pod: podList) {
if (pod.getStatus().getHostIP() != null) {
host = pod.getStatus().getHostIP();
break;
} }
map.put("status", status);
result.add(map);
} }
} }
return result; Map<String, String> resultMap = new HashMap<>();
resultMap.put("host", host);
resultMap.put("port", port);
return resultMap;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment