-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathnode_controller.go
More file actions
176 lines (158 loc) · 5.77 KB
/
node_controller.go
File metadata and controls
176 lines (158 loc) · 5.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
// SPDX-FileCopyrightText: 2020 SAP SE or an SAP affiliate company
// SPDX-License-Identifier: Apache-2.0
package controllers
import (
"context"
"encoding/json"
"fmt"
"strconv"
"time"
"github.com/elastic/go-ucfg"
"github.com/go-logr/logr"
"github.com/sapcc/ucfgwrap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/events"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/sapcc/maintenance-controller/cache"
"github.com/sapcc/maintenance-controller/constants"
"github.com/sapcc/maintenance-controller/state"
)
// NodeReconciler reconciles a Node object.
type NodeReconciler struct {
client.Client
Clientset *kubernetes.Clientset
Log logr.Logger
Scheme *runtime.Scheme
Recorder events.EventRecorder
NodeInfoCache cache.NodeInfoCache
}
type reconcileParameters struct {
client client.Client
clientset kubernetes.Interface
config *Config
log logr.Logger
recorder events.EventRecorder
node *corev1.Node
nodeInfoCache cache.NodeInfoCache
}
// +kubebuilder:rbac:groups=core,resources=nodes,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=nodes/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=core,resources=events,verbs=create;update;patch
// +kubebuilder:rbac:groups=coordination.k8s.io,resources=leases,verbs=get;list;watch;create;update;patch;delete
// Reconcile reconciles the given request.
func (r *NodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// load the configuration
conf, err := ucfgwrap.FromYAMLFile(constants.MaintenanceConfigFilePath, ucfg.VarExp, ucfg.ResolveEnv)
if err != nil {
r.Log.Error(err, "Failed to parse configuration file (syntax error)")
// the controller is misconfigured, no need to requeue before the configuration is fixed
return ctrl.Result{}, nil
}
config, err := LoadConfig(&conf)
if err != nil {
r.Log.Error(err, "Failed to parse configuration file (semantic error)")
// the controller is misconfigured, no need to requeue before the configuration is fixed
return ctrl.Result{}, nil
}
// fetch the current node from the api server
var theNode corev1.Node
err = r.Get(ctx, req.NamespacedName, &theNode)
if errors.IsNotFound(err) {
r.NodeInfoCache.Delete(req.Name)
r.Log.Info("Could not find node on the API server, maybe it has been deleted?", "node", req.NamespacedName)
return ctrl.Result{}, nil
} else if err != nil {
r.Log.Error(err, "Failed to retrieve node information from the API server", "node", req.NamespacedName)
return ctrl.Result{RequeueAfter: config.RequeueInterval}, nil
}
unmodifiedNode := theNode.DeepCopy()
// perform the reconciliation
err = reconcileInternal(ctx, r.makeParams(config, &theNode))
if err != nil {
r.Log.Error(err, "Failed to reconcile. Skipping node patching.", "node", req.NamespacedName)
return ctrl.Result{RequeueAfter: config.RequeueInterval}, nil
}
// if the controller did not change anything, there is no need to patch
if equality.Semantic.DeepEqual(&theNode, unmodifiedNode) {
return ctrl.Result{RequeueAfter: config.RequeueInterval}, nil
}
// patch node
err = r.Patch(ctx, &theNode, client.MergeFrom(unmodifiedNode))
if err != nil {
r.Log.Error(err, "Failed to patch node on the API server", "node", req.NamespacedName)
}
// await cache update
err = pollCacheUpdate(ctx, r.Client, types.NamespacedName{
Name: theNode.Name,
Namespace: theNode.Namespace,
}, theNode.ResourceVersion)
if err != nil {
r.Log.Error(err, "Failed to poll for cache update")
}
return ctrl.Result{RequeueAfter: config.RequeueInterval}, nil
}
func (r *NodeReconciler) makeParams(config *Config, node *corev1.Node) reconcileParameters {
return reconcileParameters{
client: r.Client,
clientset: r.Clientset,
config: config,
log: r.Log.WithValues("node", types.NamespacedName{Name: node.Name, Namespace: node.Namespace}),
node: node,
recorder: r.Recorder,
nodeInfoCache: r.NodeInfoCache,
}
}
// Ensures a new version of the specified resources arrives in the cache made by controller-runtime.
func pollCacheUpdate(ctx context.Context, k8sClient client.Client, ref types.NamespacedName, targetVersion string) error {
return wait.PollImmediate(20*time.Millisecond, 1*time.Second, func() (bool, error) { //nolint:staticcheck
var nextNode corev1.Node
if err := k8sClient.Get(ctx, ref, &nextNode); err != nil {
return false, err
}
nextVersion, err := strconv.Atoi(nextNode.ResourceVersion)
if err != nil {
return false, err
}
currentVersion, err := strconv.Atoi(targetVersion)
if err != nil {
return false, err
}
return nextVersion >= currentVersion, nil
})
}
func reconcileInternal(ctx context.Context, params reconcileParameters) error {
dataStr := params.node.Annotations[constants.DataAnnotationKey]
data, err := state.ParseData(dataStr)
if err != nil {
return err
}
err = HandleNode(ctx, params, &data)
if err != nil {
return err
}
return writeData(params.node, data)
}
func writeData(node *corev1.Node, data state.Data) error {
dataBytes, err := json.Marshal(&data)
if err != nil {
return fmt.Errorf("failed to marshal internal data: %w", err)
}
if node.Annotations == nil {
node.Annotations = make(map[string]string)
}
node.Annotations[constants.DataAnnotationKey] = string(dataBytes)
return nil
}
// SetupWithManager attaches the controller to the given manager.
func (r *NodeReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&corev1.Node{}).
Complete(r)
}