Skip to content

Commit

Permalink
chore(kuma-cp) migrate DiscoveryRequest/Response in KDS to V3 (#2541)
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Dyszkiewicz <jakub.dyszkiewicz@gmail.com>
  • Loading branch information
jakubdyszkiewicz committed Aug 13, 2021
1 parent 834b4fe commit 163d808
Show file tree
Hide file tree
Showing 30 changed files with 550 additions and 357 deletions.
244 changes: 62 additions & 182 deletions api/mesh/v1alpha1/kds.pb.go

Large diffs are not rendered by default.

13 changes: 3 additions & 10 deletions api/mesh/v1alpha1/kds.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,12 @@ package kuma.mesh.v1alpha1;
option go_package = "github.com/kumahq/kuma/api/mesh/v1alpha1";

import "google/protobuf/timestamp.proto";
import "envoy/api/v2/discovery.proto";
import "envoy/service/discovery/v3/discovery.proto";
import "google/protobuf/any.proto";

service KumaDiscoveryService {

rpc DeltaKumaResources(stream envoy.api.v2.DeltaDiscoveryRequest)
returns (stream envoy.api.v2.DeltaDiscoveryResponse);

rpc StreamKumaResources(stream envoy.api.v2.DiscoveryRequest)
returns (stream envoy.api.v2.DiscoveryResponse);

rpc FetchKumaResources(envoy.api.v2.DiscoveryRequest)
returns (envoy.api.v2.DiscoveryResponse);
rpc StreamKumaResources(stream envoy.service.discovery.v3.DiscoveryRequest)
returns (stream envoy.service.discovery.v3.DiscoveryResponse);
}

message KumaResource {
Expand Down
108 changes: 78 additions & 30 deletions api/mesh/v1alpha1/mux.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions api/mesh/v1alpha1/mux.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@ package kuma.mesh.v1alpha1;
option go_package = "github.com/kumahq/kuma/api/mesh/v1alpha1";

import "envoy/api/v2/discovery.proto";
import "envoy/service/discovery/v3/discovery.proto";

service MultiplexService {
rpc StreamMessage(stream Message) returns (stream Message);
}

message Message {
oneof value {
envoy.api.v2.DiscoveryRequest request = 1;
envoy.api.v2.DiscoveryResponse response = 2;
envoy.api.v2.DiscoveryRequest legacy_request = 1;
envoy.api.v2.DiscoveryResponse legacy_response = 2;
envoy.service.discovery.v3.DiscoveryRequest request = 3;
envoy.service.discovery.v3.DiscoveryResponse response = 4;
}
}
12 changes: 6 additions & 6 deletions pkg/kds/cache/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,21 @@ import (
"fmt"

envoy_types "github.com/envoyproxy/go-control-plane/pkg/cache/types"
envoy_cache "github.com/envoyproxy/go-control-plane/pkg/cache/v2"
envoy_cache "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/pkg/errors"

mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
"github.com/kumahq/kuma/pkg/core/resources/model"
"github.com/kumahq/kuma/pkg/core/resources/registry"
util_xds "github.com/kumahq/kuma/pkg/util/xds"
util_xds_v3 "github.com/kumahq/kuma/pkg/util/xds/v3"
)

type ResourceBuilder interface {
}

type SnapshotBuilder interface {
With(typ string, resources []envoy_types.Resource) SnapshotBuilder
Build(version string) util_xds.Snapshot
Build(version string) util_xds_v3.Snapshot
}

type builder struct {
Expand All @@ -37,7 +37,7 @@ func (b *builder) With(typ string, resources []envoy_types.Resource) SnapshotBui
return b
}

func (b *builder) Build(version string) util_xds.Snapshot {
func (b *builder) Build(version string) util_xds_v3.Snapshot {
snapshot := &Snapshot{Resources: map[string]envoy_cache.Resources{}}
for _, typ := range snapshot.GetSupportedTypes() {
snapshot.Resources[typ] = envoy_cache.NewResources(version, nil)
Expand All @@ -57,7 +57,7 @@ type Snapshot struct {
Resources map[string]envoy_cache.Resources
}

var _ util_xds.Snapshot = &Snapshot{}
var _ util_xds_v3.Snapshot = &Snapshot{}

func (s *Snapshot) GetSupportedTypes() (types []string) {
for _, def := range registry.Global().ObjectTypes(model.HasKdsEnabled()) {
Expand Down Expand Up @@ -110,7 +110,7 @@ func (s *Snapshot) GetVersion(typ string) string {
return ""
}

func (s *Snapshot) WithVersion(typ string, version string) util_xds.Snapshot {
func (s *Snapshot) WithVersion(typ string, version string) util_xds_v3.Snapshot {
if s == nil {
return nil
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/kds/client/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package client_test
import (
"time"

envoy_api_v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2"
envoy_sd "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

Expand Down Expand Up @@ -35,20 +35,20 @@ var _ = Describe("KDS Sink", func() {
}
})

It("", func() {
It("should verify KDS exchange", func() {
vrf := kds_verifier.New().
Exec(kds_verifier.WaitRequest(defaultTimeout, func(req *envoy_api_v2.DiscoveryRequest) {
Exec(kds_verifier.WaitRequest(defaultTimeout, func(req *envoy_sd.DiscoveryRequest) {
Expect(req.TypeUrl).To(Equal(string(mesh.MeshType)))
})).
Exec(kds_verifier.WaitRequest(defaultTimeout, func(req *envoy_api_v2.DiscoveryRequest) {
Exec(kds_verifier.WaitRequest(defaultTimeout, func(req *envoy_sd.DiscoveryRequest) {
Expect(req.TypeUrl).To(Equal(string(mesh.DataplaneType)))
})).
Exec(kds_verifier.DiscoveryResponse(
&mesh.MeshResourceList{Items: []*mesh.MeshResource{
{Meta: &test_model.ResourceMeta{Name: "mesh1"}, Spec: samples.Mesh1},
{Meta: &test_model.ResourceMeta{Name: "mesh2"}, Spec: samples.Mesh2},
}}, "1", "1")).
Exec(kds_verifier.WaitRequest(defaultTimeout, func(rs *envoy_api_v2.DiscoveryRequest) {
Exec(kds_verifier.WaitRequest(defaultTimeout, func(rs *envoy_sd.DiscoveryRequest) {
Expect(rs.VersionInfo).To(Equal("1"))
Expect(rs.ResponseNonce).To(Equal("1"))
Expect(rs.TypeUrl).To(Equal(string(mesh.MeshType)))
Expand Down
18 changes: 9 additions & 9 deletions pkg/kds/client/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package client
import (
"fmt"

envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2"
envoy_core "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
envoy_sd "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/protobuf/types/known/structpb"

Expand All @@ -28,16 +28,16 @@ var _ KDSStream = &stream{}

type stream struct {
streamClient mesh_proto.KumaDiscoveryService_StreamKumaResourcesClient
latestACKed map[string]*envoy.DiscoveryResponse
latestReceived map[string]*envoy.DiscoveryResponse
latestACKed map[string]*envoy_sd.DiscoveryResponse
latestReceived map[string]*envoy_sd.DiscoveryResponse
clientId string
}

func NewKDSStream(s mesh_proto.KumaDiscoveryService_StreamKumaResourcesClient, clientId string) KDSStream {
return &stream{
streamClient: s,
latestACKed: make(map[string]*envoy.DiscoveryResponse),
latestReceived: make(map[string]*envoy.DiscoveryResponse),
latestACKed: make(map[string]*envoy_sd.DiscoveryResponse),
latestReceived: make(map[string]*envoy_sd.DiscoveryResponse),
clientId: clientId,
}
}
Expand All @@ -54,7 +54,7 @@ func (s *stream) DiscoveryRequest(resourceType model.ResourceType) error {
if err != nil {
return err
}
return s.streamClient.Send(&envoy.DiscoveryRequest{
return s.streamClient.Send(&envoy_sd.DiscoveryRequest{
VersionInfo: "",
ResponseNonce: "",
Node: &envoy_core.Node{
Expand Down Expand Up @@ -88,7 +88,7 @@ func (s *stream) ACK(typ string) error {
if latestReceived == nil {
return nil
}
err := s.streamClient.Send(&envoy.DiscoveryRequest{
err := s.streamClient.Send(&envoy_sd.DiscoveryRequest{
VersionInfo: latestReceived.VersionInfo,
ResponseNonce: latestReceived.Nonce,
ResourceNames: []string{},
Expand All @@ -109,7 +109,7 @@ func (s *stream) NACK(typ string, err error) error {
return nil
}
latestACKed := s.latestACKed[typ]
return s.streamClient.Send(&envoy.DiscoveryRequest{
return s.streamClient.Send(&envoy_sd.DiscoveryRequest{
VersionInfo: latestACKed.GetVersionInfo(),
ResponseNonce: latestReceived.Nonce,
ResourceNames: []string{},
Expand Down
7 changes: 5 additions & 2 deletions pkg/kds/mux/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,11 @@ func (c *client) Start(stop <-chan struct{}) (errs error) {
}()
muxClient := mesh_proto.NewMultiplexServiceClient(conn)

withClientIDCtx := metadata.AppendToOutgoingContext(c.ctx, "client-id", c.clientID)
stream, err := muxClient.StreamMessage(withClientIDCtx)
withKDSCtx := metadata.AppendToOutgoingContext(c.ctx,
"client-id", c.clientID,
KDSVersionHeaderKey, KDSVersionV3,
)
stream, err := muxClient.StreamMessage(withKDSCtx)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 163d808

Please sign in to comment.