diff --git a/.gitignore b/.gitignore index b4f3e57c7d..71a11d7ea1 100644 --- a/.gitignore +++ b/.gitignore @@ -6,4 +6,5 @@ verify-network .idea/ *.iml .DS_Store +cni-metrics-helper/cni-metrics-helper portmap diff --git a/Makefile b/Makefile index 2d274b4afc..c3f10d89be 100644 --- a/Makefile +++ b/Makefile @@ -12,7 +12,7 @@ # language governing permissions and limitations under the License. # -.PHONY: build-linux clean docker docker-build lint unit-test vet download-portmap build-docker-test +.PHONY: all build-linux clean docker docker-build lint unit-test vet download-portmap build-docker-test build-metrics docker-metrics metrics-unit-test docker-metrics-test docker-vet IMAGE ?= amazon/amazon-k8s-cni VERSION ?= $(shell git describe --tags --always --dirty) @@ -42,8 +42,8 @@ build-linux: GOOS=linux GOARCH=$(ARCH) CGO_ENABLED=0 go build -o aws-k8s-agent -ldflags "$(LDFLAGS)" GOOS=linux GOARCH=$(ARCH) CGO_ENABLED=0 go build -o aws-cni -ldflags "$(LDFLAGS)" ./plugins/routed-eni/ -# Build docker image -docker: +# Build CNI Docker image +docker: @docker build --build-arg arch="$(ARCH)" -f scripts/dockerfiles/Dockerfile.release -t "$(IMAGE):$(VERSION)" . @echo "Built Docker image \"$(IMAGE):$(VERSION)\"" @@ -64,6 +64,27 @@ docker-unit-test: build-docker-test docker run -e GO111MODULE=on \ amazon-k8s-cni-test:latest make unit-test +# Build metrics +build-metrics: + GOOS=linux GOARCH=$(ARCH) CGO_ENABLED=0 go build -o cni-metrics-helper/cni-metrics-helper cni-metrics-helper/cni-metrics-helper.go + +# Build metrics Docker image +docker-metrics: + @docker build --build-arg arch="$(ARCH)" -f scripts/dockerfiles/Dockerfile.metrics -t "amazon/cni-metrics-helper:$(VERSION)" . + @echo "Built Docker image \"amazon/cni-metrics-helper:$(VERSION)\"" + +metrics-unit-test: + GOOS=linux CGO_ENABLED=1 go test -v -cover -race -timeout 10s ./cni-metrics-helper/metrics/... 
+ +docker-metrics-test: + docker run -v $(shell pwd):/usr/src/app/src/github.com/aws/amazon-vpc-cni-k8s \ + --workdir=/usr/src/app/src/github.com/aws/amazon-vpc-cni-k8s \ + --env GOPATH=/usr/src/app \ + golang:1.10 make metrics-unit-test + +# Build both CNI and metrics helper +all: docker docker-metrics + # golint # To install: go get -u golang.org/x/lint/golint lint: @@ -86,4 +107,5 @@ docker-vet: build-docker-test clean: rm -f aws-k8s-agent rm -f aws-cni + rm -f cni-metrics-helper/cni-metrics-helper rm -f portmap diff --git a/cni-metrics-helper/cni-metrics-helper.go b/cni-metrics-helper/cni-metrics-helper.go new file mode 100644 index 0000000000..0ceb56f85b --- /dev/null +++ b/cni-metrics-helper/cni-metrics-helper.go @@ -0,0 +1,115 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. 
+ +package main + +import ( + "context" + "flag" + "fmt" + "os" + "strings" + "time" + + "github.com/golang/glog" + "github.com/spf13/pflag" + + "github.com/aws/amazon-vpc-cni-k8s/cni-metrics-helper/metrics" + "github.com/aws/amazon-vpc-cni-k8s/pkg/k8sapi" + "github.com/aws/amazon-vpc-cni-k8s/pkg/publisher" +) + +type options struct { + kubeconfig string + pullInterval int + pullCNI bool + submitCW bool + help bool +} + +func main() { + options := &options{} + flags := pflag.NewFlagSet("", pflag.ExitOnError) + // Add glog flags + flags.AddGoFlagSet(flag.CommandLine) + _ = flags.Lookup("logtostderr").Value.Set("true") + flags.Lookup("logtostderr").DefValue = "true" + flags.Lookup("logtostderr").NoOptDefVal = "true" + flags.BoolVar(&options.submitCW, "cloudwatch", true, "a bool") + + flags.Usage = func() { + _, _ = fmt.Fprintf(os.Stderr, "Usage of %s:\n", os.Args[0]) + flags.PrintDefaults() + } + + err := flags.Parse(os.Args) + if err != nil { + glog.Fatalf("Error on parsing parameters: %s", err) + } + + err = flag.CommandLine.Parse([]string{}) + if err != nil { + glog.Fatalf("Error on parsing commandline: %s", err) + } + + if options.help { + flags.Usage() + os.Exit(1) + } + + cwENV, found := os.LookupEnv("USE_CLOUDWATCH") + if found { + if strings.Compare(cwENV, "yes") == 0 { + options.submitCW = true + } + if strings.Compare(cwENV, "no") == 0 { + options.submitCW = false + } + } + + glog.Infof("Starting CNIMetricsHelper, cloudwatch: %v...", options.submitCW) + + kubeClient, err := k8sapi.CreateKubeClient() + if err != nil { + glog.Errorf("Failed to create client: %v", err) + os.Exit(1) + } + + discoverController := k8sapi.NewController(kubeClient) + go discoverController.DiscoverK8SPods() + + var cw publisher.Publisher + + if options.submitCW { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + cw, err = publisher.New(ctx) + if err != nil { + glog.Errorf("Failed to create publisher: %v", err) + os.Exit(1) + } + go cw.Start() + defer 
cw.Stop() + } + + var cniMetric *metrics.CNIMetricsTarget + cniMetric = metrics.CNIMetricsNew(kubeClient, cw, discoverController, options.submitCW) + + // metric loop + var pullInterval = 30 // seconds + for range time.Tick(time.Duration(pullInterval) * time.Second) { + glog.Info("Collecting metrics ...") + metrics.Handler(cniMetric) + } +} diff --git a/cni-metrics-helper/metrics/cni_metrics.go b/cni-metrics-helper/metrics/cni_metrics.go new file mode 100644 index 0000000000..89396d8327 --- /dev/null +++ b/cni-metrics-helper/metrics/cni_metrics.go @@ -0,0 +1,167 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + +// Package metrics handles the processing of all metrics. This file handles metrics for ipamd +package metrics + +import ( + "github.com/golang/glog" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + clientset "k8s.io/client-go/kubernetes" + + "github.com/aws/amazon-vpc-cni-k8s/pkg/k8sapi" + "github.com/aws/amazon-vpc-cni-k8s/pkg/publisher" +) + +// Port where prometheus metrics are published. 
+const metricsPort = 61678 + +// InterestingCNIMetrics defines metrics parsing definition for kube-state-metrics +var InterestingCNIMetrics = map[string]metricsConvert{ + "awscni_assigned_ip_addresses": { + actions: []metricsAction{ + {cwMetricName: "assignIPAddresses", + matchFunc: matchAny, + actionFunc: metricsAdd, + data: &dataPoints{}}}}, + "awscni_total_ip_addresses": { + actions: []metricsAction{ + {cwMetricName: "totalIPAddresses", + matchFunc: matchAny, + actionFunc: metricsAdd, + data: &dataPoints{}}}}, + "awscni_eni_allocated": { + actions: []metricsAction{ + {cwMetricName: "eniAllocated", + matchFunc: matchAny, + actionFunc: metricsAdd, + data: &dataPoints{}}}}, + "awscni_eni_max": { + actions: []metricsAction{ + {cwMetricName: "eniMaxAvailable", + matchFunc: matchAny, + actionFunc: metricsAdd, + data: &dataPoints{}}}}, + "awscni_ip_max": { + actions: []metricsAction{ + {cwMetricName: "maxIPAddresses", + matchFunc: matchAny, + actionFunc: metricsAdd, + data: &dataPoints{}}}}, + "awscni_aws_api_latency_ms": { + actions: []metricsAction{ + {cwMetricName: "awsAPILatency", + matchFunc: matchAny, + actionFunc: metricsMax, + data: &dataPoints{}, + logToFile: true}}}, + "awscni_aws_api_error_count": { + actions: []metricsAction{ + {cwMetricName: "awsAPIErr", + matchFunc: matchAny, + actionFunc: metricsAdd, + data: &dataPoints{}, + logToFile: true}}}, + "awscni_aws_utils_error_count": { + actions: []metricsAction{ + {cwMetricName: "awsUtilErr", + matchFunc: matchAny, + actionFunc: metricsAdd, + data: &dataPoints{}, + logToFile: true}}}, + "awscni_ipamd_error_count": { + actions: []metricsAction{ + {cwMetricName: "ipamdErr", + matchFunc: matchAny, + actionFunc: metricsAdd, + data: &dataPoints{}, + logToFile: true}}}, + "awscni_ipamd_action_inprogress": { + actions: []metricsAction{ + {cwMetricName: "ipamdActionInProgress", + matchFunc: matchAny, + actionFunc: metricsAdd, + data: &dataPoints{}, + logToFile: true}}}, + "awscni_reconcile_count": { + actions: 
[]metricsAction{ + {cwMetricName: "reconcileCount", + matchFunc: matchAny, + actionFunc: metricsAdd, + data: &dataPoints{}, + logToFile: true}}}, + "awscni_add_ip_req_count": { + actions: []metricsAction{ + {cwMetricName: "addReqCount", + matchFunc: matchAny, + actionFunc: metricsAdd, + data: &dataPoints{}, + logToFile: true}}}, + "awscni_del_ip_req_count": { + actions: []metricsAction{ + {cwMetricName: "delReqCount", + matchFunc: matchAny, + actionFunc: metricsAdd, + data: &dataPoints{}, + logToFile: true}}}, +} + +// CNIMetricsTarget defines data structure for kube-state-metric target +type CNIMetricsTarget struct { + interestingMetrics map[string]metricsConvert + cwMetricsPublisher publisher.Publisher + kubeClient clientset.Interface + cniPods []string + discoveryController *k8sapi.Controller + submitCW bool +} + +// CNIMetricsNew creates a new metricsTarget +func CNIMetricsNew(c clientset.Interface, cw publisher.Publisher, d *k8sapi.Controller, submitCW bool) *CNIMetricsTarget { + return &CNIMetricsTarget{ + interestingMetrics: InterestingCNIMetrics, + cwMetricsPublisher: cw, + kubeClient: c, + discoveryController: d, + submitCW: submitCW, + } +} + +func (t *CNIMetricsTarget) grabMetricsFromTarget(cniPod string) ([]byte, error) { + glog.Infof("Grabbing metrics from CNI: %s", cniPod) + output, err := getMetricsFromPod(t.kubeClient, cniPod, metav1.NamespaceSystem, metricsPort) + if err != nil { + glog.Errorf("grabMetricsFromTarget: Failed to grab CNI endpoint: %v", err) + return nil, err + } + + glog.V(5).Infof("cni-metrics text output: %s", string(output)) + return output, nil +} + +func (t *CNIMetricsTarget) getInterestingMetrics() map[string]metricsConvert { + return InterestingCNIMetrics +} + +func (t *CNIMetricsTarget) getCWMetricsPublisher() publisher.Publisher { + return t.cwMetricsPublisher +} + +func (t *CNIMetricsTarget) getTargetList() []string { + pods := t.discoveryController.GetCNIPods() + return pods +} + +func (t *CNIMetricsTarget) 
submitCloudWatch() bool { + return t.submitCW +} diff --git a/cni-metrics-helper/metrics/cni_test1.data b/cni-metrics-helper/metrics/cni_test1.data new file mode 100644 index 0000000000..e833b491bd --- /dev/null +++ b/cni-metrics-helper/metrics/cni_test1.data @@ -0,0 +1,188 @@ +# HELP awscni_add_ip_req_count The number of add IP address request +# TYPE awscni_add_ip_req_count counter +awscni_add_ip_req_count 100 +# HELP awscni_assigned_ip_addresses The number of IP addresses assigned to pods +# TYPE awscni_assigned_ip_addresses gauge +awscni_assigned_ip_addresses 1 +# HELP awscni_aws_api_error_count The number of times AWS API returns an error +# TYPE awscni_aws_api_error_count counter +awscni_aws_api_error_count{api="DeleteNetworkInterface",error="InvalidParameterValue"} 14 +# HELP awscni_aws_api_latency_ms AWS API call latency in ms +# TYPE awscni_aws_api_latency_ms summary +awscni_aws_api_latency_ms{api="AssignPrivateIpAddresses",error="false",quantile="0.5"} NaN +awscni_aws_api_latency_ms{api="AssignPrivateIpAddresses",error="false",quantile="0.9"} NaN +awscni_aws_api_latency_ms{api="AssignPrivateIpAddresses",error="false",quantile="0.99"} NaN +awscni_aws_api_latency_ms_sum{api="AssignPrivateIpAddresses",error="false"} 2938 +awscni_aws_api_latency_ms_count{api="AssignPrivateIpAddresses",error="false"} 10 +awscni_aws_api_latency_ms{api="AttachNetworkInterface",error="false",quantile="0.5"} NaN +awscni_aws_api_latency_ms{api="AttachNetworkInterface",error="false",quantile="0.9"} NaN +awscni_aws_api_latency_ms{api="AttachNetworkInterface",error="false",quantile="0.99"} NaN +awscni_aws_api_latency_ms_sum{api="AttachNetworkInterface",error="false"} 4377 +awscni_aws_api_latency_ms_count{api="AttachNetworkInterface",error="false"} 10 +awscni_aws_api_latency_ms{api="CreateNetworkInterface",error="false",quantile="0.5"} NaN +awscni_aws_api_latency_ms{api="CreateNetworkInterface",error="false",quantile="0.9"} NaN 
+awscni_aws_api_latency_ms{api="CreateNetworkInterface",error="false",quantile="0.99"} NaN +awscni_aws_api_latency_ms_sum{api="CreateNetworkInterface",error="false"} 1328 +awscni_aws_api_latency_ms_count{api="CreateNetworkInterface",error="false"} 10 +awscni_aws_api_latency_ms{api="CreateTags",error="false",quantile="0.5"} NaN +awscni_aws_api_latency_ms{api="CreateTags",error="false",quantile="0.9"} NaN +awscni_aws_api_latency_ms{api="CreateTags",error="false",quantile="0.99"} NaN +awscni_aws_api_latency_ms_sum{api="CreateTags",error="false"} 1123 +awscni_aws_api_latency_ms_count{api="CreateTags",error="false"} 10 +awscni_aws_api_latency_ms{api="DeleteNetworkInterface",error="false",quantile="0.5"} NaN +awscni_aws_api_latency_ms{api="DeleteNetworkInterface",error="false",quantile="0.9"} NaN +awscni_aws_api_latency_ms{api="DeleteNetworkInterface",error="false",quantile="0.99"} NaN +awscni_aws_api_latency_ms_sum{api="DeleteNetworkInterface",error="false"} 2364 +awscni_aws_api_latency_ms_count{api="DeleteNetworkInterface",error="false"} 9 +awscni_aws_api_latency_ms{api="DeleteNetworkInterface",error="true",quantile="0.5"} NaN +awscni_aws_api_latency_ms{api="DeleteNetworkInterface",error="true",quantile="0.9"} NaN +awscni_aws_api_latency_ms{api="DeleteNetworkInterface",error="true",quantile="0.99"} NaN +awscni_aws_api_latency_ms_sum{api="DeleteNetworkInterface",error="true"} 1806 +awscni_aws_api_latency_ms_count{api="DeleteNetworkInterface",error="true"} 14 +awscni_aws_api_latency_ms{api="DescribeInstances",error="false",quantile="0.5"} NaN +awscni_aws_api_latency_ms{api="DescribeInstances",error="false",quantile="0.9"} NaN +awscni_aws_api_latency_ms{api="DescribeInstances",error="false",quantile="0.99"} NaN +awscni_aws_api_latency_ms_sum{api="DescribeInstances",error="false"} 1330 +awscni_aws_api_latency_ms_count{api="DescribeInstances",error="false"} 10 +awscni_aws_api_latency_ms{api="DescribeNetworkInterfaces",error="false",quantile="0.5"} NaN 
+awscni_aws_api_latency_ms{api="DescribeNetworkInterfaces",error="false",quantile="0.9"} NaN +awscni_aws_api_latency_ms{api="DescribeNetworkInterfaces",error="false",quantile="0.99"} NaN +awscni_aws_api_latency_ms_sum{api="DescribeNetworkInterfaces",error="false"} 2360 +awscni_aws_api_latency_ms_count{api="DescribeNetworkInterfaces",error="false"} 20 +awscni_aws_api_latency_ms{api="DetachNetworkInterface",error="false",quantile="0.5"} NaN +awscni_aws_api_latency_ms{api="DetachNetworkInterface",error="false",quantile="0.9"} NaN +awscni_aws_api_latency_ms{api="DetachNetworkInterface",error="false",quantile="0.99"} NaN +awscni_aws_api_latency_ms_sum{api="DetachNetworkInterface",error="false"} 1828 +awscni_aws_api_latency_ms_count{api="DetachNetworkInterface",error="false"} 9 +awscni_aws_api_latency_ms{api="GetMetadata",error="false",quantile="0.5"} 0 +awscni_aws_api_latency_ms{api="GetMetadata",error="false",quantile="0.9"} 0 +awscni_aws_api_latency_ms{api="GetMetadata",error="false",quantile="0.99"} 1 +awscni_aws_api_latency_ms_sum{api="GetMetadata",error="false"} 4384 +awscni_aws_api_latency_ms_count{api="GetMetadata",error="false"} 82716 +awscni_aws_api_latency_ms{api="ModifyNetworkInterfaceAttribute",error="false",quantile="0.5"} NaN +awscni_aws_api_latency_ms{api="ModifyNetworkInterfaceAttribute",error="false",quantile="0.9"} NaN +awscni_aws_api_latency_ms{api="ModifyNetworkInterfaceAttribute",error="false",quantile="0.99"} NaN +awscni_aws_api_latency_ms_sum{api="ModifyNetworkInterfaceAttribute",error="false"} 1551 +awscni_aws_api_latency_ms_count{api="ModifyNetworkInterfaceAttribute",error="false"} 10 +# HELP awscni_del_ip_req_count The number of delete IP address request +# TYPE awscni_del_ip_req_count counter +awscni_del_ip_req_count{reason="PodDeleted"} 106 +awscni_del_ip_req_count{reason="SetupNSFailed"} 2 +# HELP awscni_eni_allocated The number of ENIs allocated +# TYPE awscni_eni_allocated gauge +awscni_eni_allocated 2 +# HELP awscni_eni_max The maximum 
number of ENIs that can be attached to the instance +# TYPE awscni_eni_max gauge +awscni_eni_max 3 +# HELP awscni_ip_max The maximum number of IP addresses that can be allocated to the instance +# TYPE awscni_ip_max gauge +awscni_ip_max 15 +# HELP awscni_ipamd_action_inprogress The number of ipamd actions in progress +# TYPE awscni_ipamd_action_inprogress gauge +awscni_ipamd_action_inprogress{fn="decreaseIPPool"} 0 +awscni_ipamd_action_inprogress{fn="increaseIPPool"} 0 +awscni_ipamd_action_inprogress{fn="nodeIPPoolReconcile"} 0 +awscni_ipamd_action_inprogress{fn="nodeInit"} 0 +awscni_ipamd_action_inprogress{fn="retryAllocENIIP"} 0 +# HELP awscni_total_ip_addresses The total number of IP addresses +# TYPE awscni_total_ip_addresses gauge +awscni_total_ip_addresses 10 +# HELP go_gc_duration_seconds A summary of the GC invocation durations. +# TYPE go_gc_duration_seconds summary +go_gc_duration_seconds{quantile="0"} 1.7901e-05 +go_gc_duration_seconds{quantile="0.25"} 3.2781e-05 +go_gc_duration_seconds{quantile="0.5"} 5.1354e-05 +go_gc_duration_seconds{quantile="0.75"} 0.00013115 +go_gc_duration_seconds{quantile="1"} 0.005550315 +go_gc_duration_seconds_sum 0.797514698 +go_gc_duration_seconds_count 9895 +# HELP go_goroutines Number of goroutines that currently exist. +# TYPE go_goroutines gauge +go_goroutines 25 +# HELP go_memstats_alloc_bytes Number of bytes allocated and still in use. +# TYPE go_memstats_alloc_bytes gauge +go_memstats_alloc_bytes 7.710456e+06 +# HELP go_memstats_alloc_bytes_total Total number of bytes allocated, even if freed. +# TYPE go_memstats_alloc_bytes_total counter +go_memstats_alloc_bytes_total 1.778966304e+10 +# HELP go_memstats_buck_hash_sys_bytes Number of bytes used by the profiling bucket hash table. +# TYPE go_memstats_buck_hash_sys_bytes gauge +go_memstats_buck_hash_sys_bytes 1.775926e+06 +# HELP go_memstats_frees_total Total number of frees. 
+# TYPE go_memstats_frees_total counter +go_memstats_frees_total 1.80537453e+08 +# HELP go_memstats_gc_sys_bytes Number of bytes used for garbage collection system metadata. +# TYPE go_memstats_gc_sys_bytes gauge +go_memstats_gc_sys_bytes 712704 +# HELP go_memstats_heap_alloc_bytes Number of heap bytes allocated and still in use. +# TYPE go_memstats_heap_alloc_bytes gauge +go_memstats_heap_alloc_bytes 7.710456e+06 +# HELP go_memstats_heap_idle_bytes Number of heap bytes waiting to be used. +# TYPE go_memstats_heap_idle_bytes gauge +go_memstats_heap_idle_bytes 4.620288e+06 +# HELP go_memstats_heap_inuse_bytes Number of heap bytes that are in use. +# TYPE go_memstats_heap_inuse_bytes gauge +go_memstats_heap_inuse_bytes 1.0387456e+07 +# HELP go_memstats_heap_objects Number of allocated objects. +# TYPE go_memstats_heap_objects gauge +go_memstats_heap_objects 43550 +# HELP go_memstats_heap_released_bytes_total Total number of heap bytes released to OS. +# TYPE go_memstats_heap_released_bytes_total counter +go_memstats_heap_released_bytes_total 1.073152e+06 +# HELP go_memstats_heap_sys_bytes Number of heap bytes obtained from system. +# TYPE go_memstats_heap_sys_bytes gauge +go_memstats_heap_sys_bytes 1.5007744e+07 +# HELP go_memstats_last_gc_time_seconds Number of seconds since 1970 of last garbage collection. +# TYPE go_memstats_last_gc_time_seconds gauge +go_memstats_last_gc_time_seconds 1.5562085013141422e+09 +# HELP go_memstats_lookups_total Total number of pointer lookups. +# TYPE go_memstats_lookups_total counter +go_memstats_lookups_total 226723 +# HELP go_memstats_mallocs_total Total number of mallocs. +# TYPE go_memstats_mallocs_total counter +go_memstats_mallocs_total 1.80581003e+08 +# HELP go_memstats_mcache_inuse_bytes Number of bytes in use by mcache structures. +# TYPE go_memstats_mcache_inuse_bytes gauge +go_memstats_mcache_inuse_bytes 3472 +# HELP go_memstats_mcache_sys_bytes Number of bytes used for mcache structures obtained from system. 
+# TYPE go_memstats_mcache_sys_bytes gauge +go_memstats_mcache_sys_bytes 16384 +# HELP go_memstats_mspan_inuse_bytes Number of bytes in use by mspan structures. +# TYPE go_memstats_mspan_inuse_bytes gauge +go_memstats_mspan_inuse_bytes 128896 +# HELP go_memstats_mspan_sys_bytes Number of bytes used for mspan structures obtained from system. +# TYPE go_memstats_mspan_sys_bytes gauge +go_memstats_mspan_sys_bytes 163840 +# HELP go_memstats_next_gc_bytes Number of heap bytes when next garbage collection will take place. +# TYPE go_memstats_next_gc_bytes gauge +go_memstats_next_gc_bytes 1.1965008e+07 +# HELP go_memstats_other_sys_bytes Number of bytes used for other system allocations. +# TYPE go_memstats_other_sys_bytes gauge +go_memstats_other_sys_bytes 589762 +# HELP go_memstats_stack_inuse_bytes Number of bytes in use by the stack allocator. +# TYPE go_memstats_stack_inuse_bytes gauge +go_memstats_stack_inuse_bytes 720896 +# HELP go_memstats_stack_sys_bytes Number of bytes obtained from system for stack allocator. +# TYPE go_memstats_stack_sys_bytes gauge +go_memstats_stack_sys_bytes 720896 +# HELP go_memstats_sys_bytes Number of bytes obtained by system. Sum of all system allocations. +# TYPE go_memstats_sys_bytes gauge +go_memstats_sys_bytes 1.8987256e+07 +# HELP process_cpu_seconds_total Total user and system CPU time spent in seconds. +# TYPE process_cpu_seconds_total counter +process_cpu_seconds_total 1391.13 +# HELP process_max_fds Maximum number of open file descriptors. +# TYPE process_max_fds gauge +process_max_fds 65536 +# HELP process_open_fds Number of open file descriptors. +# TYPE process_open_fds gauge +process_open_fds 10 +# HELP process_resident_memory_bytes Resident memory size in bytes. +# TYPE process_resident_memory_bytes gauge +process_resident_memory_bytes 3.9559168e+07 +# HELP process_start_time_seconds Start time of the process since unix epoch in seconds. 
+# TYPE process_start_time_seconds gauge +process_start_time_seconds 1.5550271644e+09 +# HELP process_virtual_memory_bytes Virtual memory size in bytes. +# TYPE process_virtual_memory_bytes gauge +process_virtual_memory_bytes 5.1757056e+07 diff --git a/cni-metrics-helper/metrics/metrics.go b/cni-metrics-helper/metrics/metrics.go new file mode 100644 index 0000000000..a88d8af560 --- /dev/null +++ b/cni-metrics-helper/metrics/metrics.go @@ -0,0 +1,455 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. 
+ +// Package metrics provide common data structure and routines for converting/aggregating prometheus metrics to cloudwatch metrics +package metrics + +import ( + "bytes" + "fmt" + + "github.com/aws/aws-sdk-go/aws" + + "github.com/aws/aws-sdk-go/service/cloudwatch" + "github.com/golang/glog" + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" + clientset "k8s.io/client-go/kubernetes" + + "github.com/aws/amazon-vpc-cni-k8s/pkg/publisher" +) + +type metricMatcher func(metric *dto.Metric) bool +type actionFuncType func(aggregatedValue *float64, sampleValue float64) + +type metricsTarget interface { + grabMetricsFromTarget(target string) ([]byte, error) + getInterestingMetrics() map[string]metricsConvert + getCWMetricsPublisher() publisher.Publisher + getTargetList() []string + submitCloudWatch() bool +} + +type metricsConvert struct { + actions []metricsAction +} + +type metricsAction struct { + cwMetricName string + matchFunc metricMatcher + actionFunc actionFuncType + data *dataPoints + bucket *bucketPoints + logToFile bool +} + +type dataPoints struct { + lastSingleDataPoint float64 + curSingleDataPoint float64 +} + +type bucketPoint struct { + CumulativeCount *float64 + UpperBound *float64 +} + +type bucketPoints struct { + lastBucket []*bucketPoint + curBucket []*bucketPoint +} + +func matchAny(metric *dto.Metric) bool { + return true +} + +func metricsAdd(aggregatedValue *float64, sampleValue float64) { + *aggregatedValue += sampleValue +} + +func metricsMax(aggregatedValue *float64, sampleValue float64) { + if *aggregatedValue < sampleValue { + *aggregatedValue = sampleValue + } +} + +func getMetricsFromPod(client clientset.Interface, podName string, namespace string, port int) ([]byte, error) { + rawOutput, err := client.CoreV1().RESTClient().Get(). + Namespace(namespace). + Resource("pods"). + SubResource("proxy"). + Name(fmt.Sprintf("%v:%v", podName, port)). + Suffix("metrics"). 
+ Do().Raw() + if err != nil { + return nil, err + } + return rawOutput, nil +} + +func processGauge(metric *dto.Metric, act *metricsAction) { + if act.logToFile { + glog.Infof("Label: %v, Value: %v ", metric.GetLabel(), metric.GetGauge().GetValue()) + } else { + glog.V(10).Info("processing GAUGE: ", metric.GetGauge().GetValue()) + } + act.actionFunc(&act.data.curSingleDataPoint, metric.GetGauge().GetValue()) +} + +func processCounter(metric *dto.Metric, act *metricsAction) { + if act.logToFile { + glog.Infof("Label: %v, Value: %v ", metric.GetLabel(), metric.GetCounter().GetValue()) + } else { + glog.V(10).Info("processing COUNTER: ", metric.GetCounter().GetValue()) + } + act.actionFunc(&act.data.curSingleDataPoint, metric.GetCounter().GetValue()) +} + +func processPercentile(metric *dto.Metric, act *metricsAction) { + var p99 float64 + + glog.V(10).Info("processing PERCENTILE: ", p99) + summary := metric.GetSummary() + quantiles := summary.GetQuantile() + + for _, q := range quantiles { + if q.GetQuantile() == 0.99 { + p99 = q.GetValue() + } + } + act.actionFunc(&act.data.curSingleDataPoint, p99) +} + +func processHistogram(metric *dto.Metric, act *metricsAction) { + glog.V(5).Info("processing HISTOGRAM:", metric.GetLabel()) + glog.V(5).Info(metric.GetHistogram()) + histogram := metric.GetHistogram() + + for _, bucket := range histogram.GetBucket() { + glog.V(10).Info("processing bucket:", bucket) + existingBucket := false + for _, bucketInAct := range act.bucket.curBucket { + if bucket.GetUpperBound() == *bucketInAct.UpperBound { + // found the matching bucket + glog.V(10).Infof("Found the matching bucket with UpperBound: %f", *bucketInAct.UpperBound) + act.actionFunc(bucketInAct.CumulativeCount, float64(bucket.GetCumulativeCount())) + glog.V(10).Infof("Found: *bucketInAct.CumulativeCount:%f, bucket.GetCumulativeCount():%f", + *bucketInAct.CumulativeCount, float64(bucket.GetCumulativeCount())) + existingBucket = true + break + } + } + + if !existingBucket { + 
glog.V(10).Infof("Create a new bucket with upperBound:%f", bucket.GetUpperBound()) + upperBound := new(float64) + *upperBound = float64(bucket.GetUpperBound()) + cumulativeCount := new(float64) + *cumulativeCount = float64(bucket.GetCumulativeCount()) + newBucket := &bucketPoint{UpperBound: upperBound, CumulativeCount: cumulativeCount} + act.bucket.curBucket = append(act.bucket.curBucket, newBucket) + } + } +} + +func postProcessingCounter(convert metricsConvert) bool { + resetDetected := false + noPreviousDataPoint := true + noCurrentDataPoint := true + for _, action := range convert.actions { + currentTotal := action.data.curSingleDataPoint + // Only do delta if metric target did NOT restart + if action.data.curSingleDataPoint < action.data.lastSingleDataPoint { + resetDetected = true + } else { + action.data.curSingleDataPoint -= action.data.lastSingleDataPoint + } + + if action.data.lastSingleDataPoint != 0 { + noPreviousDataPoint = false + } + + if action.data.curSingleDataPoint != 0 { + noCurrentDataPoint = false + } + + action.data.lastSingleDataPoint = currentTotal + } + + if resetDetected || (noPreviousDataPoint && !noCurrentDataPoint) { + glog.Infof("Reset detected resetDetected: %v, noPreviousDataPoint: %v, noCurrentDataPoint: %v", + resetDetected, noPreviousDataPoint, noCurrentDataPoint) + } + return resetDetected || (noPreviousDataPoint && !noCurrentDataPoint) +} + +func postProcessingHistogram(convert metricsConvert) bool { + resetDetected := false + noLastBucket := true + + for _, action := range convert.actions { + numOfBuckets := len(action.bucket.curBucket) + if numOfBuckets == 0 { + glog.Info(" Post Histogram Processing: no bucket found") + continue + } + for i := 1; i < numOfBuckets; i++ { + glog.V(10).Infof("Found numOfBuckets-i:=%d, *action.bucket.curBucket[numOfBuckets-i].CumulativeCount=%f", + numOfBuckets-i, *action.bucket.curBucket[numOfBuckets-i].CumulativeCount) + + // Delta against the previous bucket value + // e.g. 
diff between bucket LE250000 and previous bucket LE125000 + *action.bucket.curBucket[numOfBuckets-i].CumulativeCount -= *action.bucket.curBucket[numOfBuckets-i-1].CumulativeCount + glog.V(10).Infof("Found numOfBuckets-i:=%d, *action.bucket.curBucket[numOfBuckets-i].CumulativeCount=%f, *action.bucket.curBucket[numOfBuckets-i-1].CumulativeCount=%f", + numOfBuckets-i, *action.bucket.curBucket[numOfBuckets-i].CumulativeCount, *action.bucket.curBucket[numOfBuckets-i-1].CumulativeCount) + + // Delta against the previous value + if action.bucket.lastBucket != nil { + glog.V(10).Infof("Found *action.bucket.lastBucket[numOfBuckets-i].CumulativeCount=%f", + *action.bucket.lastBucket[numOfBuckets-i].CumulativeCount) + currentTotal := *action.bucket.curBucket[numOfBuckets-i].CumulativeCount + // Only do delta if there is no restart for metric target + if *action.bucket.curBucket[numOfBuckets-i].CumulativeCount >= *action.bucket.lastBucket[numOfBuckets-i].CumulativeCount { + *action.bucket.curBucket[numOfBuckets-i].CumulativeCount -= *action.bucket.lastBucket[numOfBuckets-i].CumulativeCount + glog.V(10).Infof("Found *action.bucket.lastBucket[numOfBuckets-i].CumulativeCount=%f, *action.bucket.lastBucket[numOfBuckets-i].CumulativeCount=%f", + *action.bucket.curBucket[numOfBuckets-i].CumulativeCount, *action.bucket.lastBucket[numOfBuckets-i].CumulativeCount) + } else { + resetDetected = true + } + *action.bucket.lastBucket[numOfBuckets-i].CumulativeCount = currentTotal + } + } + + if action.bucket.lastBucket != nil { + currentTotal := *action.bucket.curBucket[0].CumulativeCount + // Only do delta if there is no restart for metric target + if *action.bucket.curBucket[0].CumulativeCount >= *action.bucket.lastBucket[0].CumulativeCount { + *action.bucket.curBucket[0].CumulativeCount -= *action.bucket.lastBucket[0].CumulativeCount + } else { + resetDetected = true + } + *action.bucket.lastBucket[0].CumulativeCount = currentTotal + } + + if action.bucket.lastBucket == nil { + 
action.bucket.lastBucket = action.bucket.curBucket + } else { + noLastBucket = false + } + } + return resetDetected || noLastBucket +} + +func processMetric(family *dto.MetricFamily, convert metricsConvert) (bool, error) { + glog.Info("Processing metric: ", family.GetName()) + resetDetected := false + + mType := family.GetType() + for _, metric := range family.GetMetric() { + for _, act := range convert.actions { + if act.matchFunc(metric) { + switch mType { + case dto.MetricType_GAUGE: + processGauge(metric, &act) + case dto.MetricType_HISTOGRAM: + processHistogram(metric, &act) + case dto.MetricType_COUNTER: + processCounter(metric, &act) + case dto.MetricType_SUMMARY: + processPercentile(metric, &act) + } + } + } + } + + switch mType { + case dto.MetricType_COUNTER: + curResetDetected := postProcessingCounter(convert) + if curResetDetected { + resetDetected = true + } + case dto.MetricType_GAUGE: + // no addition work needs for GAUGE + case dto.MetricType_SUMMARY: + // no addition work needs for PERCENTILE + case dto.MetricType_HISTOGRAM: + curResetDetected := postProcessingHistogram(convert) + if curResetDetected { + resetDetected = true + } + } + return resetDetected, nil +} + +func produceHistogram(act metricsAction, cw publisher.Publisher) { + prevUpperBound := float64(0) + for _, bucket := range act.bucket.curBucket { + mid := (*bucket.UpperBound-float64(prevUpperBound))/2 + prevUpperBound + if mid == *bucket.UpperBound { + newMid := prevUpperBound + prevUpperBound/2 + mid = newMid + } + + prevUpperBound = *bucket.UpperBound + if *bucket.CumulativeCount != 0 { + glog.Infof("Produce HISTOGRAM metrics: %s, max:%f, min:%f, count: %f, sum: %f", + act.cwMetricName, mid, mid, *bucket.CumulativeCount, mid*float64(*bucket.CumulativeCount)) + dataPoint := &cloudwatch.MetricDatum{ + MetricName: aws.String(act.cwMetricName), + StatisticValues: &cloudwatch.StatisticSet{ + Maximum: aws.Float64(mid), + Minimum: aws.Float64(mid), + SampleCount: 
aws.Float64(*bucket.CumulativeCount), + Sum: aws.Float64(mid * float64(*bucket.CumulativeCount)), + }, + } + cw.Publish(dataPoint) + } + } +} + +func filterMetrics(originalMetrics map[string]*dto.MetricFamily, + interestingMetrics map[string]metricsConvert) (map[string]*dto.MetricFamily, error) { + result := map[string]*dto.MetricFamily{} + + for metric := range interestingMetrics { + if family, found := originalMetrics[metric]; found { + result[metric] = family + + } + } + return result, nil +} + +func produceCloudWatchMetrics(t metricsTarget, families map[string]*dto.MetricFamily, convertDef map[string]metricsConvert, cw publisher.Publisher) { + for key, family := range families { + convertMetrics := convertDef[key] + mType := family.GetType() + for _, action := range convertMetrics.actions { + switch mType { + case dto.MetricType_COUNTER: + glog.Infof("Produce COUNTER metrics: %s, value: %f", action.cwMetricName, action.data.curSingleDataPoint) + if t.submitCloudWatch() { + dataPoint := &cloudwatch.MetricDatum{ + MetricName: aws.String(action.cwMetricName), + Unit: aws.String(cloudwatch.StandardUnitCount), + Value: aws.Float64(action.data.curSingleDataPoint), + } + cw.Publish(dataPoint) + } + case dto.MetricType_GAUGE: + glog.Infof("Produce GAUGE metrics: %s, value: %f", action.cwMetricName, action.data.curSingleDataPoint) + if t.submitCloudWatch() { + dataPoint := &cloudwatch.MetricDatum{ + MetricName: aws.String(action.cwMetricName), + Unit: aws.String(cloudwatch.StandardUnitCount), + Value: aws.Float64(action.data.curSingleDataPoint), + } + cw.Publish(dataPoint) + } + case dto.MetricType_SUMMARY: + glog.Infof("Produce PERCENTILE metrics: %s, value: %f", action.cwMetricName, action.data.curSingleDataPoint) + if t.submitCloudWatch() { + dataPoint := &cloudwatch.MetricDatum{ + MetricName: aws.String(action.cwMetricName), + Unit: aws.String(cloudwatch.StandardUnitCount), + Value: aws.Float64(action.data.curSingleDataPoint), + } + cw.Publish(dataPoint) + } + case 
dto.MetricType_HISTOGRAM: + if t.submitCloudWatch() { + produceHistogram(action, cw) + } + } + } + } +} + +func resetMetrics(interestingMetrics map[string]metricsConvert) { + for _, convert := range interestingMetrics { + for _, act := range convert.actions { + if act.data != nil { + act.data.curSingleDataPoint = 0 + } + + if act.bucket != nil { + act.bucket.curBucket = make([]*bucketPoint, 0) + } + } + } +} + +func metricsListGrabAggregateConvert(t metricsTarget) (map[string]*dto.MetricFamily, map[string]metricsConvert, bool, error) { + var resetDetected = false + var families map[string]*dto.MetricFamily + + interestingMetrics := t.getInterestingMetrics() + resetMetrics(interestingMetrics) + + targetList := t.getTargetList() + glog.Info("targetList: ", targetList) + glog.Info("len(targetList)", len(targetList)) + for _, target := range targetList { + glog.Infof("Grab/Aggregate metrics from %v", target) + rawOutput, err := t.grabMetricsFromTarget(target) + if err != nil { + glog.Errorf("Failed to getMetricsFromTarget: %v", err) + // it may take times to remove some metric targets + continue + } + + parser := &expfmt.TextParser{} + origFamilies, err := parser.TextToMetricFamilies(bytes.NewReader(rawOutput)) + + if err != nil { + glog.Warning("Failed to parse metrics:", err) + return nil, nil, true, err + } + + families, err = filterMetrics(origFamilies, interestingMetrics) + if err != nil { + glog.Warning("Failed to filter metrics:", err) + return nil, nil, true, err + } + + for _, family := range families { + convert := interestingMetrics[family.GetName()] + curReset, err := processMetric(family, convert) + if err != nil { + return nil, nil, true, err + } + if curReset { + resetDetected = true + } + } + } + + // TODO resetDetected is NOT right for cniMetrics, so force it for now + if len(targetList) > 1 { + resetDetected = false + } + + return families, interestingMetrics, resetDetected, nil +} + +// Handler grabs metrics from target, aggregates the metrics and 
convert them into cloudwatch metrics +func Handler(t metricsTarget) { + families, interestingMetrics, resetDetected, err := metricsListGrabAggregateConvert(t) + + if err != nil || resetDetected { + glog.Info("Skipping 1st poll after reset, error:", err) + } + + cw := t.getCWMetricsPublisher() + produceCloudWatchMetrics(t, families, interestingMetrics, cw) +} diff --git a/cni-metrics-helper/metrics/metrics_test.go b/cni-metrics-helper/metrics/metrics_test.go new file mode 100644 index 0000000000..8104c49c54 --- /dev/null +++ b/cni-metrics-helper/metrics/metrics_test.go @@ -0,0 +1,91 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. 
+ +package metrics + +import ( + "io/ioutil" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/aws/amazon-vpc-cni-k8s/pkg/publisher" +) + +type testMetricsTarget struct { + metricFile string + interestingMetrics map[string]metricsConvert + targetList []string +} + +func newTestMetricsTarget(metricFile string, interestingMetrics map[string]metricsConvert) *testMetricsTarget { + return &testMetricsTarget{ + metricFile: metricFile, + interestingMetrics: interestingMetrics} +} + +func (target *testMetricsTarget) grabMetricsFromTarget(targetName string) ([]byte, error) { + testMetrics, _ := ioutil.ReadFile(target.metricFile) + + return testMetrics, nil +} + +func (target *testMetricsTarget) getInterestingMetrics() map[string]metricsConvert { + return target.interestingMetrics +} + +func (target *testMetricsTarget) getCWMetricsPublisher() publisher.Publisher { + return nil +} + +func (target *testMetricsTarget) getTargetList() []string { + return []string{target.metricFile} +} + +func (target *testMetricsTarget) submitCloudWatch() bool { + return false +} + +func TestAPIServerMetric(t *testing.T) { + testTarget := newTestMetricsTarget("cni_test1.data", InterestingCNIMetrics) + + _, _, resetDetected, err := metricsListGrabAggregateConvert(testTarget) + assert.NoError(t, err) + assert.True(t, resetDetected) + + actions := InterestingCNIMetrics["awscni_assigned_ip_addresses"].actions + // verify awscni_assigned_ip_addresses value + assert.Equal(t, 1.0, actions[0].data.curSingleDataPoint) + + actions = InterestingCNIMetrics["awscni_total_ip_addresses"].actions + // verify awscni_total_ip_addresses value + assert.Equal(t, 10.0, actions[0].data.curSingleDataPoint) + + actions = InterestingCNIMetrics["awscni_aws_api_error_count"].actions + // verify awscni_aws_api_error_count value + assert.Equal(t, 14.0, actions[0].data.curSingleDataPoint) + + actions = InterestingCNIMetrics["awscni_eni_allocated"].actions + // verify awscni_eni_allocated value + 
assert.Equal(t, 2.0, actions[0].data.curSingleDataPoint) + + actions = InterestingCNIMetrics["awscni_add_ip_req_count"].actions + // verify awscni_add_ip_req_count value + assert.Equal(t, 100.0, actions[0].data.curSingleDataPoint) + + actions = InterestingCNIMetrics["awscni_aws_api_latency_ms"].actions + // verify apiserver_request_latencies_bucket + assert.Equal(t, "awsAPILatency", actions[0].cwMetricName) + assert.Equal(t, 1.0, actions[0].data.curSingleDataPoint) + assert.Equal(t, 0.0, actions[0].data.lastSingleDataPoint) +} diff --git a/misc/cni_metrics_helper.yaml b/config/v1.4/cni_metrics_helper.yaml similarity index 97% rename from misc/cni_metrics_helper.yaml rename to config/v1.4/cni_metrics_helper.yaml index 05fdcff4ef..3724fcaa78 100644 --- a/misc/cni_metrics_helper.yaml +++ b/config/v1.4/cni_metrics_helper.yaml @@ -77,9 +77,9 @@ spec: spec: serviceAccountName: cni-metrics-helper containers: - - image: 694065802095.dkr.ecr.us-west-2.amazonaws.com/cni-metrics-helper:0.1.1 + - image: 694065802095.dkr.ecr.us-west-2.amazonaws.com/cni-metrics-helper:v1.4.0 imagePullPolicy: Always name: cni-metrics-helper env: - name: USE_CLOUDWATCH - value: "no" + value: "yes" diff --git a/pkg/ec2metadatawrapper/ec2metadatawrapper.go b/pkg/ec2metadatawrapper/ec2metadatawrapper.go new file mode 100644 index 0000000000..c6a55d7aa5 --- /dev/null +++ b/pkg/ec2metadatawrapper/ec2metadatawrapper.go @@ -0,0 +1,49 @@ +// package ecmetadatawrapper is used to retrieve data from EC2 IMDS +package ec2metadatawrapper + +import ( + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/ec2metadata" + "github.com/aws/aws-sdk-go/aws/session" +) + +const ( + metadataRetries = 5 +) + +// TODO: Move away from using mock + +// HttpClient is used to help with testing +type HttpClient interface { + GetInstanceIdentityDocument() (ec2metadata.EC2InstanceIdentityDocument, error) + Region() (string, error) +} + +// EC2MetadataClient to used to obtain a subset of information from EC2 IMDS 
+type EC2MetadataClient interface {
+	GetInstanceIdentityDocument() (ec2metadata.EC2InstanceIdentityDocument, error)
+	Region() (string, error)
+}
+
+type ec2MetadataClientImpl struct {
+	client HttpClient
+}
+
+// New creates an ec2metadata client to retrieve metadata
+func New(client HttpClient) EC2MetadataClient {
+	if client == nil {
+		return &ec2MetadataClientImpl{client: ec2metadata.New(session.New(), aws.NewConfig().WithMaxRetries(metadataRetries))}
+	} else {
+		return &ec2MetadataClientImpl{client: client}
+	}
+}
+
+// GetInstanceIdentityDocument returns instance identity documents
+// http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instance-identity-documents.html
+func (c *ec2MetadataClientImpl) GetInstanceIdentityDocument() (ec2metadata.EC2InstanceIdentityDocument, error) {
+	return c.client.GetInstanceIdentityDocument()
+}
+
+func (c *ec2MetadataClientImpl) Region() (string, error) {
+	return c.client.Region()
+}
diff --git a/pkg/ec2metadatawrapper/ec2metadatawrapper_test.go b/pkg/ec2metadatawrapper/ec2metadatawrapper_test.go
new file mode 100644
index 0000000000..d9f910eee6
--- /dev/null
+++ b/pkg/ec2metadatawrapper/ec2metadatawrapper_test.go
@@ -0,0 +1,88 @@
+package ec2metadatawrapper
+
+import (
+	"testing"
+	"time"
+
+	"github.com/aws/aws-sdk-go/aws/ec2metadata"
+	"github.com/aws/amazon-vpc-cni-k8s/pkg/ec2metadatawrapper/mocks"
+
+	"github.com/golang/mock/gomock"
+	"github.com/pkg/errors"
+	"github.com/stretchr/testify/assert"
+)
+
+const (
+	// iidRegion is the instance identity document region
+	iidRegion = "us-east-1"
+)
+
+var testInstanceIdentityDoc = ec2metadata.EC2InstanceIdentityDocument{
+	PrivateIP:        "172.1.1.1",
+	AvailabilityZone: "us-east-1a",
+	Version:          "2010-08-31",
+	Region:           "us-east-1",
+	AccountID:        "012345678901",
+	InstanceID:       "i-01234567",
+	BillingProducts:  []string{"bp-01234567"},
+	ImageID:          "ami-12345678",
+	InstanceType:     "t2.micro",
+	PendingTime:      time.Now(),
+	Architecture:     "x86_64",
+}
+
+func 
TestGetInstanceIdentityDocHappyPath(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockGetter := mock_ec2metadatawrapper.NewMockHttpClient(ctrl) + testClient := New(mockGetter) + + mockGetter.EXPECT().GetInstanceIdentityDocument().Return(testInstanceIdentityDoc, nil) + + doc, err := testClient.GetInstanceIdentityDocument() + assert.NoError(t, err) + assert.Equal(t, iidRegion, doc.Region) +} + +func TestGetInstanceIdentityDocError(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockGetter := mock_ec2metadatawrapper.NewMockHttpClient(ctrl) + testClient := New(mockGetter) + + mockGetter.EXPECT().GetInstanceIdentityDocument().Return(ec2metadata.EC2InstanceIdentityDocument{}, errors.New("test error")) + + doc, err := testClient.GetInstanceIdentityDocument() + assert.Error(t, err) + assert.Empty(t, doc.Region) +} + +func TestGetRegionHappyPath(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockGetter := mock_ec2metadatawrapper.NewMockHttpClient(ctrl) + testClient := New(mockGetter) + + mockGetter.EXPECT().Region().Return(iidRegion, nil) + + region, err := testClient.Region() + assert.NoError(t, err) + assert.Equal(t, iidRegion, region) +} + +func TestGetRegionErr(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockGetter := mock_ec2metadatawrapper.NewMockHttpClient(ctrl) + testClient := New(mockGetter) + + mockGetter.EXPECT().Region().Return("", errors.New("test error")) + + region, err := testClient.Region() + assert.Error(t, err) + assert.Empty(t, region) +} diff --git a/pkg/ec2metadatawrapper/mocks/mock_ec2metadatawrapper.go b/pkg/ec2metadatawrapper/mocks/mock_ec2metadatawrapper.go new file mode 100644 index 0000000000..5dd4fd3c3a --- /dev/null +++ b/pkg/ec2metadatawrapper/mocks/mock_ec2metadatawrapper.go @@ -0,0 +1,95 @@ +// Automatically generated by MockGen. DO NOT EDIT! 
+// Source: ec2metadatawrapper.go + +package mock_ec2metadatawrapper + +import ( + ec2metadata "github.com/aws/aws-sdk-go/aws/ec2metadata" + gomock "github.com/golang/mock/gomock" +) + +// Mock of HttpClient interface +type MockHttpClient struct { + ctrl *gomock.Controller + recorder *_MockHttpClientRecorder +} + +// Recorder for MockHttpClient (not exported) +type _MockHttpClientRecorder struct { + mock *MockHttpClient +} + +func NewMockHttpClient(ctrl *gomock.Controller) *MockHttpClient { + mock := &MockHttpClient{ctrl: ctrl} + mock.recorder = &_MockHttpClientRecorder{mock} + return mock +} + +func (_m *MockHttpClient) EXPECT() *_MockHttpClientRecorder { + return _m.recorder +} + +func (_m *MockHttpClient) GetInstanceIdentityDocument() (ec2metadata.EC2InstanceIdentityDocument, error) { + ret := _m.ctrl.Call(_m, "GetInstanceIdentityDocument") + ret0, _ := ret[0].(ec2metadata.EC2InstanceIdentityDocument) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +func (_mr *_MockHttpClientRecorder) GetInstanceIdentityDocument() *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "GetInstanceIdentityDocument") +} + +func (_m *MockHttpClient) Region() (string, error) { + ret := _m.ctrl.Call(_m, "Region") + ret0, _ := ret[0].(string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +func (_mr *_MockHttpClientRecorder) Region() *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "Region") +} + +// Mock of EC2MetadataClient interface +type MockEC2MetadataClient struct { + ctrl *gomock.Controller + recorder *_MockEC2MetadataClientRecorder +} + +// Recorder for MockEC2MetadataClient (not exported) +type _MockEC2MetadataClientRecorder struct { + mock *MockEC2MetadataClient +} + +func NewMockEC2MetadataClient(ctrl *gomock.Controller) *MockEC2MetadataClient { + mock := &MockEC2MetadataClient{ctrl: ctrl} + mock.recorder = &_MockEC2MetadataClientRecorder{mock} + return mock +} + +func (_m *MockEC2MetadataClient) EXPECT() *_MockEC2MetadataClientRecorder { + return 
_m.recorder
+}
+
+func (_m *MockEC2MetadataClient) GetInstanceIdentityDocument() (ec2metadata.EC2InstanceIdentityDocument, error) {
+	ret := _m.ctrl.Call(_m, "GetInstanceIdentityDocument")
+	ret0, _ := ret[0].(ec2metadata.EC2InstanceIdentityDocument)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+func (_mr *_MockEC2MetadataClientRecorder) GetInstanceIdentityDocument() *gomock.Call {
+	return _mr.mock.ctrl.RecordCall(_mr.mock, "GetInstanceIdentityDocument")
+}
+
+func (_m *MockEC2MetadataClient) Region() (string, error) {
+	ret := _m.ctrl.Call(_m, "Region")
+	ret0, _ := ret[0].(string)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+func (_mr *_MockEC2MetadataClientRecorder) Region() *gomock.Call {
+	return _mr.mock.ctrl.RecordCall(_mr.mock, "Region")
+}
diff --git a/pkg/ec2wrapper/ec2wrapper.go b/pkg/ec2wrapper/ec2wrapper.go
new file mode 100644
index 0000000000..684cbfd62f
--- /dev/null
+++ b/pkg/ec2wrapper/ec2wrapper.go
@@ -0,0 +1,77 @@
+// Package ec2wrapper is used to wrap around the ec2 service APIs
+package ec2wrapper
+
+import (
+	"github.com/aws/amazon-vpc-cni-k8s/pkg/ec2metadatawrapper"
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/aws/ec2metadata"
+	"github.com/aws/aws-sdk-go/aws/session"
+	"github.com/aws/aws-sdk-go/service/ec2"
+	"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
+	"github.com/golang/glog"
+
+	"github.com/pkg/errors"
+)
+
+const (
+	maxRetries   = 5
+	resourceID   = "resource-id"
+	resourceKey  = "key"
+	clusterIDTag = "CLUSTER_ID"
+)
+
+// EC2Wrapper is used to wrap around EC2 service APIs to obtain ClusterID from
+// the ec2 instance tags
+type EC2Wrapper struct {
+	ec2ServiceClient         ec2iface.EC2API
+	instanceIdentityDocument ec2metadata.EC2InstanceIdentityDocument
+}
+
+// NewMetricsClient returns an instance of the EC2 wrapper
+func NewMetricsClient() (*EC2Wrapper, error) {
+	metricsSession := session.Must(session.NewSession())
+	ec2MetadataClient := ec2metadatawrapper.New(nil)
+
+	instanceIdentityDocument, err := 
ec2MetadataClient.GetInstanceIdentityDocument() + if err != nil { + return &EC2Wrapper{}, err + } + + ec2ServiceClient := ec2.New(metricsSession, aws.NewConfig().WithMaxRetries(maxRetries).WithRegion(instanceIdentityDocument.Region)) + + return &EC2Wrapper{ + ec2ServiceClient: ec2ServiceClient, + instanceIdentityDocument: instanceIdentityDocument, + }, nil +} + +// GetClusterTag is used to retrieve a tag from the ec2 instance +func (e *EC2Wrapper) GetClusterTag(tagKey string) (string, error) { + input := ec2.DescribeTagsInput{ + Filters: []*ec2.Filter{ + { + Name: aws.String(resourceID), + Values: []*string{ + aws.String(e.instanceIdentityDocument.InstanceID), + }, + }, { + Name: aws.String(resourceKey), + Values: []*string{ + aws.String(tagKey), + }, + }, + }, + } + + glog.Info("Calling DescribeTags with key ", tagKey) + results, err := e.ec2ServiceClient.DescribeTags(&input) + if err != nil { + return "", errors.Wrap(err, "GetClusterTag: Unable to obtain EC2 instance tags") + } + + if len(results.Tags) < 1 { + return "", errors.Errorf("GetClusterTag: No tag matching key: %s", tagKey) + } + + return aws.StringValue(results.Tags[0].Value), nil +} diff --git a/pkg/ec2wrapper/ec2wrapper_test.go b/pkg/ec2wrapper/ec2wrapper_test.go new file mode 100644 index 0000000000..6ffc43c3c9 --- /dev/null +++ b/pkg/ec2wrapper/ec2wrapper_test.go @@ -0,0 +1,91 @@ +package ec2wrapper + +import ( + "testing" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/ec2metadata" + "github.com/aws/aws-sdk-go/service/ec2" + "github.com/aws/aws-sdk-go/service/ec2/ec2iface" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" +) + +var testInstanceIdentityDocument = ec2metadata.EC2InstanceIdentityDocument{ + PrivateIP: "172.1.1.1", + AvailabilityZone: "us-east-1a", + Version: "2010-08-31", + Region: "us-east-1", + AccountID: "012345678901", + InstanceID: "i-01234567", + BillingProducts: []string{"bp-01234567"}, + ImageID: "ami-12345678", + InstanceType: 
"t2.micro", + PendingTime: time.Now(), + Architecture: "x86_64", +} + +func TestGetClusterID(t *testing.T) { + mockEC2ServiceClient := mockEC2ServiceClient{ + tags: &ec2.DescribeTagsOutput{ + Tags: []*ec2.TagDescription{ + { + Value: aws.String("TEST_CLUSTER_ID"), + }, + }, + }, + } + + ec2wrap := EC2Wrapper{ + ec2ServiceClient: mockEC2ServiceClient, + instanceIdentityDocument: testInstanceIdentityDocument, + } + + clusterID, err := ec2wrap.GetClusterTag(clusterIDTag) + assert.NoError(t, err) + assert.NotNil(t, clusterID) +} + +func TestGetClusterIDWithError(t *testing.T) { + mockEC2ServiceClient := mockEC2ServiceClient{ + tagsErr: errors.New("test error"), + } + + ec2wrap := EC2Wrapper{ + ec2ServiceClient: mockEC2ServiceClient, + instanceIdentityDocument: testInstanceIdentityDocument, + } + + clusterID, err := ec2wrap.GetClusterTag(clusterIDTag) + assert.Error(t, err) + assert.Empty(t, clusterID) +} + +func TestGetClusterIDWithInsufficientTags(t *testing.T) { + mockEC2ServiceClient := mockEC2ServiceClient{ + tags: &ec2.DescribeTagsOutput{ + Tags: []*ec2.TagDescription{}, + }, + } + + ec2wrap := EC2Wrapper{ + ec2ServiceClient: mockEC2ServiceClient, + instanceIdentityDocument: testInstanceIdentityDocument, + } + + clusterID, err := ec2wrap.GetClusterTag(clusterIDTag) + assert.Error(t, err) + assert.Empty(t, clusterID) +} + +type mockEC2ServiceClient struct { + ec2iface.EC2API + tags *ec2.DescribeTagsOutput + tagsErr error +} + +func (f mockEC2ServiceClient) DescribeTags(input *ec2.DescribeTagsInput) (*ec2.DescribeTagsOutput, error) { + return f.tags, f.tagsErr + +} diff --git a/pkg/k8sapi/discovery.go b/pkg/k8sapi/discovery.go index cafbd459cc..a7470124e3 100644 --- a/pkg/k8sapi/discovery.go +++ b/pkg/k8sapi/discovery.go @@ -4,6 +4,7 @@ package k8sapi import ( "fmt" "os" + "strings" "sync" "time" @@ -31,6 +32,10 @@ type controller struct { informer cache.Controller } +const ( + cniPodName = "aws-node" +) + // K8SAPIs defines interface to use kubelet introspection 
API type K8SAPIs interface { K8SGetLocalPodIPs() ([]*K8SPodInfo, error) @@ -57,6 +62,9 @@ type Controller struct { workerPods map[string]*K8SPodInfo workerPodsLock sync.RWMutex + cniPods map[string]string + cniPodsLock sync.RWMutex + controller *controller kubeClient kubernetes.Interface myNodeName string @@ -67,6 +75,7 @@ type Controller struct { func NewController(clientset kubernetes.Interface) *Controller { return &Controller{kubeClient: clientset, myNodeName: os.Getenv("MY_NODE_NAME"), + cniPods: make(map[string]string), workerPods: make(map[string]*K8SPodInfo)} } @@ -91,6 +100,23 @@ func CreateKubeClient() (clientset.Interface, error) { return kubeClient, nil } +// GetCNIPods return the list of CNI pod names +func (d *Controller) GetCNIPods() []string { + var cniPods []string + + log.Info("GetCNIPods start...") + + d.cniPodsLock.Lock() + defer d.cniPodsLock.Unlock() + + for k := range d.cniPods { + cniPods = append(cniPods, k) + } + + log.Info("GetCNIPods discovered", cniPods) + return cniPods +} + // DiscoverK8SPods discovers Pods running in the cluster func (d *Controller) DiscoverK8SPods() { // create the pod watcher @@ -198,6 +224,11 @@ func (d *Controller) handlePodUpdate(key string) error { d.workerPodsLock.Lock() defer d.workerPodsLock.Unlock() delete(d.workerPods, key) + if strings.HasPrefix(key, metav1.NamespaceSystem+"/"+cniPodName) { + d.cniPodsLock.Lock() + defer d.cniPodsLock.Unlock() + delete(d.cniPods, key) + } return nil } @@ -223,6 +254,12 @@ func (d *Controller) handlePodUpdate(key string) error { } log.Infof(" Add/Update for Pod %s on my node, namespace = %s, IP = %s", podName, d.workerPods[key].Namespace, d.workerPods[key].IP) + } else if strings.HasPrefix(key, metav1.NamespaceSystem+"/"+cniPodName) { + d.cniPodsLock.Lock() + defer d.cniPodsLock.Unlock() + + log.Infof(" Add/Update for CNI pod %s", podName) + d.cniPods[podName] = podName } return nil } diff --git a/pkg/publisher/mock_publisher/mock_publisher.go 
b/pkg/publisher/mock_publisher/mock_publisher.go new file mode 100644 index 0000000000..36e2d6bf90 --- /dev/null +++ b/pkg/publisher/mock_publisher/mock_publisher.go @@ -0,0 +1,68 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: publisher.go + +// Package mock_publisher is a generated GoMock package. +package mock_publisher + +import ( + cloudwatch "github.com/aws/aws-sdk-go/service/cloudwatch" + gomock "github.com/golang/mock/gomock" + reflect "reflect" +) + +// MockPublisher is a mock of Publisher interface +type MockPublisher struct { + ctrl *gomock.Controller + recorder *MockPublisherMockRecorder +} + +// MockPublisherMockRecorder is the mock recorder for MockPublisher +type MockPublisherMockRecorder struct { + mock *MockPublisher +} + +// NewMockPublisher creates a new mock instance +func NewMockPublisher(ctrl *gomock.Controller) *MockPublisher { + mock := &MockPublisher{ctrl: ctrl} + mock.recorder = &MockPublisherMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockPublisher) EXPECT() *MockPublisherMockRecorder { + return m.recorder +} + +// Publish mocks base method +func (m *MockPublisher) Publish(metricDataPoints ...*cloudwatch.MetricDatum) { + varargs := []interface{}{} + for _, a := range metricDataPoints { + varargs = append(varargs, a) + } + m.ctrl.Call(m, "Publish", varargs...) +} + +// Publish indicates an expected call of Publish +func (mr *MockPublisherMockRecorder) Publish(metricDataPoints ...interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Publish", reflect.TypeOf((*MockPublisher)(nil).Publish), metricDataPoints...) 
+} + +// Start mocks base method +func (m *MockPublisher) Start() { + m.ctrl.Call(m, "Start") +} + +// Start indicates an expected call of Start +func (mr *MockPublisherMockRecorder) Start() *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockPublisher)(nil).Start)) +} + +// Stop mocks base method +func (m *MockPublisher) Stop() { + m.ctrl.Call(m, "Stop") +} + +// Stop indicates an expected call of Stop +func (mr *MockPublisherMockRecorder) Stop() *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockPublisher)(nil).Stop)) +} diff --git a/pkg/publisher/publisher.go b/pkg/publisher/publisher.go new file mode 100644 index 0000000000..399ad7b265 --- /dev/null +++ b/pkg/publisher/publisher.go @@ -0,0 +1,226 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. 
+
+// Package publisher is used to batch and send metric data to CloudWatch
+package publisher
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"github.com/aws/amazon-vpc-cni-k8s/pkg/ec2metadatawrapper"
+	"github.com/aws/amazon-vpc-cni-k8s/pkg/ec2wrapper"
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/aws/session"
+	"github.com/aws/aws-sdk-go/service/cloudwatch"
+	"github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"
+
+	"github.com/golang/glog"
+	"github.com/pkg/errors"
+)
+
+const (
+	// defaultInterval for monitoring the watch list
+	defaultInterval = time.Second * 60
+
+	// cloudwatchMetricNamespace for custom metrics
+	cloudwatchMetricNamespace = "Kubernetes"
+
+	// Metric dimension constants
+	clusterIDDimension = "CLUSTER_ID"
+
+	// localMetricDataSize is the default size for the local queue (slice)
+	localMetricDataSize = 100
+
+	// cloudwatchClientMaxRetries for configuring CloudWatch client with maximum retries
+	cloudwatchClientMaxRetries = 20
+
+	// maxDataPoints is the maximum number of data points per PutMetricData API request
+	maxDataPoints = 20
+)
+
+// Publisher defines the interface to publish one or more data points
+type Publisher interface {
+	// Publish publishes one or more metric data points
+	Publish(metricDataPoints ...*cloudwatch.MetricDatum)
+
+	// Start is to initiate the batch and publish operation
+	Start()
+
+	// Stop is to terminate the batch and publish operation
+	Stop()
+}
+
+// cloudWatchPublisher implements the `Publisher` interface for batching and publishing
+// metric data to the CloudWatch metrics backend
+type cloudWatchPublisher struct {
+	ctx                  context.Context
+	cancel               context.CancelFunc
+	updateIntervalTicker *time.Ticker
+	clusterID            string
+	cloudwatchClient     cloudwatchiface.CloudWatchAPI
+	localMetricData      []*cloudwatch.MetricDatum
+	lock                 sync.RWMutex
+}
+
+// New returns a new instance of `Publisher`
+func New(ctx context.Context) (Publisher, error) {
+	// Get AWS session
+	awsSession := 
session.Must(session.NewSession()) + + // Get cluster-ID + ec2Client, err := ec2wrapper.NewMetricsClient() + if err != nil { + return nil, errors.Wrap(err, "publisher: unable to obtain EC2 service client") + } + clusterID, err := ec2Client.GetClusterTag("CLUSTER_ID") + if err != nil || clusterID == "" { + glog.Errorf("Failed to obtain cluster-id, fetching name. %v", err) + clusterID, err = ec2Client.GetClusterTag("Name") + if err != nil || clusterID == "" { + glog.Errorf("Failed to obtain cluster-id or name, defaulting to 'k8s-cluster'. %v", err) + clusterID = "k8s-cluster" + } + } + glog.Info("Using cluster ID ", clusterID) + + // Get CloudWatch client + ec2MetadataClient := ec2metadatawrapper.New(nil) + + region, err := ec2MetadataClient.Region() + if err != nil { + return nil, errors.Wrap(err, "publisher: Unable to obtain region") + } + cloudwatchClient := cloudwatch.New(awsSession, aws.NewConfig().WithMaxRetries(cloudwatchClientMaxRetries).WithRegion(region)) + + // Build derived context + derivedContext, cancel := context.WithCancel(ctx) + + return &cloudWatchPublisher{ + ctx: derivedContext, + cancel: cancel, + cloudwatchClient: cloudwatchClient, + clusterID: clusterID, + localMetricData: make([]*cloudwatch.MetricDatum, 0, localMetricDataSize), + }, nil +} + +// Start is used to setup the monitor loop +func (p *cloudWatchPublisher) Start() { + glog.V(2).Info("Starting monitor loop for CloudWatch publisher") + p.monitor(defaultInterval) +} + +// Stop is used to cancel the monitor loop +func (p *cloudWatchPublisher) Stop() { + glog.V(2).Info("Stopping monitor loop for CloudWatch publisher") + p.cancel() +} + +// Publish is a variadic function to publish one or more metric data points +func (p *cloudWatchPublisher) Publish(metricDataPoints ...*cloudwatch.MetricDatum) { + // Fetch dimensions for override + glog.V(2).Info("Fetching CloudWatch dimensions") + dimensions := p.getCloudWatchMetricDatumDimensions() + + // Grab lock + p.lock.Lock() + defer 
p.lock.Unlock() + + // NOTE: Iteration is used to override the metric dimensions + for _, metricDatum := range metricDataPoints { + metricDatum.Dimensions = dimensions + p.localMetricData = append(p.localMetricData, metricDatum) + } +} + +func (p *cloudWatchPublisher) pushLocal() { + p.lock.Lock() + data := p.localMetricData[:] + p.localMetricData = make([]*cloudwatch.MetricDatum, 0, localMetricDataSize) + p.lock.Unlock() + p.push(data) +} + +func (p *cloudWatchPublisher) push(metricData []*cloudwatch.MetricDatum) { + if len(metricData) == 0 { + glog.Warning("Missing data for publishing CloudWatch metrics") + return + } + + // Setup input + input := cloudwatch.PutMetricDataInput{} + input.Namespace = p.getCloudWatchMetricNamespace() + + // NOTE: Ensure cap of 40K per request and enforce rate limiting + for len(metricData) > 0 { + input.MetricData = metricData[:maxDataPoints] + + // Publish data + err := p.send(input) + if err != nil { + glog.Errorf("Unable to publish CloudWatch metrics: %v", err) + } + + // Mutate slice + index := min(maxDataPoints, len(metricData)) + metricData = metricData[index:] + + // Reset Input + input = cloudwatch.PutMetricDataInput{} + input.Namespace = p.getCloudWatchMetricNamespace() + } +} + +func (p *cloudWatchPublisher) send(input cloudwatch.PutMetricDataInput) error { + glog.V(2).Info("Sending data to CloudWatch metrics") + _, err := p.cloudwatchClient.PutMetricData(&input) + return err +} + +func (p *cloudWatchPublisher) monitor(interval time.Duration) { + p.updateIntervalTicker = time.NewTicker(interval) + + for { + select { + case <-p.updateIntervalTicker.C: + p.pushLocal() + + case <-p.ctx.Done(): + p.Stop() + return + } + } +} + +func (p *cloudWatchPublisher) getCloudWatchMetricNamespace() *string { + return aws.String(cloudwatchMetricNamespace) +} + +func (p *cloudWatchPublisher) getCloudWatchMetricDatumDimensions() []*cloudwatch.Dimension { + return []*cloudwatch.Dimension{ + { + Name: aws.String(clusterIDDimension), + Value: 
aws.String(p.clusterID), + }, + } +} + +// min is a helper to compute the min of two integers +func min(x, y int) int { + if x < y { + return x + } + return y +} diff --git a/pkg/publisher/publisher_test.go b/pkg/publisher/publisher_test.go new file mode 100644 index 0000000000..35ef0d5958 --- /dev/null +++ b/pkg/publisher/publisher_test.go @@ -0,0 +1,264 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + +package publisher + +import ( + "context" + "strconv" + "testing" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/cloudwatch" + "github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface" + + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" +) + +const ( + testClusterID = "TEST_CLUSTER_ID" + testMetricOne = "TEST_METRIC_ONE" + testMonitorDuration = time.Millisecond * 10 +) + +func TestCloudWatchPublisherWithSingleDatum(t *testing.T) { + cloudwatchPublisher := getCloudWatchPublisher(t) + + testCloudwatchMetricDatum := &cloudwatch.MetricDatum{ + MetricName: aws.String(testMetricOne), + Unit: aws.String(cloudwatch.StandardUnitNone), + Value: aws.Float64(1.0), + } + + cloudwatchPublisher.Publish(testCloudwatchMetricDatum) + assert.Len(t, cloudwatchPublisher.localMetricData, 1) + assert.EqualValues(t, cloudwatchPublisher.localMetricData[0], testCloudwatchMetricDatum) + + cloudwatchPublisher.pushLocal() + assert.Empty(t, cloudwatchPublisher.localMetricData) +} + +func 
TestCloudWatchPubisherWithMultipleDatum(t *testing.T) { + cloudwatchPublisher := getCloudWatchPublisher(t) + + var metricDataPoints []*cloudwatch.MetricDatum + + for i := 0; i < 10; i++ { + metricName := "TEST_METRIC_" + strconv.Itoa(i) + testCloudwatchMetricDatum := &cloudwatch.MetricDatum{ + MetricName: aws.String(metricName), + Unit: aws.String(cloudwatch.StandardUnitNone), + Value: aws.Float64(1.0), + } + metricDataPoints = append(metricDataPoints, testCloudwatchMetricDatum) + } + + cloudwatchPublisher.Publish(metricDataPoints...) + assert.Len(t, cloudwatchPublisher.localMetricData, 10) + cloudwatchPublisher.pushLocal() + + assert.Empty(t, cloudwatchPublisher.localMetricData) +} + +func TestCloudWatchPubisherWithGreaterThanMaxDatapoints(t *testing.T) { + cloudwatchPublisher := getCloudWatchPublisher(t) + + var metricDataPoints []*cloudwatch.MetricDatum + + for i := 0; i < 30; i++ { + metricName := "TEST_METRIC_" + strconv.Itoa(i) + testCloudwatchMetricDatum := &cloudwatch.MetricDatum{ + MetricName: aws.String(metricName), + Unit: aws.String(cloudwatch.StandardUnitNone), + Value: aws.Float64(1.0), + } + metricDataPoints = append(metricDataPoints, testCloudwatchMetricDatum) + } + + cloudwatchPublisher.Publish(metricDataPoints...) + assert.Len(t, cloudwatchPublisher.localMetricData, 30) + cloudwatchPublisher.pushLocal() + + assert.Empty(t, cloudwatchPublisher.localMetricData) +} + +func TestCloudWatchPubisherWithGreaterThanMaxDatapointsAndStop(t *testing.T) { + cloudwatchPublisher := getCloudWatchPublisher(t) + + var metricDataPoints []*cloudwatch.MetricDatum + for i := 0; i < 30; i++ { + metricName := "TEST_METRIC_" + strconv.Itoa(i) + testCloudwatchMetricDatum := &cloudwatch.MetricDatum{ + MetricName: aws.String(metricName), + Unit: aws.String(cloudwatch.StandardUnitNone), + Value: aws.Float64(1.0), + } + metricDataPoints = append(metricDataPoints, testCloudwatchMetricDatum) + } + + cloudwatchPublisher.Publish(metricDataPoints...) 
+ assert.Len(t, cloudwatchPublisher.localMetricData, 30) + + go cloudwatchPublisher.monitor(testMonitorDuration) + + <-time.After(2 * testMonitorDuration) + cloudwatchPublisher.Stop() + + assert.Empty(t, cloudwatchPublisher.localMetricData) +} + +func TestCloudWatchPublisherWithSingleDatumWithError(t *testing.T) { + derivedContext, cancel := context.WithCancel(context.TODO()) + + mockCloudWatch := mockCloudWatchClient{mockPutMetricDataError: errors.New("test error")} + + cloudwatchPublisher := &cloudWatchPublisher{ + ctx: derivedContext, + cancel: cancel, + cloudwatchClient: mockCloudWatch, + clusterID: testClusterID, + localMetricData: make([]*cloudwatch.MetricDatum, 0, localMetricDataSize), + } + + testCloudwatchMetricDatum := &cloudwatch.MetricDatum{ + MetricName: aws.String(testMetricOne), + Unit: aws.String(cloudwatch.StandardUnitNone), + Value: aws.Float64(1.0), + } + + cloudwatchPublisher.Publish(testCloudwatchMetricDatum) + assert.Len(t, cloudwatchPublisher.localMetricData, 1) + assert.EqualValues(t, cloudwatchPublisher.localMetricData[0], testCloudwatchMetricDatum) + + cloudwatchPublisher.pushLocal() + assert.Empty(t, cloudwatchPublisher.localMetricData) +} + +func TestCloudWatchPublisherWithGreaterThanMaxDatapointsAndExplicitStartStop(t *testing.T) { + t.Skip() + cloudwatchPublisher := getCloudWatchPublisher(t) + + var metricDataPoints []*cloudwatch.MetricDatum + + for i := 0; i < 30; i++ { + metricName := "TEST_METRIC_" + strconv.Itoa(i) + testCloudwatchMetricDatum := &cloudwatch.MetricDatum{ + MetricName: aws.String(metricName), + Unit: aws.String(cloudwatch.StandardUnitNone), + Value: aws.Float64(1.0), + } + metricDataPoints = append(metricDataPoints, testCloudwatchMetricDatum) + } + + cloudwatchPublisher.Publish(metricDataPoints...) 
+ assert.Len(t, cloudwatchPublisher.localMetricData, 30) + go cloudwatchPublisher.Start() + + <-time.After(2 * defaultInterval) + cloudwatchPublisher.Stop() + + assert.Empty(t, cloudwatchPublisher.localMetricData) +} + +func TestGetCloudWatchMetricNamespace(t *testing.T) { + cloudwatchPublisher := getCloudWatchPublisher(t) + + testNamespace := cloudwatchPublisher.getCloudWatchMetricNamespace() + assert.Equal(t, aws.StringValue(testNamespace), cloudwatchMetricNamespace) +} + +func TestGetCloudWatchMetricDatumDimensions(t *testing.T) { + cloudwatchPublisher := getCloudWatchPublisher(t) + + expectedCloudwatchDimensions := []*cloudwatch.Dimension{ + &cloudwatch.Dimension{ + Name: aws.String(clusterIDDimension), + Value: aws.String(testClusterID), + }, + } + testCloudwatchDimensions := cloudwatchPublisher.getCloudWatchMetricDatumDimensions() + + assert.Equal(t, testCloudwatchDimensions, expectedCloudwatchDimensions) +} + +func TestGetCloudWatchMetricDatumDimensionsWithMissingClusterID(t *testing.T) { + cloudwatchPublisher := &cloudWatchPublisher{} + + expectedCloudwatchDimensions := []*cloudwatch.Dimension{ + &cloudwatch.Dimension{ + Name: aws.String(clusterIDDimension), + Value: aws.String(""), + }, + } + testCloudwatchDimensions := cloudwatchPublisher.getCloudWatchMetricDatumDimensions() + + assert.Equal(t, testCloudwatchDimensions, expectedCloudwatchDimensions) +} + +func TestPublishWithNoData(t *testing.T) { + cloudwatchPublisher := &cloudWatchPublisher{} + + testMetricDataPoints := []*cloudwatch.MetricDatum{} + + cloudwatchPublisher.Publish(testMetricDataPoints...) 
+ assert.Empty(t, cloudwatchPublisher.localMetricData) +} + +func TestPushWithMissingData(t *testing.T) { + cloudwatchPublisher := &cloudWatchPublisher{} + testMetricDataPoints := []*cloudwatch.MetricDatum{} + + cloudwatchPublisher.push(testMetricDataPoints) + assert.Empty(t, cloudwatchPublisher.localMetricData) +} + +func TestPublisherNewWithoutClusterID(t *testing.T) { + cloudwatchPublisher, err := New(context.TODO()) + assert.Error(t, err) + assert.Nil(t, cloudwatchPublisher) +} + +func TestMin(t *testing.T) { + a, b := 1, 2 + + minimum := min(a, b) + assert.Equal(t, minimum, a) + + minimum = min(b, a) + assert.Equal(t, minimum, a) +} + +// mockCloudWatchClient is used to facilitate testing +type mockCloudWatchClient struct { + cloudwatchiface.CloudWatchAPI + mockPutMetricDataError error +} + +func (m mockCloudWatchClient) PutMetricData(input *cloudwatch.PutMetricDataInput) (*cloudwatch.PutMetricDataOutput, error) { + return &cloudwatch.PutMetricDataOutput{}, m.mockPutMetricDataError +} + +func getCloudWatchPublisher(t *testing.T) *cloudWatchPublisher { + // Setup context + derivedContext, cancel := context.WithCancel(context.TODO()) + + return &cloudWatchPublisher{ + ctx: derivedContext, + cancel: cancel, + cloudwatchClient: mockCloudWatchClient{}, + clusterID: testClusterID, + localMetricData: make([]*cloudwatch.MetricDatum, 0, localMetricDataSize), + } +} diff --git a/scripts/dockerfiles/Dockerfile.metrics b/scripts/dockerfiles/Dockerfile.metrics new file mode 100644 index 0000000000..0090a2912c --- /dev/null +++ b/scripts/dockerfiles/Dockerfile.metrics @@ -0,0 +1,32 @@ +FROM golang:1.12-stretch as builder +WORKDIR /go/src/github.com/aws/amazon-vpc-cni-k8s + +ARG arch +ENV ARCH=$arch + +# Force the go compiler to use modules. +ENV GO111MODULE=on + +# go.mod and go.sum go into their own layers. +COPY go.mod . +COPY go.sum . + +# This ensures `go mod download` happens only when go.mod and go.sum change. +RUN go mod download + +COPY . . 
+RUN make build-metrics + +FROM amazonlinux:2 +RUN yum update -y && \ + yum clean all + +WORKDIR /app + +COPY --from=builder /go/src/github.com/aws/amazon-vpc-cni-k8s/cni-metrics-helper/cni-metrics-helper /app + +# Copy our bundled certs to the first place go will check: see +# https://golang.org/src/pkg/crypto/x509/root_unix.go +COPY --from=builder /go/src/github.com/aws/amazon-vpc-cni-k8s/misc/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt + +ENTRYPOINT ["/app/cni-metrics-helper", "--cloudwatch=false"]