From 7f9f9865a23a478373ddc9f708e878dee3cd78e5 Mon Sep 17 00:00:00 2001
From: Alexander Yastrebov
Date: Thu, 8 Feb 2024 17:23:23 +0100
Subject: [PATCH] skipper: discover redis shards without fetching full cluster
 state

Add LoadEndpointAddresses to the kubernetes dataclient. It is similar
to the caching GetEndpointAddresses but does not rely on a previous
call to LoadAll or LoadUpdate, which allows discovery of redis shards
without fetching the full cluster state.

Fixes #2476

Signed-off-by: Alexander Yastrebov
---
 dataclients/kubernetes/client_test.go    |  26 +++
 dataclients/kubernetes/clusterclient.go  |  38 ++++
 dataclients/kubernetes/endpointslices.go |   4 +-
 dataclients/kubernetes/kube.go           |   8 +-
 redis_test.go                            | 235 +++++++++++++++--------
 skipper.go                               |  60 ++++--
 6 files changed, 267 insertions(+), 104 deletions(-)

diff --git a/dataclients/kubernetes/client_test.go b/dataclients/kubernetes/client_test.go
index 4d274c9952..0dc2ac36bf 100644
--- a/dataclients/kubernetes/client_test.go
+++ b/dataclients/kubernetes/client_test.go
@@ -86,3 +86,29 @@ func TestClientGetEndpointAddresses(t *testing.T) {
 		assert.Equal(t, []string{"42.0.1.1", "42.0.1.2", "42.0.1.3", "42.0.1.4"}, addrs)
 	})
 }
+
+func TestClientLoadEndpointAddresses(t *testing.T) {
+	t.Run("from endpoints", func(t *testing.T) {
+		client := newTestClient(t,
+			kubernetes.Options{},
+			"testdata/ingressV1/ingress-data/lb-target-multi.yaml",
+		)
+
+		addrs, err := client.LoadEndpointAddresses("namespace1", "service1")
+		assert.NoError(t, err)
+		assert.Equal(t, []string{"42.0.1.2", "42.0.1.3"}, addrs)
+	})
+
+	t.Run("from endpointslices", func(t *testing.T) {
+		client := newTestClient(t,
+			kubernetes.Options{
+				KubernetesEnableEndpointslices: true,
+			},
+			"testdata/ingressV1/ingress-data/lb-target-multi-multiple-endpointslices-conditions-all-ready.yaml",
+		)
+
+		addrs, err := client.LoadEndpointAddresses("namespace1", "service1")
+		assert.NoError(t, err)
+		assert.Equal(t, []string{"42.0.1.1", "42.0.1.2", "42.0.1.3", "42.0.1.4"}, addrs)
+	})
+}
diff --git a/dataclients/kubernetes/clusterclient.go b/dataclients/kubernetes/clusterclient.go
index f09684910d..c677635f9c 100644
--- a/dataclients/kubernetes/clusterclient.go
+++ b/dataclients/kubernetes/clusterclient.go
@@ -497,6 +497,10 @@ func (c *clusterClient) loadEndpointSlices() (map[definitions.ResourceID]*skippe
 	}
 	log.Debugf("all endpointslices received: %d", len(endpointSlices.Items))
 
+	return mapEndpointSlices(&endpointSlices), nil
+}
+
+func mapEndpointSlices(endpointSlices *endpointSliceList) map[definitions.ResourceID]*skipperEndpointSlice {
 	mapSlices := make(map[definitions.ResourceID][]*endpointSlice)
 	for _, endpointSlice := range endpointSlices.Items {
 		resID := endpointSlice.ToResourceID() // service resource ID
@@ -550,6 +554,40 @@ func (c *clusterClient) loadEndpointSlices() (map[definitions.ResourceID]*skippe
 			result[resID].Endpoints = append(result[resID].Endpoints, o)
 		}
 	}
+	return result
+}
+
+// loadEndpointAddresses returns the list of all addresses for the given service using the endpoints or endpointslices API.
+func (c *clusterClient) loadEndpointAddresses(namespace, name string) ([]string, error) {
+	var result []string
+	if c.enableEndpointSlices {
+		url := fmt.Sprintf(EndpointSlicesNamespaceFmt, namespace) +
+			toLabelSelectorQuery(map[string]string{endpointSliceServiceNameLabel: name})
+
+		var endpointSlices endpointSliceList
+		if err := c.getJSON(url, &endpointSlices); err != nil {
+			return nil, fmt.Errorf("requesting endpointslices for %s/%s failed: %v", namespace, name, err)
+		}
+
+		mapped := mapEndpointSlices(&endpointSlices)
+		if len(mapped) != 1 {
+			return nil, fmt.Errorf("unexpected number of endpoint slices for %s/%s: %d", namespace, name, len(mapped))
+		}
+
+		for _, eps := range mapped {
+			result = eps.addresses()
+			break
+		}
+	} else {
+		url := fmt.Sprintf(EndpointsNamespaceFmt, namespace) + "/" + name
+
+		var ep endpoint
+		if err := c.getJSON(url, &ep); err != nil {
+			return nil, fmt.Errorf("requesting endpoints for %s/%s failed: %v", namespace, name, err)
+		}
+		result = ep.addresses()
+	}
+
+	sort.Strings(result)
 	return result, nil
 }
diff --git a/dataclients/kubernetes/endpointslices.go b/dataclients/kubernetes/endpointslices.go
index 436c3e4a42..e91818c80b 100644
--- a/dataclients/kubernetes/endpointslices.go
+++ b/dataclients/kubernetes/endpointslices.go
@@ -4,6 +4,8 @@ import (
 	"github.com/zalando/skipper/dataclients/kubernetes/definitions"
 )
 
+const endpointSliceServiceNameLabel = "kubernetes.io/service-name"
+
 // There are [1..N] Kubernetes endpointslices created for a single Kubernetes service.
 // Kubernetes endpointslices of a given service can have duplicates with different states.
 // Therefore Kubernetes endpointslices need to be de-duplicated before usage.
@@ -97,7 +99,7 @@ type endpointSlice struct {
 
 // ToResourceID returns the same string for a group of endpointslices created for the same svc
 func (eps *endpointSlice) ToResourceID() definitions.ResourceID {
-	svcName := eps.Meta.Labels["kubernetes.io/service-name"]
+	svcName := eps.Meta.Labels[endpointSliceServiceNameLabel]
 	namespace := eps.Meta.Namespace
 	return newResourceID(namespace, svcName)
 }
diff --git a/dataclients/kubernetes/kube.go b/dataclients/kubernetes/kube.go
index 7810fe7486..523b71fdd8 100644
--- a/dataclients/kubernetes/kube.go
+++ b/dataclients/kubernetes/kube.go
@@ -574,7 +574,8 @@ func (c *Client) fetchDefaultFilterConfigs() defaultFilters {
 	return filters
 }
 
-// GetEndpointAddresses returns the list of all addresses for the given service.
+// GetEndpointAddresses returns the list of all addresses for the given service
+// loaded by a previous call to LoadAll or LoadUpdate.
 func (c *Client) GetEndpointAddresses(ns, name string) []string {
 	c.mu.Lock()
 	defer c.mu.Unlock()
@@ -584,6 +585,11 @@ func (c *Client) GetEndpointAddresses(ns, name string) []string {
 	return c.state.getEndpointAddresses(ns, name)
 }
 
+// LoadEndpointAddresses returns the list of all addresses for the given service.
+func (c *Client) LoadEndpointAddresses(namespace, name string) ([]string, error) {
+	return c.ClusterClient.loadEndpointAddresses(namespace, name)
+}
+
 func compareStringList(a, b []string) []string {
 	c := make([]string, 0)
 	for i := len(a) - 1; i >= 0; i-- {
diff --git a/redis_test.go b/redis_test.go
index b4bff88256..cd2f21a589 100644
--- a/redis_test.go
+++ b/redis_test.go
@@ -8,7 +8,6 @@ import (
 	"net"
 	"net/http"
 	stdlibhttptest "net/http/httptest"
-	"net/url"
 	"os"
 	"syscall"
 	"testing"
@@ -20,6 +19,7 @@ import (
 	flog "github.com/zalando/skipper/filters/accesslog"
 	fscheduler "github.com/zalando/skipper/filters/scheduler"
 	"github.com/zalando/skipper/loadbalancer"
+	"github.com/zalando/skipper/metrics"
 	"github.com/zalando/skipper/metrics/metricstest"
 	"github.com/zalando/skipper/net/httptest"
 	"github.com/zalando/skipper/net/redistest"
@@ -82,27 +82,15 @@ spec:
 
 	// apiserver1
 	redisSpec1 := createRedisEndpointsSpec(t, redis1)
-	apiServer1, u1, err := createApiserver(kubeSpec + redisSpec1)
-	if err != nil {
-		t.Fatalf("Failed to start apiserver1: %v", err)
-	}
-	defer apiServer1.Close()
+	apiServer1 := createApiserver(t, kubeSpec+redisSpec1)
 
 	// apiserver2
 	redisSpec2 := createRedisEndpointsSpec(t, redis1, redis2)
-	apiServer2, u2, err := createApiserver(kubeSpec + redisSpec2)
-	if err != nil {
-		t.Fatalf("Failed to start apiserver2: %v", err)
-	}
-	defer apiServer2.Close()
+	apiServer2 := createApiserver(t, kubeSpec+redisSpec2)
 
 	// apiserver3
 	redisSpec3 := createRedisEndpointsSpec(t, redis1, redis2, redis3)
-	apiServer3, u3, err := createApiserver(kubeSpec + redisSpec3)
-	if err != nil {
-		t.Fatalf("Failed to start apiserver3: %v", err)
-	}
-	defer apiServer3.Close()
+	apiServer3 := createApiserver(t, kubeSpec+redisSpec3)
 
 	// create skipper as LB to kube-apiservers
 	fr := createFilterRegistry(fscheduler.NewFifo(), flog.NewEnableAccessLog())
@@ -113,11 +101,9 @@ spec:
 	})
 	defer reg.Close()
 
-	docFmt := `
-r1: * -> enableAccessLog(4,5) -> fifo(100,100,"3s") -> <roundRobin, "%s", "%s", "%s">;
-r2: PathRegexp("/endpoints") -> enableAccessLog(2,4,5) -> fifo(100,100,"3s") -> <roundRobin, "%s", "%s", "%s">;
-`
-	docApiserver := fmt.Sprintf(docFmt, u1.String(), u2.String(), u3.String(), u1.String(), u2.String(), u3.String())
+	docApiserver := fmt.Sprintf(`r1: * -> enableAccessLog(4,5) -> fifo(100,100,"3s") -> <roundRobin, "%s", "%s", "%s">;`,
+		apiServer1.URL, apiServer2.URL, apiServer3.URL)
+
 	dc, err := testdataclient.NewDoc(docApiserver)
 	if err != nil {
 		t.Fatalf("Failed to create testdataclient: %v", err)
@@ -149,7 +135,7 @@ r2: PathRegexp("/endpoints") -> enableAccessLog(2,4,5) -> fifo(100,100,"3s") ->
 	defer lb.Close()
 
 	rsvo := skipper.Options{
-		Address:                         ":8082",
+		Address:                         findAddress(t),
 		KubernetesURL:                   lb.URL,
 		KubernetesRedisServiceNamespace: "skipper",
 		KubernetesRedisServiceName:      "redis",
@@ -160,21 +146,15 @@ r2: PathRegexp("/endpoints") -> enableAccessLog(2,4,5) -> fifo(100,100,"3s") ->
 
 	go routesrv.Run(rsvo)
 
-	for {
-		rsp, _ := http.DefaultClient.Head("http://localhost:8082/routes")
-		if rsp != nil && rsp.StatusCode == 200 {
-			break
-		}
-		time.Sleep(100 * time.Millisecond)
-	}
+	waitForOK(t, "http://"+rsvo.Address+"/routes", 1*time.Second)
 
 	// run skipper proxy that we want to test
 	o := skipper.Options{
-		Address:                      ":9090",
+		Address:                      findAddress(t),
 		EnableRatelimiters:           true,
 		EnableSwarm:                  true,
 		Kubernetes:                   true,
-		SwarmRedisEndpointsRemoteURL: "http://localhost:8082/swarm/redis/shards",
+		SwarmRedisEndpointsRemoteURL: "http://" + rsvo.Address + "/swarm/redis/shards",
 		KubernetesURL:                lb.URL,
 		KubernetesHealthcheck:        true,
 		SourcePollTimeout:            1500 * time.Millisecond,
@@ -182,26 +162,19 @@ r2: PathRegexp("/endpoints") -> enableAccessLog(2,4,5) -> fifo(100,100,"3s") ->
 		ClusterRatelimitMaxGroupShards: 2,
 		SwarmRedisDialTimeout:          100 * time.Millisecond,
 		SuppressRouteUpdateLogs:        false,
-		SupportListener:                ":9091",
+		SupportListener:                findAddress(t),
 		SwarmRedisUpdateInterval:       time.Second,
 	}
 
+	runResult := make(chan error)
 	sigs := make(chan os.Signal, 1)
-	go skipper.RunWithShutdown(o, sigs, nil)
+	go func() { runResult <- skipper.RunWithShutdown(o, sigs, nil) }()
 
-	for i := 0; i < 10; i++ {
-		t.Logf("Waiting for proxy being ready")
-
-		rsp, _ := http.DefaultClient.Get("http://localhost:9090/kube-system/healthz")
-		if rsp != nil && rsp.StatusCode == 200 {
-			break
-		}
-		time.Sleep(100 * time.Millisecond)
-	}
+	waitForOK(t, "http://"+o.Address+"/kube-system/healthz", 1*time.Second)
 
 	rate := 10
 	sec := 5
-	va := httptest.NewVegetaAttacker("http://localhost:9090/test", rate, time.Second, time.Second)
+	va := httptest.NewVegetaAttacker("http://"+o.Address+"/test", rate, time.Second, time.Second)
 	va.Attack(io.Discard, time.Duration(sec)*time.Second, "mytest")
 
 	successRate := va.Success()
@@ -231,6 +204,7 @@ r2: PathRegexp("/endpoints") -> enableAccessLog(2,4,5) -> fifo(100,100,"3s") ->
 	assert.InEpsilon(t, sec*rate-(1*sec), countLimited, epsilon, fmt.Sprintf("Test should have limited requests between %d and %d", countLimited-int(epsilon), countLimited+int(epsilon)))
 
 	sigs <- syscall.SIGTERM
+	assert.NoError(t, <-runResult)
 }
 
 func TestConcurrentKubernetesClusterStateAccess(t *testing.T) {
@@ -281,27 +255,15 @@ spec:
 
 	// apiserver1
 	redisSpec1 := createRedisEndpointsSpec(t, redis1)
-	apiServer1, u1, err := createApiserver(kubeSpec + redisSpec1)
-	if err != nil {
-		t.Fatalf("Failed to start apiserver1: %v", err)
-	}
-	defer apiServer1.Close()
+	apiServer1 := createApiserver(t, kubeSpec+redisSpec1)
 
 	// apiserver2
 	redisSpec2 := createRedisEndpointsSpec(t, redis1, redis2)
-	apiServer2, u2, err := createApiserver(kubeSpec + redisSpec2)
-	if err != nil {
-		t.Fatalf("Failed to start apiserver2: %v", err)
-	}
-	defer apiServer2.Close()
+	apiServer2 := createApiserver(t, kubeSpec+redisSpec2)
 
 	// apiserver3
 	redisSpec3 := createRedisEndpointsSpec(t, redis1, redis2, redis3)
-	apiServer3, u3, err := createApiserver(kubeSpec + redisSpec3)
-	if err != nil {
-		t.Fatalf("Failed to start apiserver3: %v", err)
-	}
-	defer apiServer3.Close()
+	apiServer3 := createApiserver(t, kubeSpec+redisSpec3)
 
 	// create skipper as LB to kube-apiservers
 	fr := createFilterRegistry(fscheduler.NewFifo(), flog.NewEnableAccessLog())
@@ -312,11 +274,9 @@ spec:
 	})
 	defer reg.Close()
 
-	docFmt := `
-r1: * -> enableAccessLog(4,5) -> fifo(100,100,"3s") -> <roundRobin, "%s", "%s", "%s">;
-r2: PathRegexp("/endpoints") -> enableAccessLog(2,4,5) -> fifo(100,100,"3s") -> <roundRobin, "%s", "%s", "%s">;
-`
-	docApiserver := fmt.Sprintf(docFmt, u1.String(), u2.String(), u3.String(), u1.String(), u2.String(), u3.String())
+	docApiserver := fmt.Sprintf(`r1: * -> enableAccessLog(4,5) -> fifo(100,100,"3s") -> <roundRobin, "%s", "%s", "%s">;`,
+		apiServer1.URL, apiServer2.URL, apiServer3.URL)
+
 	dc, err := testdataclient.NewDoc(docApiserver)
 	if err != nil {
 		t.Fatalf("Failed to create testdataclient: %v", err)
@@ -349,7 +309,7 @@ r2: PathRegexp("/endpoints") -> enableAccessLog(2,4,5) -> fifo(100,100,"3s") ->
 
 	// run skipper proxy that we want to test
 	o := skipper.Options{
-		Address:                        ":9090",
+		Address:                        findAddress(t),
 		EnableRatelimiters:             true,
 		EnableSwarm:                    true,
 		Kubernetes:                     true,
@@ -363,26 +323,19 @@ r2: PathRegexp("/endpoints") -> enableAccessLog(2,4,5) -> fifo(100,100,"3s") ->
 		ClusterRatelimitMaxGroupShards: 2,
 		SwarmRedisDialTimeout:          100 * time.Millisecond,
 		SuppressRouteUpdateLogs:        false,
-		SupportListener:                ":9091",
+		SupportListener:                findAddress(t),
 		SwarmRedisUpdateInterval:       time.Second,
 	}
 
+	runResult := make(chan error)
 	sigs := make(chan os.Signal, 1)
-	go skipper.RunWithShutdown(o, sigs, nil)
+	go func() { runResult <- skipper.RunWithShutdown(o, sigs, nil) }()
 
-	for i := 0; i < 10; i++ {
-		t.Logf("Waiting for proxy being ready")
-
-		rsp, _ := http.DefaultClient.Get("http://localhost:9090/kube-system/healthz")
-		if rsp != nil && rsp.StatusCode == 200 {
-			break
-		}
-		time.Sleep(100 * time.Millisecond)
-	}
+	waitForOK(t, "http://"+o.Address+"/kube-system/healthz", 1*time.Second)
 
 	rate := 10
 	sec := 5
-	va := httptest.NewVegetaAttacker("http://localhost:9090/test", rate, time.Second, time.Second)
+	va := httptest.NewVegetaAttacker("http://"+o.Address+"/test", rate, time.Second, time.Second)
 	va.Attack(io.Discard, time.Duration(sec)*time.Second, "mytest")
 
 	t.Logf("Success [0..1]: %0.2f", va.Success())
@@ -403,16 +356,100 @@ r2: PathRegexp("/endpoints") -> enableAccessLog(2,4,5) -> fifo(100,100,"3s") ->
 	}
 
 	sigs <- syscall.SIGTERM
+	assert.NoError(t, <-runResult)
+}
+
+func TestRedisUpdater(t *testing.T) {
+	dm := metrics.Default
+	t.Cleanup(func() { metrics.Default = dm })
+
+	const redisUpdateInterval = 10 * time.Millisecond
+	const kubeSpec = `
+apiVersion: zalando.org/v1
+kind: RouteGroup
+metadata:
+  name: target
+spec:
+  backends:
+  - name: shunt
+    type: shunt
+  defaultBackends:
+  - backendName: shunt
+  routes:
+  - pathSubtree: /test
+    filters:
+    - disableAccessLog()
+    - clusterRatelimit("foo", 1, "1s")
+    - status(200)
+    - inlineContent("OK")
+---
+apiVersion: v1
+kind: Service
+metadata:
+  labels:
+    application: skipper-ingress-redis
+  name: redis
+  namespace: skipper
+spec:
+  clusterIP: None
+  ports:
+  - port: 6379
+    protocol: TCP
+    targetPort: 6379
+  selector:
+    application: skipper-ingress-redis
+  type: ClusterIP
+`
+	spec := kubeSpec + createRedisEndpointsSpec(t, "10.2.0.1:6379", "10.2.0.2:6379", "10.2.0.3:6379")
+	apiServer := createApiserver(t, spec)
+
+	t.Run("dataclient disabled", func(t *testing.T) {
+		addr := findAddress(t)
+		metrics := &metricstest.MockMetrics{}
+
+		o := skipper.Options{
+			Address:                         addr,
+			EnableRatelimiters:              true,
+			EnableSwarm:                     true,
+			Kubernetes:                      false, // do not enable kubernetes dataclient
+			KubernetesURL:                   apiServer.URL,
+			KubernetesRedisServiceNamespace: "skipper",
+			KubernetesRedisServiceName:      "redis",
+			KubernetesRedisServicePort:      6379,
+			SwarmRedisUpdateInterval:        redisUpdateInterval,
+			InlineRoutes:                    `Path("/ready") -> inlineContent("OK") -> <shunt>`,
+			MetricsBackend:                  metrics,
+		}
+
+		runResult := make(chan error)
+		sigs := make(chan os.Signal, 1)
+		go func() { runResult <- skipper.RunWithShutdown(o, sigs, nil) }()
+
+		waitForOK(t, "http://"+addr+"/ready", 1*time.Second)
+		time.Sleep(2 * redisUpdateInterval)
+
+		metrics.WithGauges(func(g map[string]float64) {
+			t.Logf("gauges: %v", g)
+
+			assert.Equal(t, 1.0, g["routes.total"], "expected only the /ready route")
+			assert.Equal(t, 3.0, g["swarm.redis.shards"])
+		})
+
+		sigs <- syscall.SIGTERM
+		assert.NoError(t, <-runResult)
+	})
 }
 
-func createApiserver(spec string) (*stdlibhttptest.Server, *url.URL, error) {
+func createApiserver(t *testing.T, spec string) *stdlibhttptest.Server {
+	t.Helper()
+
 	api, err := kubernetestest.NewAPI(kubernetestest.TestAPIOptions{}, bytes.NewBufferString(spec))
-	if err != nil {
-		return nil, nil, err
-	}
+	require.NoError(t, err)
+
 	apiServer := stdlibhttptest.NewServer(api)
-	u, err := url.Parse(apiServer.URL)
-	return apiServer, u, err
+	t.Cleanup(apiServer.Close)
+
+	return apiServer
 }
 
 func createFilterRegistry(specs ...filters.Spec) filters.Registry {
@@ -457,3 +494,37 @@ func createRedisEndpointsSpec(t *testing.T, addrs ...string) string {
 
 	return fmt.Sprintf("---\n%s\n", b)
 }
+
+func findAddress(t *testing.T) string {
+	t.Helper()
+
+	l, err := net.ListenTCP("tcp6", &net.TCPAddr{})
+	require.NoError(t, err)
+
+	addr := l.Addr().String()
+	require.NoError(t, l.Close())
+
+	return addr
+}
+
+func waitForOK(t *testing.T, url string, timeout time.Duration) {
+	t.Helper()
+
+	to := time.After(timeout)
+	for {
+		rsp, err := http.DefaultClient.Get(url)
+		if err == nil {
+			rsp.Body.Close()
+			if rsp.StatusCode == http.StatusOK {
+				return
+			}
+		}
+
+		select {
+		case <-to:
+			t.Fatalf("timeout waiting for %s", url)
+		default:
+			time.Sleep(100 * time.Millisecond)
+		}
+	}
+}
diff --git a/skipper.go b/skipper.go
index 95f1d5a2f0..a2701a7e86 100644
--- a/skipper.go
+++ b/skipper.go
@@ -1357,22 +1357,35 @@ func findKubernetesDataclient(dataClients []routing.DataClient) *kubernetes.Clie
 	return kdc
 }
 
-func getRedisUpdaterFunc(opts *Options, kdc *kubernetes.Client) func() ([]string, error) {
-	// TODO(sszuecs): make sure kubernetes dataclient is already initialized and
-	// has polled the data once or kdc.GetEndpointAdresses should be blocking
-	// call to kubernetes API
-	return func() ([]string, error) {
-		a := kdc.GetEndpointAddresses(opts.KubernetesRedisServiceNamespace, opts.KubernetesRedisServiceName)
-		log.Debugf("Redis updater called and found %d redis endpoints", len(a))
+func getKubernetesRedisUpdater(opts *Options, kdc *kubernetes.Client, loaded bool) func() ([]string, error) {
+	if loaded {
+		// TODO(sszuecs): make sure kubernetes dataclient is already initialized and
+		// has polled the data once, or kdc.GetEndpointAddresses should be a blocking
+		// call to the kubernetes API
+		return func() ([]string, error) {
+			a := kdc.GetEndpointAddresses(opts.KubernetesRedisServiceNamespace, opts.KubernetesRedisServiceName)
+			log.Debugf("GetEndpointAddresses found %d redis endpoints", len(a))
+
+			return joinPort(a, opts.KubernetesRedisServicePort), nil
+		}
+	} else {
+		return func() ([]string, error) {
+			a, err := kdc.LoadEndpointAddresses(opts.KubernetesRedisServiceNamespace, opts.KubernetesRedisServiceName)
+			log.Debugf("LoadEndpointAddresses found %d redis endpoints, err: %v", len(a), err)
 
-		port := strconv.Itoa(opts.KubernetesRedisServicePort)
-		for i := 0; i < len(a); i++ {
-			a[i] = net.JoinHostPort(a[i], port)
+			return joinPort(a, opts.KubernetesRedisServicePort), err
 		}
-		return a, nil
 	}
 }
 
+func joinPort(addrs []string, port int) []string {
+	p := strconv.Itoa(port)
+	for i := 0; i < len(addrs); i++ {
+		addrs[i] = net.JoinHostPort(addrs[i], p)
+	}
+	return addrs
+}
+
 type RedisEndpoint struct {
 	Address string `json:"address"`
 }
@@ -1381,7 +1394,7 @@ type RedisEndpoints struct {
 	Endpoints []RedisEndpoint `json:"endpoints"`
 }
 
-func updateEndpointsFromURL(address string) func() ([]string, error) {
+func getRemoteURLRedisUpdater(address string) func() ([]string, error) {
 	/* #nosec */
 	return func() ([]string, error) {
 		resp, err := http.Get(address)
@@ -1688,25 +1701,32 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error {
 
 		kdc := findKubernetesDataclient(dataClients)
 		if kdc != nil {
-			redisOptions.AddrUpdater = getRedisUpdaterFunc(&o, kdc)
-			_, err = redisOptions.AddrUpdater()
+			redisOptions.AddrUpdater = getKubernetesRedisUpdater(&o, kdc, true)
+		} else {
+			kdc, err := kubernetes.New(o.KubernetesDataClientOptions())
 			if err != nil {
-				log.Errorf("Failed to update redis address %v", err)
 				return err
 			}
-		} else {
-			log.Errorf("Failed to find kubernetes dataclient, but redis shards should be get by kubernetes svc %s/%s", o.KubernetesRedisServiceNamespace, o.KubernetesRedisServiceName)
+			defer kdc.Close()
+
+			redisOptions.AddrUpdater = getKubernetesRedisUpdater(&o, kdc, false)
+		}
+
+		_, err = redisOptions.AddrUpdater()
+		if err != nil {
+			log.Errorf("Failed to update redis addresses from kubernetes: %v", err)
+			return err
+		}
 	} else if redisOptions != nil && o.SwarmRedisEndpointsRemoteURL != "" {
 		log.Infof("Use remote address %s to fetch updates redis shards", o.SwarmRedisEndpointsRemoteURL)
-		redisOptions.AddrUpdater = updateEndpointsFromURL(o.SwarmRedisEndpointsRemoteURL)
+		redisOptions.AddrUpdater = getRemoteURLRedisUpdater(o.SwarmRedisEndpointsRemoteURL)
+
 		_, err = redisOptions.AddrUpdater()
 		if err != nil {
-			log.Errorf("Failed to update redis endpoints from URL %v", err)
+			log.Errorf("Failed to update redis addresses from URL: %v", err)
 			return err
 		}
 	}
-	}
 
 	var ratelimitRegistry *ratelimit.Registry
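
Reviewer note, not part of the patch: both getKubernetesRedisUpdater and
getRemoteURLRedisUpdater return a plain func() ([]string, error) that skipper
polls on SwarmRedisUpdateInterval. The standalone sketch below illustrates
that contract; the fixed addresses, the one second interval, and the loop
bound are made up for illustration.

package main

import (
	"fmt"
	"net"
	"strconv"
	"time"
)

// joinPort mirrors the helper added to skipper.go: it appends the
// configured redis port to every discovered address.
func joinPort(addrs []string, port int) []string {
	p := strconv.Itoa(port)
	for i := 0; i < len(addrs); i++ {
		addrs[i] = net.JoinHostPort(addrs[i], p)
	}
	return addrs
}

func main() {
	// updater stands in for the function returned by getKubernetesRedisUpdater;
	// here it reports a fixed shard list instead of querying the cluster.
	updater := func() ([]string, error) {
		return joinPort([]string{"10.2.0.1", "10.2.0.2"}, 6379), nil
	}

	// Poll the updater the way skipper does on its update interval.
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	for i := 0; i < 2; i++ {
		addrs, err := updater()
		if err != nil {
			fmt.Println("redis shard update failed:", err)
		} else {
			fmt.Println("redis shards:", addrs)
		}
		<-ticker.C
	}
}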
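A second sketch, also not part of the patch: the JSON document the remote URL
updater consumes. The RedisEndpoint and RedisEndpoints types are copied from
skipper.go above; the payload value is an illustrative example of what
routesrv serves under /swarm/redis/shards in the tests.

package main

import (
	"encoding/json"
	"fmt"
)

type RedisEndpoint struct {
	Address string `json:"address"`
}

type RedisEndpoints struct {
	Endpoints []RedisEndpoint `json:"endpoints"`
}

func main() {
	// Example payload in the shape getRemoteURLRedisUpdater expects.
	payload := `{"endpoints":[{"address":"10.2.0.1:6379"},{"address":"10.2.0.2:6379"}]}`

	var eps RedisEndpoints
	if err := json.Unmarshal([]byte(payload), &eps); err != nil {
		panic(err)
	}

	addrs := make([]string, 0, len(eps.Endpoints))
	for _, e := range eps.Endpoints {
		addrs = append(addrs, e.Address)
	}
	fmt.Println(addrs) // [10.2.0.1:6379 10.2.0.2:6379]
}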