Commit ef9d7726 authored by 黎博's avatar 黎博

优化

parent eecab0cd
...@@ -10,7 +10,7 @@ import java.util.Map; ...@@ -10,7 +10,7 @@ import java.util.Map;
@CrossOrigin @CrossOrigin
@RestController @RestController
@RequestMapping("/db") @RequestMapping("/db/sync")
public class DbSyncController { public class DbSyncController {
@Autowired @Autowired
...@@ -28,7 +28,7 @@ public class DbSyncController { ...@@ -28,7 +28,7 @@ public class DbSyncController {
@Value("${dbsync.mysql.password}") @Value("${dbsync.mysql.password}")
private String password; private String password;
@GetMapping("/sync/one") @GetMapping("/db")
public JsonResult syncSingleDatabase(@RequestParam String namespace, @RequestParam String dbName) { public JsonResult syncSingleDatabase(@RequestParam String namespace, @RequestParam String dbName) {
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
databaseSyncService.getDbInfoFromSource(ip, port, username, password, dbName); databaseSyncService.getDbInfoFromSource(ip, port, username, password, dbName);
...@@ -40,12 +40,35 @@ public class DbSyncController { ...@@ -40,12 +40,35 @@ public class DbSyncController {
return JsonResult.buildSuccessResult((endTime - startTime) / 1000); return JsonResult.buildSuccessResult((endTime - startTime) / 1000);
} }
@GetMapping("/table")
public JsonResult syncSingleTable(@RequestParam String namespace, @RequestParam String dbName, @RequestParam String tableName) {
long startTime = System.currentTimeMillis();
Map<String, String> map = databaseSyncService.getMysqlInfoByNamespace(namespace);
String destIp = map.get("ip");
String destPort = map.get("port");
databaseSyncService.getSingleTableFromSource(ip, port, username, password, dbName, tableName);
databaseSyncService.syncSingleTableToDest(destIp, destPort, "qa", "qatest", dbName, tableName);
long endTime = System.currentTimeMillis();
return JsonResult.buildSuccessResult((endTime - startTime) / 1000);
}
/** /**
* 获取数据库列表 * 获取数据库列表
* @return * @return
*/ */
@GetMapping("/sync/list") @GetMapping("/dblist")
public JsonResult getDatabaseList() { public JsonResult getDatabaseList() {
return JsonResult.buildSuccessResult(databaseSyncService.getDatabaseList(ip, port, username, password)); return JsonResult.buildSuccessResult(databaseSyncService.getDatabaseList(ip, port, username, password));
} }
/**
* 获取某个库下的表列表
* @param dbName
* @return
*/
@GetMapping("/tablelist")
public JsonResult getTableList(@RequestParam String dbName) {
return JsonResult.buildSuccessResult(databaseSyncService.getTableListByDb(ip, port, username, password, dbName));
}
} }
...@@ -9,7 +9,13 @@ public interface DatabaseSyncService { ...@@ -9,7 +9,13 @@ public interface DatabaseSyncService {
boolean syncDbToDest(String ip, String port, String username, String password, String dbName); boolean syncDbToDest(String ip, String port, String username, String password, String dbName);
boolean getSingleTableFromSource(String ip, String port, String username, String password, String dbName, String tableName);
boolean syncSingleTableToDest(String ip, String port, String username, String password, String dbName, String tableName);
List<Object> getDatabaseList(String ip, String port, String username, String password); List<Object> getDatabaseList(String ip, String port, String username, String password);
Map<String, String> getMysqlInfoByNamespace(String namespace); Map<String, String> getMysqlInfoByNamespace(String namespace);
List<Object> getTableListByDb(String ip, String port, String username, String password, String dbName);
} }
\ No newline at end of file
...@@ -27,6 +27,9 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService { ...@@ -27,6 +27,9 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService {
@Value("${tke.host}") @Value("${tke.host}")
private String tkeHost; private String tkeHost;
@Value("${cash_loan_flow.sql}")
private String cashLoanFlowSql;
private final String dbSyncPrefix = "dbsync:"; private final String dbSyncPrefix = "dbsync:";
/** /**
...@@ -48,12 +51,13 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService { ...@@ -48,12 +51,13 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService {
Connection connection = null; Connection connection = null;
PreparedStatement preparedStatement = null; PreparedStatement preparedStatement = null;
String dbRedisValue = ""; String dbRedisValue = "";
String insertRedisValue = ""; String insertRedisValue = "";
List<Object> createTableRedisValue = new ArrayList<>(); List<Object> createTableRedisValue = new ArrayList<>();
List<String> columnNameList = new ArrayList<>(); List<String> columnNameList = new ArrayList<>();
List<String> columnTypeList = new ArrayList<>(); List<String> columnTypeList = new ArrayList<>();
List<String> commentList = new ArrayList<>();
try { try {
Class.forName(driver); Class.forName(driver);
...@@ -67,7 +71,7 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService { ...@@ -67,7 +71,7 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService {
dbRedisValue = dbResultSet.getString(2); dbRedisValue = dbResultSet.getString(2);
} }
// 设置建库语句的redis值 // 设置建库语句的redis值
redisUtils.set(dbRedisKey, dbRedisValue, 43200); redisUtils.set(dbRedisKey, dbRedisValue, 86400);
} }
if (!redisUtils.hasKey(createTableKey)) { if (!redisUtils.hasKey(createTableKey)) {
...@@ -82,7 +86,6 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService { ...@@ -82,7 +86,6 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService {
preparedStatement = connection.prepareStatement(sql); preparedStatement = connection.prepareStatement(sql);
ResultSet pResultSet = preparedStatement.executeQuery(); ResultSet pResultSet = preparedStatement.executeQuery();
while (pResultSet.next()) { while (pResultSet.next()) {
log.info("获取建表语句:{}", pResultSet.getString(2));
createTableRedisValue.add(pResultSet.getString(2)); createTableRedisValue.add(pResultSet.getString(2));
} }
...@@ -93,12 +96,9 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService { ...@@ -93,12 +96,9 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService {
String columnName = columnResultSet.getString("COLUMN_NAME"); String columnName = columnResultSet.getString("COLUMN_NAME");
// 数据类型 // 数据类型
String columnType = columnResultSet.getString("TYPE_NAME"); String columnType = columnResultSet.getString("TYPE_NAME");
// 备注
String remarks = columnResultSet.getString("REMARKS");
columnNameList.add(columnName); columnNameList.add(columnName);
columnTypeList.add(columnType); columnTypeList.add(columnType);
commentList.add(remarks);
} }
String columnArrayStr = null; String columnArrayStr = null;
for (String column : columnNameList) { for (String column : columnNameList) {
...@@ -121,11 +121,10 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService { ...@@ -121,11 +121,10 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService {
} }
columnNameList.clear(); columnNameList.clear();
columnTypeList.clear(); columnTypeList.clear();
commentList.clear();
} }
} }
redisUtils.lSet(createTableKey, createTableRedisValue, 43200); redisUtils.lSet(createTableKey, createTableRedisValue, 86400);
redisUtils.set(insertTableKey, insertRedisValue, 43200); redisUtils.set(insertTableKey, insertRedisValue, 86400);
} }
} catch (Exception e) { } catch (Exception e) {
...@@ -142,7 +141,7 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService { ...@@ -142,7 +141,7 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService {
e.printStackTrace(); e.printStackTrace();
} }
} }
return false; return true;
} }
@Override @Override
...@@ -176,24 +175,41 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService { ...@@ -176,24 +175,41 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService {
// 重新设置连接,并指定数据库 // 重新设置连接,并指定数据库
newConnection = DriverManager.getConnection(url + "/" + dbName, username, password); newConnection = DriverManager.getConnection(url + "/" + dbName, username, password);
newConnection.setAutoCommit(true); newConnection.setAutoCommit(false);
// 从redis中获取要同步的表结构 // 从redis中获取要同步的表结构
List<Object> createTableRedisValue = redisUtils.lGet(createTableKey, 0, redisUtils.lGetListSize(createTableKey)); List<Object> createTableRedisValue = redisUtils.lGet(createTableKey, 0, redisUtils.lGetListSize(createTableKey));
Statement statement = newConnection.createStatement();
for (Object sql: createTableRedisValue) { for (Object sql: createTableRedisValue) {
log.info("开始同步表结构:\n {}", sql.toString()); // log.info("开始同步表结构:\n {}", sql.toString());
preparedStatement = newConnection.prepareStatement(sql.toString()); // preparedStatement = newConnection.prepareStatement(sql.toString());
preparedStatement.execute(); // preparedStatement.execute();
statement.addBatch(sql.toString());
} }
statement.executeBatch();
statement.clearBatch();
// 从redis中同步表数据 // 从redis中同步表数据
String insertTableRedisValue = redisUtils.get(insertTableKey).toString(); String insertTableRedisValue = redisUtils.get(insertTableKey).toString();
log.info("开始同步业务数据!"); log.info("开始同步数据!");
for (String insertSql: insertTableRedisValue.split("\n")) { for (String insertSql: insertTableRedisValue.split("\n")) {
log.info(insertSql); // log.info(insertSql);
preparedStatement = newConnection.prepareStatement(insertSql); // preparedStatement = newConnection.prepareStatement(insertSql);
preparedStatement.execute(); // preparedStatement.execute();
statement.addBatch(insertSql);
} }
statement.executeBatch();
statement.clearBatch();
log.info("表数据同步完成!");
// 判断是否需要update
if (dbName.equals("cash_loan_flow")) {
for (String sql: cashLoanFlowSql.split("\n")) {
statement.addBatch(sql);
}
statement.executeBatch();
statement.clearBatch();
}
newConnection.commit();
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} finally { } finally {
...@@ -211,7 +227,142 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService { ...@@ -211,7 +227,142 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService {
e.printStackTrace(); e.printStackTrace();
} }
} }
return false; return true;
}
@Override
public boolean getSingleTableFromSource(String ip, String port, String username, String password, String dbName, String tableName) {
String driver = "com.mysql.jdbc.Driver";
String url = "jdbc:mysql://" + ip + ":" + port + "/" + dbName;
String tableCreateKey = dbSyncPrefix + dbName + ":" + tableName + ":create";
String tableInsertKey = dbSyncPrefix + dbName + ":" + tableName + ":insert";
String insertRedisValue = "";
Connection connection = null;
PreparedStatement preparedStatement = null;
List<String> columnNameList = new ArrayList<>();
List<String> columnTypeList = new ArrayList<>();
try {
Class.forName(driver);
connection = DriverManager.getConnection(url, username, password);
connection.setAutoCommit(true);
DatabaseMetaData databaseMetaData = connection.getMetaData();
if (!redisUtils.hasKey(tableCreateKey)) {
String sql = String.format("SHOW CREATE TABLE %s", tableName);
preparedStatement = connection.prepareStatement(sql);
ResultSet resultSet = preparedStatement.executeQuery();
while (resultSet.next()) {
redisUtils.set(tableCreateKey, resultSet.getString(2));
}
}
if (!redisUtils.hasKey(tableInsertKey)) {
ResultSet columnResultSet = databaseMetaData.getColumns(null, "%", tableName, "%");
while (columnResultSet.next()) {
// 字段名称
String columnName = columnResultSet.getString("COLUMN_NAME");
// 数据类型
String columnType = columnResultSet.getString("TYPE_NAME");
columnNameList.add(columnName);
columnTypeList.add(columnType);;
}
String columnArrayStr = null;
for (String column : columnNameList) {
if (null == columnArrayStr) {
columnArrayStr = "`" + column + "`";
} else {
columnArrayStr = columnArrayStr + "," + "`" + column + "`";
}
}
String selectSQL = String.format("select %s from %s", columnArrayStr, tableName);
preparedStatement = connection.prepareStatement(selectSQL);
ResultSet selectResultSet = preparedStatement.executeQuery();
while (selectResultSet.next()) {
String rowValues = getRowValues(selectResultSet, columnNameList.size(), columnTypeList);
String insertSql = String.format("insert into %s (%s) values(%s);", tableName, columnArrayStr, rowValues);
insertSql = insertSql.replaceAll("\n", "<br/>");
insertSql = insertSql + "\n";
insertRedisValue += insertSql;
}
redisUtils.set(tableInsertKey, insertRedisValue);
}
} catch (Exception e) {
e.printStackTrace();
return false;
} finally {
try {
if (connection != null) {
connection.close();
}
if (preparedStatement != null) {
preparedStatement.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
return true;
}
@Override
public boolean syncSingleTableToDest(String ip, String port, String username, String password, String dbName, String tableName) {
String driver = "com.mysql.jdbc.Driver";
String url = "jdbc:mysql://" + ip + ":" + port + "/" + dbName;
String tableCreateKey = dbSyncPrefix + dbName + ":" + tableName + ":create";
String tableInsertKey = dbSyncPrefix + dbName + ":" + tableName + ":insert";
Connection connection = null;
PreparedStatement preparedStatement = null;
Statement statement = null;
try {
Class.forName(driver);
connection = DriverManager.getConnection(url, username, password);
connection.setAutoCommit(false);
String createTableSql = redisUtils.get(tableCreateKey).toString();
String insertTableSql = redisUtils.get(tableInsertKey).toString();
// 如果表存在,则首先删除表
String dropTableSql = String.format("DROP TABLE if exists %s", tableName);
preparedStatement = connection.prepareStatement(dropTableSql);
preparedStatement.execute();
// 重新创建表
preparedStatement = connection.prepareStatement(createTableSql);
preparedStatement.execute();
statement = connection.createStatement();
for (String insertSql: insertTableSql.split("\n")) {
statement.addBatch(insertSql);
}
statement.executeBatch();
connection.commit();
} catch (Exception e) {
e.printStackTrace();
return false;
} finally {
try {
if (connection != null) {
connection.close();
}
if (preparedStatement != null) {
preparedStatement.close();
}
if (statement != null) {
statement.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
return true;
} }
/** /**
...@@ -248,9 +399,9 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService { ...@@ -248,9 +399,9 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService {
databaseList.add(dbListResultSet.getString(1)); databaseList.add(dbListResultSet.getString(1));
} }
preparedStatement.close(); preparedStatement.close();
// 设置数据库列表缓存,缓存6个小时 // 设置数据库列表缓存,缓存24个小时
log.info("从同步库获取到的数据库列表为:{}", databaseList); log.info("从同步库获取到的数据库列表为:{}", databaseList);
redisUtils.lSet(dbListRedisKey, databaseList, 21600); redisUtils.lSet(dbListRedisKey, databaseList, 86400);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} finally { } finally {
...@@ -289,6 +440,45 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService { ...@@ -289,6 +440,45 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService {
return result; return result;
} }
@Override
public List<Object> getTableListByDb(String ip, String port, String username, String password, String dbName) {
String driver = "com.mysql.jdbc.Driver";
String url = "jdbc:mysql://" + ip + ":" + port + "/" + dbName;
Connection connection = null;
PreparedStatement preparedStatement = null;
List<Object> tableList = new ArrayList<>();
try {
Class.forName(driver);
connection = DriverManager.getConnection(url, username, password);
connection.setAutoCommit(false);
String tablesSql = "SHOW TABLES";
preparedStatement = connection.prepareStatement(tablesSql);
ResultSet tableListResultSet = preparedStatement.executeQuery();
while (tableListResultSet.next()) {
tableList.add(tableListResultSet.getString(1));
}
return tableList;
} catch (Exception e) {
e.printStackTrace();
return null;
} finally {
try {
if (preparedStatement != null) {
preparedStatement.close();
}
if (connection != null) {
connection.close();
}
} catch (Exception e) {
connection = null;
}
}
}
/** /**
* 获取表数据一行的所有值 * 获取表数据一行的所有值
* @param rs * @param rs
......
...@@ -45,11 +45,7 @@ public class K8sService { ...@@ -45,11 +45,7 @@ public class K8sService {
} }
public static void main(String[] args) throws IOException { public static void main(String[] args) throws IOException {
String configYAML = String.join("\n", readConfigFile("kube-config.yml")); K8sService k8sService = new K8sService();
Config config = Config.fromKubeconfig(configYAML);
KubernetesClient client = new DefaultKubernetesClient(config);
String configCrt = String.join("\n", readConfigFile("tke-cluster-ca.crt"));
config.setCaCertData(configCrt);
System.out.println(client.pods().inNamespace("fe").withName("mysql").get());
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment