Commit 7adcecb9 authored by 黎博's avatar 黎博

k8s新增kafka支持

parent 13b829b5
......@@ -146,6 +146,9 @@ public class K8sController {
case "rabbitmq":
k8sService.createRabbitmqService(namespace);
break;
case "kafka":
k8sService.createKafkaService(namespace);
break;
default:
break;
}
......@@ -164,6 +167,9 @@ public class K8sController {
case "rabbitmq":
k8sService.createRabbitmqPvc(namespace);
break;
case "kafka":
k8sService.createKafkaPvc(namespace);
break;
default:
break;
}
......@@ -193,6 +199,9 @@ public class K8sController {
case "rabbitmq":
k8sService.createRabbitmqDeployment(namespace, image);
break;
case "kafka":
k8sService.createKafkaDeployment(namespace, image);
break;
default:
break;
}
......
......@@ -7,7 +7,9 @@ import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import io.fabric8.kubernetes.api.model.*;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;
import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
import io.fabric8.kubernetes.api.model.apps.DeploymentSpecBuilder;
import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
import io.fabric8.kubernetes.api.model.apps.DeploymentStrategy;
import io.fabric8.kubernetes.api.model.extensions.*;
......@@ -34,15 +36,12 @@ public class K8sService {
@Autowired
RedisUtils redisUtils;
@Autowired
DockerProjectService dockerProjectService;
@Value("${no.healthcheck.service}")
private String noHealthCheckService;
private KubernetesClient kubernetesClient;
@Autowired
DockerProjectService dockerProjectService;
public K8sService() {
try {
String configYAML = String.join("\n", readConfigFile("kube-config.yml"));
......@@ -59,6 +58,7 @@ public class K8sService {
/**
* 读取k8s配置文件
*
* @param file 配置文件名
* @return
* @throws IOException
......@@ -75,8 +75,22 @@ public class K8sService {
return result;
}
public static void main(String[] args) {
K8sService k8sService = new K8sService();
// k8sService.createNewNamespace("test6", "dev", "黎博");
// k8sService.podTest();
System.out.println(k8sService.getPodAndServiceInfo("vcc2", "java", "vcc-quota-service"));
// k8sService.getPodList("k8s");
// System.out.println(JSON.toJSONString(k8sService.getPodListNew("fe")));
// if (!k8sService.queryIfServiceExistByName("fe", "lxq-ui")) {
// k8sService.createUIAndNodeService("fe", "lxq-ui", "ui", "ui");
// }
}
/**
* 构造ObjectMeta对象
*
* @param namespace 环境
* @param serviceName 服务名称
* @param type 类型,java、ui、base等等
......@@ -95,6 +109,7 @@ public class K8sService {
/**
* 构建resources
*
* @param cpuRequestAmount cpu request大小
* @param cpuRequestFormat cpu request单位
* @param memRequestAmount 内存 request大小
......@@ -133,6 +148,7 @@ public class K8sService {
/**
* 构建imagePullSecrets
*
* @return
*/
public List<LocalObjectReference> buildImagePullSecrets() {
......@@ -166,6 +182,7 @@ public class K8sService {
/**
* 获取namespace列表
*
* @return
*/
public List<Map> getNamespaceList(String env) {
......@@ -197,7 +214,7 @@ public class K8sService {
host = podList.get(0).getStatus().getHostIP();
}
if (podList.size() >= 2) {
for (Pod pod: podList) {
for (Pod pod : podList) {
if (pod.getStatus().getHostIP() != null) {
host = pod.getStatus().getHostIP();
break;
......@@ -240,7 +257,7 @@ public class K8sService {
host = podList.get(0).getStatus().getHostIP();
}
if (podList.size() >= 2) {
for (Pod pod: podList) {
for (Pod pod : podList) {
if (pod.getStatus().getHostIP() != null) {
host = pod.getStatus().getHostIP();
break;
......@@ -276,6 +293,7 @@ public class K8sService {
/**
* 获取运行中的pod列表
*
* @param namespace 环境
* @return
*/
......@@ -295,7 +313,7 @@ public class K8sService {
host = podList.get(0).getStatus().getHostIP();
}
if (podList.size() >= 2) {
for (Pod pod: podList) {
for (Pod pod : podList) {
if (pod.getStatus().getHostIP() != null) {
host = pod.getStatus().getHostIP();
break;
......@@ -308,9 +326,9 @@ public class K8sService {
return resultMap;
}
/**
* 获取pod列表
*
* @param namespace
* @return
*/
......@@ -323,25 +341,28 @@ public class K8sService {
List<Ingress> ingressList = kubernetesClient.extensions().ingresses().inNamespace(namespace).list().getItems();
Map<String, Map<String, Object>> podServiceMap = new HashMap<>();
// 遍历Pod列表
for (Pod pod: podList) {
for (Pod pod : podList) {
if (!pod.getStatus().getPhase().equals("Failed")) {
String serviceName = pod.getMetadata().getLabels().get("qcloud-app");
podServiceMap.put(serviceName, formatPodInfo(pod));
}
}
// 遍历Service列表
for (Service service: serviceList) {
for (Service service : serviceList) {
String serviceName = service.getMetadata().getName();
if (StringUtils.equals(serviceName, "kafka-" + namespace)) {
serviceName = "kafka";
}
podServiceMap = assign(serviceName, podServiceMap, formatServiceInfo(service));
}
// 遍历Ingress列表
for (Ingress ingress: ingressList) {
for (Ingress ingress : ingressList) {
String serviceName = ingress.getMetadata().getName();
podServiceMap = assign(serviceName, podServiceMap, formatIngressInfo(ingress));
}
// 对象转列表
List<Map<String, Object>> resultList = new ArrayList<>();
for (Map<String, Object> map: podServiceMap.values()) {
for (Map<String, Object> map : podServiceMap.values()) {
if ((map != null) && (map.get("serviceName") != null)) {
resultList.add(map);
}
......@@ -357,7 +378,7 @@ public class K8sService {
public Map<String, Map<String, Object>> assign(String serviceName, Map<String, Map<String, Object>> podServiceMap, Map<String, Object> otherMap) {
Map<String, Object> podService = podServiceMap.get(serviceName);
for (Map.Entry<String, Object> entry: otherMap.entrySet()) {
for (Map.Entry<String, Object> entry : otherMap.entrySet()) {
if (podService != null) {
podService.put(entry.getKey(), entry.getValue());
}
......@@ -368,6 +389,7 @@ public class K8sService {
/**
* 处理pod相关信息
*
* @param pod
* @return
*/
......@@ -419,6 +441,7 @@ public class K8sService {
/**
* 格式化pod状态
*
* @param pod
* @return
*/
......@@ -447,6 +470,7 @@ public class K8sService {
/**
* 格式化k8s Service
*
* @param service
* @return
*/
......@@ -477,6 +501,7 @@ public class K8sService {
/**
* 格式化k8s Ingress
*
* @param ingress
* @return
*/
......@@ -492,6 +517,7 @@ public class K8sService {
/**
* 重置pod
*
* @param namespace 环境
* @param podName podName
* @return
......@@ -502,6 +528,7 @@ public class K8sService {
/**
* 删除一个pod
*
* @param namespace 环境
* @param serviceName 服务名
* @return
......@@ -520,6 +547,7 @@ public class K8sService {
/**
* 创建 Java Service
*
* @return
*/
public Service createJavaService(String namespace, String serviceName, Integer debug) {
......@@ -572,6 +600,7 @@ public class K8sService {
/**
* 部署Java Deployment
*
* @param serviceCreateVo
* @param dockerProject
* @return
......@@ -798,6 +827,7 @@ public class K8sService {
/**
* 创建UI或Node 服务
*
* @param namespace 环境
* @param serviceName 服务名
* @param serviceType 服务类型
......@@ -840,6 +870,7 @@ public class K8sService {
/**
* 部署UI或Node deployment
*
* @param serviceCreateVo
* @param dockerProject
* @return
......@@ -1219,6 +1250,7 @@ public class K8sService {
/**
* 创建redis哨兵模式的Deployment
*
* @param namespace
* @param image
* @return
......@@ -1352,6 +1384,7 @@ public class K8sService {
/**
* redis哨兵模式 Service
*
* @param namespace
* @return
*/
......@@ -1396,6 +1429,7 @@ public class K8sService {
/**
* 创建mysql service
*
* @param namespace 环境
* @return
*/
......@@ -1434,7 +1468,6 @@ public class K8sService {
return kubernetesClient.services().inNamespace(namespace).create(mysqlService);
}
/**
* 创建mysql pvc
*
......@@ -1645,6 +1678,7 @@ public class K8sService {
/**
* 创建mongodb pvc
*
* @param namespace
* @return
*/
......@@ -1680,6 +1714,7 @@ public class K8sService {
/**
* 创建mongodb deployment
*
* @param namespace
* @param image
* @return
......@@ -1805,6 +1840,7 @@ public class K8sService {
/**
* 创建zookeeper pvc
*
* @param namespace
* @return
*/
......@@ -1840,6 +1876,7 @@ public class K8sService {
/**
* 创建zookeeper Service
*
* @param namespace
* @return
*/
......@@ -1881,6 +1918,7 @@ public class K8sService {
/**
* 创建zookeeper deployment
*
* @param namespace
* @param image
* @return
......@@ -2009,6 +2047,7 @@ public class K8sService {
/**
* 创建rabbitmq pvc
*
* @param namespace
* @return
*/
......@@ -2045,6 +2084,7 @@ public class K8sService {
/**
* 创建rabbitmq service
*
* @param namespace
* @return
*/
......@@ -2088,6 +2128,7 @@ public class K8sService {
/**
* 创建rabbitmq deployment
*
* @param namespace
* @param image
* @return
......@@ -2184,8 +2225,190 @@ public class K8sService {
return kubernetesClient.apps().deployments().inNamespace(namespace).create(rabbitmqDeployment);
}
public Service createKafkaService(String namespace) {
Map<String, String> matchLabels = new HashMap<>();
matchLabels.put("type", "base");
matchLabels.put("qcloud-app", "kafka");
ObjectMeta objectMeta = new ObjectMetaBuilder()
.withName("kafka-" + namespace)
.withLabels(matchLabels)
.build();
Map<String, String> selectors = new HashMap<>();
selectors.put("qcloud-app", "kafka");
ServicePort servicePort = new ServicePortBuilder()
.withName("kafka-9092-9092")
.withPort(9092)
.withTargetPort(new IntOrStringBuilder().withIntVal(9092).build())
.withProtocol("TCP")
.build();
List<ServicePort> portList = new ArrayList<>();
portList.add(servicePort);
ServiceSpec serviceSpec = new ServiceSpecBuilder()
.withType("NodePort")
.withPorts(portList)
.withSelector(selectors)
.build();
Service kafkaService = new ServiceBuilder()
.withApiVersion("v1")
.withKind("Service")
.withMetadata(objectMeta)
.withSpec(serviceSpec)
.build();
return kubernetesClient.services().inNamespace(namespace).createOrReplace(kafkaService);
}
/**
* 创建kafka Deployment
* @param namespace
* @param image
* @return
*/
public Deployment createKafkaDeployment(String namespace, String image) {
ObjectMeta objectMeta = buildObjectMeta(namespace, "kafka", "base");
Map<String, String> matchLabels = new HashMap<>();
matchLabels.put("qcloud-app", "kafka");
matchLabels.put("type", "base");
Map<String, Object> zookeeperInfo = getPodAndServiceInfo(namespace, "base", "zookeeper");
String zookeeperConnect = zookeeperInfo.get("lanIp").toString() + ":" + zookeeperInfo.get("port_2181").toString() + "/kafka";
Service kafkaService;
// 默认值
String kafkaAdvertisedListeners = "PLAINTEXT://kafka-lab.lab.svc.cluster.local:9092";
int count = 10;
while (count > 0) {
try {
kafkaService = getServiceDetail(namespace, "kafka-" + namespace);
if (kafkaService != null) {
kafkaAdvertisedListeners = "PLAINTEXT://" + kafkaService.getSpec().getClusterIP() + ":" + String.valueOf(kafkaService.getSpec().getPorts().get(0).getNodePort());
break;
}
Thread.currentThread().sleep(1000);
} catch (Exception e) {
e.printStackTrace();
break;
}
count -= 1;
}
EnvVar envVar1 = new EnvVarBuilder().withName("KAFKA_ZOOKEEPER_CONNECT").withValue(zookeeperConnect).build();
EnvVar envVar2 = new EnvVarBuilder().withName("KAFKA_BROKER_ID").withValue("1").build();
EnvVar envVar3 = new EnvVarBuilder().withName("KAFKA_ADVERTISED_LISTENERS").withValue(kafkaAdvertisedListeners).build();
EnvVar envVar4 = new EnvVarBuilder().withName("KAFKA_LISTENERS").withValue("PLAINTEXT://0.0.0.0:9092").build();
EnvVar envVar5 = new EnvVarBuilder().withName("KAFKA_ADVERTISED_PORT").withValue("30901").build();
EnvVar envVar6 = new EnvVarBuilder().withName("KAFKA_ADVERTISED_HOST_NAME").withValueFrom(
new EnvVarSourceBuilder()
.withFieldRef(
new ObjectFieldSelectorBuilder()
.withFieldPath("status.hostIP")
.build())
.build())
.build();
List<EnvVar> envVarList = new ArrayList<>();
envVarList.addAll(Arrays.asList(envVar1, envVar2, envVar3, envVar4, envVar5, envVar6));
Container container = new ContainerBuilder()
.withName("kafka")
.withImage("ccr.ccs.tencentyun.com/" + image)
.withImagePullPolicy("IfNotPresent")
.withPorts(
new ContainerPortBuilder()
.withContainerPort(9092)
.build())
.withEnv(envVarList)
.withVolumeMounts(
new VolumeMountBuilder()
.withName("datadir")
.withMountPath("/var/lib/kafka")
.build())
.build();
List<Container> containerList = new ArrayList<>();
containerList.add(container);
List<Volume> volumeList = new ArrayList<>();
Volume volume = new VolumeBuilder()
.withName("datadir")
.withPersistentVolumeClaim(
new PersistentVolumeClaimVolumeSourceBuilder()
.withClaimName("kafka-" + namespace)
.build())
.build();
volumeList.add(volume);
PodSpec podSpec = new PodSpecBuilder()
.withContainers(containerList)
.withVolumes(volumeList)
.build();
PodTemplateSpec podTemplateSpec = new PodTemplateSpecBuilder()
.withMetadata(new ObjectMetaBuilder().withLabels(matchLabels).build())
.withSpec(podSpec)
.build();
DeploymentSpec deploymentSpec = new DeploymentSpecBuilder()
.withReplicas(1)
.withSelector(new LabelSelectorBuilder().withMatchLabels(matchLabels).build())
.withTemplate(podTemplateSpec)
.build();
Deployment kafkaDeployment = new DeploymentBuilder()
.withApiVersion("apps/v1")
.withKind("Deployment")
.withMetadata(objectMeta)
.withSpec(deploymentSpec)
.build();
return kubernetesClient.apps().deployments().inNamespace(namespace).createOrReplace(kafkaDeployment);
}
/**
* 创建kafka pvc
* @param namespace
* @return
*/
public PersistentVolumeClaim createKafkaPvc(String namespace) {
ObjectMeta objectMeta = new ObjectMetaBuilder().withName("kafka-" + namespace).build();
List<String> accessModes = new ArrayList<>();
accessModes.add("ReadWriteOnce");
Map<String, Quantity> requests = new HashMap<>();
Quantity quantity = new Quantity();
quantity.setAmount("10");
quantity.setFormat("Gi");
requests.put("storage", quantity);
ResourceRequirements resourceRequirements = new ResourceRequirementsBuilder()
.withRequests(requests)
.build();
PersistentVolumeClaimSpec pvcSpec = new PersistentVolumeClaimSpecBuilder()
.withAccessModes(accessModes)
.withResources(resourceRequirements)
.build();
PersistentVolumeClaim persistentVolumeClaim = new PersistentVolumeClaimBuilder()
.withApiVersion("v1")
.withKind("PersistentVolumeClaim")
.withMetadata(objectMeta)
.withSpec(pvcSpec)
.build();
return kubernetesClient.persistentVolumeClaims().inNamespace(namespace).createOrReplace(persistentVolumeClaim);
}
/**
* 查询PersistentVolumeClaims是否存在
*
* @param namespace 环境
* @param pvcName pvc名称
* @return
......@@ -2196,6 +2419,7 @@ public class K8sService {
/**
* 查询Deployment是否存在
*
* @param namespace 环境
* @param deploymentName Deployment名称
* @return
......@@ -2206,13 +2430,19 @@ public class K8sService {
/**
* 查询Service是否存在
*
* @param namespace 环境
* @param serviceName Service名称
* @return
*/
public boolean queryIfServiceExistByName(String namespace, String serviceName) {
// kafka Service name不能直接叫kafka,需要特殊处理一下
if (StringUtils.equals(serviceName, "kafka")) {
serviceName = "kafka-" + namespace;
}
return kubernetesClient.services().inNamespace(namespace).withName(serviceName).get() != null;
}
/**
* 更新部署pod
*
......@@ -2230,6 +2460,7 @@ public class K8sService {
/**
* 创建Ingress,主要用来配置域名
*
* @param namespace 环境
* @param serviceName 服务名
* @param domain 域名
......@@ -2281,6 +2512,7 @@ public class K8sService {
/**
* 查询Ingress是否存在
*
* @param namespace 环境
* @param serviceName 服务名
* @return
......@@ -2291,6 +2523,7 @@ public class K8sService {
/**
* 获取Service详情
*
* @param namespace 环境
* @param serviceName service名称
* @return
......@@ -2306,7 +2539,7 @@ public class K8sService {
labelMap.put("qcloud-app", serviceName);
labelSelector.setMatchLabels(labelMap);
List<Pod> podList = kubernetesClient.pods().inNamespace(namespace).withLabelSelector(labelSelector).list().getItems();
for (Pod pod: podList) {
for (Pod pod : podList) {
// pod可能有多个,返回Running状态的pod
if (pod.getStatus().getPhase().equals("Running")) {
return pod;
......@@ -2317,6 +2550,7 @@ public class K8sService {
/**
* 获取pod和service的一些信息
*
* @param namespace
* @param serviceType
* @param serviceName
......@@ -2375,6 +2609,7 @@ public class K8sService {
/**
* 创建新的namespace
*
* @param name namespace名称
* @param desc dev/test
* @param owner
......@@ -2460,6 +2695,7 @@ public class K8sService {
/**
* 删除ingress
*
* @param namespace 环境
* @param serviceName 服务名
* @return
......@@ -2471,6 +2707,7 @@ public class K8sService {
/**
* 删除Service
*
* @param namespace 环境
* @param serviceName 服务名
* @return
......@@ -2482,6 +2719,7 @@ public class K8sService {
/**
* 获取Deployment
*
* @param namespace
* @param serviceName
* @return
......@@ -2492,6 +2730,7 @@ public class K8sService {
/**
* 获取环境的所有k8s变量
*
* @param namespace 环境
*/
public Map<String, Object> getListEnvVars(String namespace) {
......@@ -2499,7 +2738,7 @@ public class K8sService {
List<Pod> podList = kubernetesClient.pods().inNamespace(namespace).list().getItems();
List<Service> serviceList = kubernetesClient.services().inNamespace(namespace).list().getItems();
// 遍历Pod列表
for (Pod pod: podList) {
for (Pod pod : podList) {
if (!pod.getStatus().getPhase().equals("Failed")) {
String serviceName = pod.getMetadata().getLabels().get("qcloud-app");
serviceName = serviceName.replace("-", "_").toUpperCase();
......@@ -2511,7 +2750,7 @@ public class K8sService {
}
}
// 遍历Service列表
for (Service service: serviceList) {
for (Service service : serviceList) {
String serviceName = service.getMetadata().getName().toUpperCase().replaceAll("-", "_");
List<ServicePort> servicePortList = service.getSpec().getPorts();
if (service.getMetadata().getLabels().get("type").equals("base")) {
......@@ -2522,7 +2761,7 @@ public class K8sService {
envMap.put("DB_SERVICE_PORT", servicePortList.get(0).getNodePort());
}
}
for (ServicePort servicePort: servicePortList) {
for (ServicePort servicePort : servicePortList) {
envMap.put(serviceName + "_SERVICE_PORT_" + servicePort.getPort(), servicePort.getNodePort());
// MYSQL需要额外加个参数
if (serviceName.equals("MYSQL")) {
......@@ -2533,7 +2772,7 @@ public class K8sService {
if (servicePortList.get(0).getNodePort() != null) {
envMap.put(serviceName + "_SERVICE_PORT", servicePortList.get(0).getNodePort());
}
for (ServicePort servicePort: servicePortList) {
for (ServicePort servicePort : servicePortList) {
if (service.getSpec().getType().equals("NodePort")) {
envMap.put(serviceName + "_SERVICE_PORT_" + servicePort.getPort(), servicePort.getNodePort());
} else if (service.getSpec().getType().equals("ClusterIP")) {
......@@ -2576,7 +2815,7 @@ public class K8sService {
labels.put("type", "ui");
List<Pod> podList = kubernetesClient.pods().inNamespace("fe").withLabels(labels).list().getItems();
String host = null;
for (Pod pod: podList) {
for (Pod pod : podList) {
if (pod.getStatus().getHostIP() != null) {
host = pod.getStatus().getHostIP();
break;
......@@ -2585,17 +2824,4 @@ public class K8sService {
System.out.println(podList.size());
System.out.println(host);
}
public static void main(String[] args) {
K8sService k8sService = new K8sService();
// k8sService.createNewNamespace("test6", "dev", "黎博");
// k8sService.podTest();
System.out.println(k8sService.getPodAndServiceInfo("vcc2", "java", "vcc-quota-service"));
// k8sService.getPodList("k8s");
// System.out.println(JSON.toJSONString(k8sService.getPodListNew("fe")));
// if (!k8sService.queryIfServiceExistByName("fe", "lxq-ui")) {
// k8sService.createUIAndNodeService("fe", "lxq-ui", "ui", "ui");
// }
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment