1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
|
// nodeObjectSyncsAman mirrors the Ready condition reported by the remote
// cluster onto the local nodes labelled machine.type=aman.
//
// Local and remote nodes are paired by the value returned by getNodeIp. A
// local node with no matching remote node, or whose remote counterpart has no
// Ready condition, is marked ConditionFalse. Errors are logged rather than
// returned: a failed list aborts the whole sync, while a failure on a single
// node only skips that node.
func (nc *NodeController) nodeObjectSyncsAman() {
	// 1. List the local nodes carrying the machine.type=aman label.
	selector := labels.SelectorFromSet(labels.Set{"machine.type": "aman"})
	nodeList, err := nc.nodesLister.List(selector)
	if err != nil {
		klog.Errorf("Failed to list aman nodes, error: %s", err.Error())
		return
	}
	if len(nodeList) == 0 {
		return
	}

	// 2. Index local nodes by IP and default every one of them to "not ready".
	nodes := make(map[string]string, len(nodeList))
	nodeStatus := make(map[string]corev1.ConditionStatus, len(nodeList))
	for _, node := range nodeList {
		nodeStatus[node.Name] = corev1.ConditionFalse
		ip := getNodeIp(node)
		if ip == "" {
			// NOTE(review): assumes getNodeIp returns "" when the node has no
			// usable address. Indexing such nodes under "" would pair them
			// with any remote node that also lacks an address.
			klog.Warningf("Local node %s has no IP, it cannot be matched to a remote node", node.Name)
			continue
		}
		nodes[ip] = node.Name
	}
	klog.Infof("Local nodes: %+v", nodes)
	klog.Infof("Local node status: %+v", nodeStatus)

	// 3. Fetch the nodes of the remote cluster.
	remoteNodes, err := nc.remoteKubeClientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		klog.Errorf("Failed to list nodes from remote cluster: %v", err)
		return
	}
	klog.Infof("Remote nodes count: %d", len(remoteNodes.Items))
	// Iterate by index so each (large) Node struct is not copied per iteration.
	for i := range remoteNodes.Items {
		remoteNode := &remoteNodes.Items[i]
		klog.Infof("Remote node: %s, IP: %s", remoteNode.Name, getNodeIp(remoteNode))
	}

	// 4. Walk the remote nodes and record the Ready status of each one that
	// matches a local node.
	for i := range remoteNodes.Items {
		remoteNode := &remoteNodes.Items[i]
		remoteNodeIP := getNodeIp(remoteNode)
		klog.Infof("Processing remote node: %s, IP: %s", remoteNode.Name, remoteNodeIP)

		localNodeName, exists := nodes[remoteNodeIP]
		if !exists {
			klog.Warningf("No matching local node found for remote node IP: %s", remoteNodeIP)
			continue
		}
		klog.Infof("Found matching local node: %s for remote node: %s", localNodeName, remoteNode.Name)

		// Take the status of the remote node's Ready condition, if it has one.
		for _, condition := range remoteNode.Status.Conditions {
			if condition.Type == corev1.NodeReady {
				klog.Infof("Remote node %s Ready condition: %v", remoteNode.Name, condition.Status)
				nodeStatus[localNodeName] = condition.Status
				break
			}
		}
	}
	klog.Infof("Local node status after update: %+v", nodeStatus)

	// 5. Write the computed Ready status back to each local node.
	for nodeName, status := range nodeStatus {
		cached, err := nc.nodesLister.Get(nodeName)
		if err != nil {
			klog.Errorf("Failed to get node(%s), error: %s", nodeName, err.Error())
			continue
		}
		// Objects returned by a lister are owned by the shared informer cache
		// and must be treated as read-only; mutate a private copy instead.
		node := cached.DeepCopy()

		readyFound := false
		for i := range node.Status.Conditions {
			condition := &node.Status.Conditions[i]
			if condition.Type != corev1.NodeReady {
				continue
			}
			currentTime := metav1.Now()
			if status == corev1.ConditionTrue {
				condition.Reason = "EdgeReady"
				condition.Message = "edge is posting ready status"
			} else {
				condition.Reason = "NodeStatusUnknown"
				condition.Message = "Kubelet stopped posting node status."
			}
			// LastTransitionTime records when the status last changed, so it
			// is only bumped on an actual transition; the heartbeat is
			// refreshed on every sync.
			if condition.Status != status {
				condition.Status = status
				condition.LastTransitionTime = currentTime
			}
			condition.LastHeartbeatTime = currentTime
			readyFound = true
			break
		}
		if !readyFound {
			// Nothing was modified, so an UpdateStatus call would be a no-op.
			klog.Warningf("Node(%s) has no Ready condition, skipping status update", nodeName)
			continue
		}

		if _, err := nc.kubeClientset.CoreV1().Nodes().UpdateStatus(context.TODO(), node, metav1.UpdateOptions{}); err != nil {
			klog.Errorf("Failed to update node(%s) status, error: %s", nodeName, err.Error())
		}
	}
}
|