Commit 0ba8f771 authored by 黎博's avatar 黎博

优化数据库同步脚本及k8s client初始化

parent 68533ec0
package cn.qg.holmes.config;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import java.io.IOException;
public class K8sConfig {
public static void main(String[] args) throws IOException {
Config config = Config.fromKubeconfig("classpath/KubeConfig.yml");
KubernetesClient kubernetesClient = new DefaultKubernetesClient(config);
System.out.println(kubernetesClient.pods().inNamespace("fe").list());
}
}
...@@ -6,6 +6,8 @@ import org.springframework.beans.factory.annotation.Autowired; ...@@ -6,6 +6,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
import java.util.Map;
@CrossOrigin @CrossOrigin
@RestController @RestController
@RequestMapping("/db") @RequestMapping("/db")
...@@ -28,9 +30,14 @@ public class DbSyncController { ...@@ -28,9 +30,14 @@ public class DbSyncController {
@GetMapping("/sync/one") @GetMapping("/sync/one")
public JsonResult syncSingleDatabase(@RequestParam String namespace, @RequestParam String dbName) { public JsonResult syncSingleDatabase(@RequestParam String namespace, @RequestParam String dbName) {
long startTime = System.currentTimeMillis();
databaseSyncService.getDbInfoFromSource(ip, port, username, password, dbName); databaseSyncService.getDbInfoFromSource(ip, port, username, password, dbName);
databaseSyncService.syncDbToDest("172.17.5.24", "31024", "qa", "qatest", dbName); Map<String, String> map = databaseSyncService.getMysqlInfoByNamespace(namespace);
return JsonResult.buildSuccessResult(null); String destIp = map.get("ip");
String destPort = map.get("port");
databaseSyncService.syncDbToDest(destIp, destPort, "qa", "qatest", dbName);
long endTime = System.currentTimeMillis();
return JsonResult.buildSuccessResult((endTime - startTime) / 1000);
} }
/** /**
......
package cn.qg.holmes.service.effect; package cn.qg.holmes.service.effect;
import java.util.List; import java.util.List;
import java.util.Map;
public interface DatabaseSyncService { public interface DatabaseSyncService {
...@@ -9,4 +10,6 @@ public interface DatabaseSyncService { ...@@ -9,4 +10,6 @@ public interface DatabaseSyncService {
boolean syncDbToDest(String ip, String port, String username, String password, String dbName); boolean syncDbToDest(String ip, String port, String username, String password, String dbName);
List<Object> getDatabaseList(String ip, String port, String username, String password); List<Object> getDatabaseList(String ip, String port, String username, String password);
Map<String, String> getMysqlInfoByNamespace(String namespace);
} }
\ No newline at end of file
package cn.qg.holmes.service.effect.impl; package cn.qg.holmes.service.effect.impl;
import cn.qg.holmes.service.effect.DatabaseSyncService; import cn.qg.holmes.service.effect.DatabaseSyncService;
import cn.qg.holmes.utils.HttpClientUtils;
import cn.qg.holmes.utils.RedisUtils; import cn.qg.holmes.utils.RedisUtils;
import com.alibaba.fastjson.JSON;
import com.jayway.jsonpath.JsonPath;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.sql.*; import java.sql.*;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
@Slf4j @Slf4j
@Service @Service
...@@ -18,6 +24,9 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService { ...@@ -18,6 +24,9 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService {
@Autowired @Autowired
RedisUtils redisUtils; RedisUtils redisUtils;
@Value("${tke.host}")
private String tkeHost;
private final String dbSyncPrefix = "dbsync:"; private final String dbSyncPrefix = "dbsync:";
/** /**
...@@ -77,7 +86,6 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService { ...@@ -77,7 +86,6 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService {
createTableRedisValue.add(pResultSet.getString(2)); createTableRedisValue.add(pResultSet.getString(2));
} }
if (!redisUtils.hasKey(insertTableKey)) { if (!redisUtils.hasKey(insertTableKey)) {
ResultSet columnResultSet = databaseMetaData.getColumns(null, "%", tableName, "%"); ResultSet columnResultSet = databaseMetaData.getColumns(null, "%", tableName, "%");
while (columnResultSet.next()) { while (columnResultSet.next()) {
...@@ -173,12 +181,14 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService { ...@@ -173,12 +181,14 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService {
// 从redis中获取要同步的表结构 // 从redis中获取要同步的表结构
List<Object> createTableRedisValue = redisUtils.lGet(createTableKey, 0, redisUtils.lGetListSize(createTableKey)); List<Object> createTableRedisValue = redisUtils.lGet(createTableKey, 0, redisUtils.lGetListSize(createTableKey));
for (Object sql: createTableRedisValue) { for (Object sql: createTableRedisValue) {
log.info("开始同步表结构:\n {}", sql.toString());
preparedStatement = newConnection.prepareStatement(sql.toString()); preparedStatement = newConnection.prepareStatement(sql.toString());
preparedStatement.execute(); preparedStatement.execute();
} }
// 从redis中同步表数据 // 从redis中同步表数据
String insertTableRedisValue = redisUtils.get(insertTableKey).toString(); String insertTableRedisValue = redisUtils.get(insertTableKey).toString();
log.info("开始同步业务数据!");
for (String insertSql: insertTableRedisValue.split("\n")) { for (String insertSql: insertTableRedisValue.split("\n")) {
log.info(insertSql); log.info(insertSql);
preparedStatement = newConnection.prepareStatement(insertSql); preparedStatement = newConnection.prepareStatement(insertSql);
...@@ -256,6 +266,29 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService { ...@@ -256,6 +266,29 @@ public class DatabaseSyncServiceImpl implements DatabaseSyncService {
return databaseList; return databaseList;
} }
/**
* 根据namespace获取对应的mysql ip和端口
* @param namespace
* @return
*/
@Override
public Map<String, String> getMysqlInfoByNamespace(String namespace) {
Map<String, String> headers = new HashMap<>();
Map<String, String> params = new HashMap<>();
headers.put("cluster", "qa");
params.put("namespace", namespace);
params.put("serviceName", "mysql");
params.put("type", "base");
String response = HttpClientUtils.doPostJson(tkeHost + "/service/details", headers, JSON.toJSONString(params));
log.info(response);
String mysqlIp = JsonPath.read(response, "$.data.lanIp").toString();
String mysqlPort = JsonPath.read(response, "$.data.portMappings[0].nodePort").toString();
Map<String, String> result = new HashMap<>();
result.put("ip", mysqlIp);
result.put("port", mysqlPort);
return result;
}
/** /**
* 获取表数据一行的所有值 * 获取表数据一行的所有值
* @param rs * @param rs
......
package cn.qg.holmes.utils;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.io.ClassPathResource;
import org.springframework.stereotype.Component;
import java.io.*;
import java.util.ArrayList;
import java.util.List;
@Slf4j
@Component
public class K8sService {
private KubernetesClient kubernetesClient;
public K8sService() {
try {
String configYAML = String.join("\n", readConfigFile("kube-config.yml"));
Config config = Config.fromKubeconfig(configYAML);
kubernetesClient = new DefaultKubernetesClient(config);
String configCrt = String.join("\n", readConfigFile("tke-cluster-ca.crt"));
config.setCaCertData(configCrt);
} catch (Exception e) {
log.info("k8s service 初始化失败!");
e.printStackTrace();
}
}
public static List<String> readConfigFile(String file) throws IOException {
String str = "";
ClassPathResource resource = new ClassPathResource(file);
InputStream in = resource.getInputStream();
BufferedReader br = new BufferedReader(new InputStreamReader(in));
List<String> result = new ArrayList<>();
while ((str = br.readLine()) != null) {
result.add(str);
}
return result;
}
public static void main(String[] args) throws IOException {
String configYAML = String.join("\n", readConfigFile("kube-config.yml"));
Config config = Config.fromKubeconfig(configYAML);
KubernetesClient client = new DefaultKubernetesClient(config);
String configCrt = String.join("\n", readConfigFile("tke-cluster-ca.crt"));
config.setCaCertData(configCrt);
System.out.println(client.pods().inNamespace("fe").withName("mysql").get());
}
}
-----BEGIN CERTIFICATE-----
MIIDZDCCAkygAwIBAgIIAiEo8goVaeYwDQYJKoZIhvcNAQELBQAwUDELMAkGA1UE
BhMCQ04xKjARBgNVBAoTCnRlbmNlbnR5dW4wFQYDVQQKEw5zeXN0ZW06bWFzdGVy
czEVMBMGA1UEAxMMY2xzLWFjZng0cHZqMB4XDTE5MDEwMzAzMDQ1MloXDTM5MDEw
MzAzMDQ1MlowUDELMAkGA1UEBhMCQ04xKjARBgNVBAoTCnRlbmNlbnR5dW4wFQYD
VQQKEw5zeXN0ZW06bWFzdGVyczEVMBMGA1UEAxMMY2xzLWFjZng0cHZqMIIBIjAN
BgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA5RurKHR0nF2P26weSMaj+mmymw+l
A/81r5IwAQYiz1H4QOr+TWbf7tgblYsNzbiJNjNNzVDbCdbvHYhxK6UczbC8bhqq
jA8comp+dlCe6mwTBJ74p3O+JWn4RNcD+xrxurzhOFDVkkTkEcj06WOnh2OCsEZC
ByW3ZPXCjudo05JkMHWhE/pDfIa2ab16dYxfvIhsv8XqgQ2ovODo8H9iPfZC/VIx
jhQkOzmRndeEx/ZZbY9o1oOml1r9wftWZIsY2Xm64K4kT87afTF/qivx8GW9ppDy
orYVXgUQJVDG2b2YwFbWXvPwK6YbSU9GruJeRqfvIKi+wKtqC0LgZYrLUwIDAQAB
o0IwQDAOBgNVHQ8BAf8EBAMCAoQwHQYDVR0lBBYwFAYIKwYBBQUHAwIGCCsGAQUF
BwMBMA8GA1UdEwEB/wQFMAMBAf8wDQYJKoZIhvcNAQELBQADggEBAF/cc1tDfTHQ
x5rFjrPKyanq9bVof2JZ+2xbhzuok1SicbSptLbkHxT8zriUxsP3wclYnIQm7nW6
9TIzKXI9H+kvEl3PAvrSUUFqNK+KZPDdZPvsopVut3msvPydg4de883wA0Jd5uiu
3JWLAbmazHYOMmhD2EorsUiY6w5uI7/k2WzmH3zUFwntX4M21XGrkfnHQEaTt/ht
GrlQjPW8071pJoTiX15DsweMM/JdKLWqShuIuf7asjpU+IdmB57SH74Dty3Di58C
GVWAB13spmdjHlhCy9QdliC8dS5Fj/4HTHt2Ga0CDFKEY4NqwHe/+Hj6dV92krbG
gZ6FQkaHwVk=
-----END CERTIFICATE-----
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment