-
Notifications
You must be signed in to change notification settings - Fork 748
/
k8sutils.go
192 lines (172 loc) · 5.37 KB
/
k8sutils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
package k8sapi
import (
"context"
"fmt"
"os"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
eniconfigscheme "github.com/aws/amazon-vpc-cni-k8s/pkg/apis/crd/v1alpha1"
"github.com/aws/amazon-vpc-cni-k8s/pkg/utils/logger"
"github.com/aws/amazon-vpc-cni-k8s/utils"
rcscheme "github.com/aws/amazon-vpc-resource-controller-k8s/apis/vpcresources/v1alpha1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
cache "sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
)
const (
// awsNode is the application name that CreateKubeClient compares against to
// pick the IPAMD cache filters; it is also the value of the "k8s-app" label
// used by the CNI Metrics Helper cache filter.
awsNode = "aws-node"
)
// log is the package-level logger, obtained from logger.Get().
var log = logger.Get()
// getIPAMDCacheFilters builds the cache filter map used by IPAMD.
//
// When the MY_NODE_NAME environment variable holds a non-empty value, the
// returned map restricts cached Pods to those whose spec.nodeName field equals
// that value. When the variable is unset or empty, nil is returned and no
// filter is configured.
func getIPAMDCacheFilters() map[client.Object]cache.ByObject {
	nodeName := os.Getenv("MY_NODE_NAME")
	if nodeName == "" {
		return nil
	}

	// Only Pods scheduled on this node are of interest.
	podFilter := cache.ByObject{
		Field: fields.Set{"spec.nodeName": nodeName}.AsSelector(),
	}
	return map[client.Object]cache.ByObject{
		&corev1.Pod{}: podFilter,
	}
}
// getMetricsHelperCacheFilters builds the cache filter map used by the CNI
// Metrics Helper: cached Pods are limited to those labelled
// "k8s-app" = awsNode.
func getMetricsHelperCacheFilters() map[client.Object]cache.ByObject {
	awsNodeLabels := labels.Set{"k8s-app": awsNode}
	podFilter := cache.ByObject{Label: awsNodeLabels.AsSelector()}
	return map[client.Object]cache.ByObject{
		&corev1.Pod{}: podFilter,
	}
}
// CreateKubeClientCache constructs a cache for the Kubernetes client to read
// from. The cache is only created here; this function does not start it.
//
// restCfg is used both to derive the HTTP client for the dynamic REST mapper
// and to create the cache itself. scheme and filterMap are passed through as
// the cache's Scheme and ByObject options respectively.
//
// The first error hit while building the HTTP client, the REST mapper, or the
// cache is returned unchanged.
func CreateKubeClientCache(restCfg *rest.Config, scheme *runtime.Scheme, filterMap map[client.Object]cache.ByObject) (cache.Cache, error) {
	// The dynamic REST mapper needs an HTTP client built from the same config.
	httpClient, err := rest.HTTPClientFor(restCfg)
	if err != nil {
		return nil, err
	}

	restMapper, err := apiutil.NewDynamicRESTMapper(restCfg, httpClient)
	if err != nil {
		return nil, err
	}

	// Reading through a cache decreases the number of API server calls.
	clientCache, err := cache.New(restCfg, cache.Options{
		ByObject: filterMap,
		Mapper:   restMapper,
		Scheme:   scheme,
	})
	if err != nil {
		return nil, err
	}
	return clientCache, nil
}
// StartKubeClientCache starts the given cache in a background goroutine and
// then waits for its initial sync.
//
// Both the cache run and the sync wait are bound to the value returned by
// ctrl.SetupSignalHandler. The function has no error result, so a failed
// start or an unsuccessful sync is reported through the package logger rather
// than being dropped silently.
//
// NOTE(review): SetupSignalHandler appears to be intended for a single call
// per process — confirm this function is never invoked more than once.
func StartKubeClientCache(cache cache.Cache) {
	stopCtx := ctrl.SetupSignalHandler()
	go func() {
		// Start runs in its own goroutine so the caller can wait for the sync
		// below; surface its error instead of discarding it.
		if err := cache.Start(stopCtx); err != nil {
			log.Errorf("kube client cache stopped with error: %v", err)
		}
	}()
	if !cache.WaitForCacheSync(stopCtx) {
		log.Errorf("kube client cache did not complete its initial sync")
	}
}
// CreateKubeClient creates a k8s client whose reads go through a filtered
// cache that has been started before the client is returned.
//
// appName selects the cache filters: when it equals awsNode ("aws-node") the
// IPAMD filters are used; any other value gets the CNI Metrics Helper filters.
//
// An error is returned when the REST config cannot be loaded, when a type
// group cannot be registered in the scheme, or when the cache or the client
// cannot be created.
func CreateKubeClient(appName string) (client.Client, error) {
	restCfg, err := getRestConfig()
	if err != nil {
		return nil, err
	}

	// The scheme should only contain GVKs that the client will access.
	// Registration failures are returned rather than ignored, so a partially
	// populated scheme is never used.
	vpcCniScheme := runtime.NewScheme()
	if err := corev1.AddToScheme(vpcCniScheme); err != nil {
		return nil, fmt.Errorf("adding core/v1 types to scheme: %w", err)
	}
	if err := eniconfigscheme.AddToScheme(vpcCniScheme); err != nil {
		return nil, fmt.Errorf("adding ENIConfig types to scheme: %w", err)
	}
	if err := rcscheme.AddToScheme(vpcCniScheme); err != nil {
		return nil, fmt.Errorf("adding VPC resource controller types to scheme: %w", err)
	}

	var filterMap map[client.Object]cache.ByObject
	if appName == awsNode {
		filterMap = getIPAMDCacheFilters()
	} else {
		filterMap = getMetricsHelperCacheFilters()
	}

	cacheReader, err := CreateKubeClientCache(restCfg, vpcCniScheme, filterMap)
	if err != nil {
		return nil, err
	}
	// Start cache and wait for initial sync
	StartKubeClientCache(cacheReader)

	// Hand the started cache to the client as its reader.
	k8sClient, err := client.New(restCfg, client.Options{
		Cache: &client.CacheOptions{
			Reader: cacheReader,
		},
		Scheme: vpcCniScheme,
	})
	if err != nil {
		return nil, err
	}
	return k8sClient, nil
}
// GetKubeClientSet returns a client-go clientset built from the REST config
// produced by getRestConfig.
//
// Failures to load the config or to construct the clientset are returned to
// the caller; this function does not panic on either.
func GetKubeClientSet() (kubernetes.Interface, error) {
	restCfg, err := getRestConfig()
	if err != nil {
		return nil, err
	}

	clientSet, err := kubernetes.NewForConfig(restCfg)
	if err != nil {
		// Report through the error result so callers decide how to react.
		return nil, fmt.Errorf("creating clientset: %w", err)
	}
	return clientSet, nil
}
// CheckAPIServerConnectivity polls the API server's version endpoint until a
// request succeeds or fails with a non-timeout error.
//
// The REST config's Timeout is set to 5 seconds and the poll interval is
// 2 seconds, with the immediate flag set on the poll. A timeout error is
// logged and the poll continues; any other error stops the poll and is
// returned wrapped, so callers can inspect the underlying cause with
// errors.Is / errors.As. The poll is driven by context.Background(), so there
// is no overall deadline on the retries.
func CheckAPIServerConnectivity() error {
	restCfg, err := getRestConfig()
	if err != nil {
		return err
	}
	restCfg.Timeout = 5 * time.Second
	clientSet, err := kubernetes.NewForConfig(restCfg)
	if err != nil {
		return fmt.Errorf("creating kube config, %w", err)
	}
	log.Infof("Testing communication with server")

	return wait.PollUntilContextCancel(context.Background(), 2*time.Second, true, func(ctx context.Context) (bool, error) {
		version, err := clientSet.Discovery().ServerVersion()
		if err != nil {
			// Returning a nil error on timeout tells the poller to try again
			// at the next interval instead of aborting.
			if os.IsTimeout(err) {
				log.Errorf("Unable to reach API Server, %v", err)
				return false, nil
			}
			return false, fmt.Errorf("error communicating with apiserver: %w", err)
		}
		log.Infof("Successful communication with the Cluster! Cluster Version is: v%s.%s. git version: %s. git tree state: %s. commit: %s. platform: %s",
			version.Major, version.Minor, version.GitVersion, version.GitTreeState, version.GitCommit, version.Platform)
		return true, nil
	})
}
// getRestConfig loads the REST config via ctrl.GetConfig and applies this
// package's customizations to it.
//
// The user agent is set to os.Args[0], a "-" separator, and whatever
// utils.GetEnv yields for VPC_CNI_VERSION (called with "" as its second
// argument). If CLUSTER_ENDPOINT is present in the environment — even with an
// empty value — it replaces the config's Host.
func getRestConfig() (*rest.Config, error) {
	cfg, err := ctrl.GetConfig()
	if err != nil {
		return nil, err
	}

	version := utils.GetEnv("VPC_CNI_VERSION", "")
	cfg.UserAgent = os.Args[0] + "-" + version

	endpoint, found := os.LookupEnv("CLUSTER_ENDPOINT")
	if found {
		cfg.Host = endpoint
	}
	return cfg, nil
}
// GetNode fetches the Node whose name is given by the MY_NODE_NAME environment
// variable, using k8sClient.
//
// On failure the error is logged and returned together with the node value as
// it stands after the failed Get; on success the fetched node and a nil error
// are returned.
func GetNode(ctx context.Context, k8sClient client.Client) (corev1.Node, error) {
	nodeName := os.Getenv("MY_NODE_NAME")
	log.Infof("Get Node Info for: %s", nodeName)

	var node corev1.Node
	// Only Name is set on the lookup key; Namespace is left empty.
	key := types.NamespacedName{Name: nodeName}
	if err := k8sClient.Get(ctx, key, &node); err != nil {
		log.Errorf("error retrieving node: %s", err)
		return node, err
	}
	return node, nil
}