Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(kuma-cp) migrate DiscoveryRequest/Response in KDS to V3 #2541

Merged
merged 4 commits into from
Aug 11, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
244 changes: 62 additions & 182 deletions api/mesh/v1alpha1/kds.pb.go

Large diffs are not rendered by default.

13 changes: 3 additions & 10 deletions api/mesh/v1alpha1/kds.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,12 @@ package kuma.mesh.v1alpha1;
option go_package = "github.com/kumahq/kuma/api/mesh/v1alpha1";

import "google/protobuf/timestamp.proto";
import "envoy/api/v2/discovery.proto";
import "envoy/service/discovery/v3/discovery.proto";
import "google/protobuf/any.proto";

service KumaDiscoveryService {

rpc DeltaKumaResources(stream envoy.api.v2.DeltaDiscoveryRequest)
returns (stream envoy.api.v2.DeltaDiscoveryResponse);

rpc StreamKumaResources(stream envoy.api.v2.DiscoveryRequest)
returns (stream envoy.api.v2.DiscoveryResponse);

rpc FetchKumaResources(envoy.api.v2.DiscoveryRequest)
returns (envoy.api.v2.DiscoveryResponse);
rpc StreamKumaResources(stream envoy.service.discovery.v3.DiscoveryRequest)
returns (stream envoy.service.discovery.v3.DiscoveryResponse);
}

message KumaResource {
Expand Down
101 changes: 74 additions & 27 deletions api/mesh/v1alpha1/mux.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions api/mesh/v1alpha1/mux.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package kuma.mesh.v1alpha1;
option go_package = "github.com/kumahq/kuma/api/mesh/v1alpha1";

import "envoy/api/v2/discovery.proto";
import "envoy/service/discovery/v3/discovery.proto";

service MultiplexService {
rpc StreamMessage(stream Message) returns (stream Message);
Expand All @@ -14,5 +15,7 @@ message Message {
oneof value {
envoy.api.v2.DiscoveryRequest request = 1;
envoy.api.v2.DiscoveryResponse response = 2;
envoy.service.discovery.v3.DiscoveryRequest requestV3 = 3;
envoy.service.discovery.v3.DiscoveryResponse responseV3 = 4;
jakubdyszkiewicz marked this conversation as resolved.
Show resolved Hide resolved
}
}
12 changes: 6 additions & 6 deletions pkg/kds/cache/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,21 @@ import (
"fmt"

envoy_types "github.com/envoyproxy/go-control-plane/pkg/cache/types"
envoy_cache "github.com/envoyproxy/go-control-plane/pkg/cache/v2"
envoy_cache "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/pkg/errors"

mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
"github.com/kumahq/kuma/pkg/core/resources/model"
"github.com/kumahq/kuma/pkg/core/resources/registry"
util_xds "github.com/kumahq/kuma/pkg/util/xds"
util_xds_v3 "github.com/kumahq/kuma/pkg/util/xds/v3"
)

type ResourceBuilder interface {
}

type SnapshotBuilder interface {
With(typ string, resources []envoy_types.Resource) SnapshotBuilder
Build(version string) util_xds.Snapshot
Build(version string) util_xds_v3.Snapshot
}

type builder struct {
Expand All @@ -37,7 +37,7 @@ func (b *builder) With(typ string, resources []envoy_types.Resource) SnapshotBui
return b
}

func (b *builder) Build(version string) util_xds.Snapshot {
func (b *builder) Build(version string) util_xds_v3.Snapshot {
snapshot := &Snapshot{Resources: map[string]envoy_cache.Resources{}}
for _, typ := range snapshot.GetSupportedTypes() {
snapshot.Resources[typ] = envoy_cache.NewResources(version, nil)
Expand All @@ -57,7 +57,7 @@ type Snapshot struct {
Resources map[string]envoy_cache.Resources
}

var _ util_xds.Snapshot = &Snapshot{}
var _ util_xds_v3.Snapshot = &Snapshot{}

func (s *Snapshot) GetSupportedTypes() (types []string) {
for _, def := range registry.Global().ObjectTypes(model.HasKdsEnabled()) {
Expand Down Expand Up @@ -110,7 +110,7 @@ func (s *Snapshot) GetVersion(typ string) string {
return ""
}

func (s *Snapshot) WithVersion(typ string, version string) util_xds.Snapshot {
func (s *Snapshot) WithVersion(typ string, version string) util_xds_v3.Snapshot {
if s == nil {
return nil
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/kds/client/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package client_test
import (
"time"

envoy_api_v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2"
envoy_sd "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

Expand Down Expand Up @@ -37,18 +37,18 @@ var _ = Describe("KDS Sink", func() {

It("", func() {
jakubdyszkiewicz marked this conversation as resolved.
Show resolved Hide resolved
vrf := kds_verifier.New().
Exec(kds_verifier.WaitRequest(defaultTimeout, func(req *envoy_api_v2.DiscoveryRequest) {
Exec(kds_verifier.WaitRequest(defaultTimeout, func(req *envoy_sd.DiscoveryRequest) {
Expect(req.TypeUrl).To(Equal(string(mesh.MeshType)))
})).
Exec(kds_verifier.WaitRequest(defaultTimeout, func(req *envoy_api_v2.DiscoveryRequest) {
Exec(kds_verifier.WaitRequest(defaultTimeout, func(req *envoy_sd.DiscoveryRequest) {
Expect(req.TypeUrl).To(Equal(string(mesh.DataplaneType)))
})).
Exec(kds_verifier.DiscoveryResponse(
&mesh.MeshResourceList{Items: []*mesh.MeshResource{
{Meta: &test_model.ResourceMeta{Name: "mesh1"}, Spec: samples.Mesh1},
{Meta: &test_model.ResourceMeta{Name: "mesh2"}, Spec: samples.Mesh2},
}}, "1", "1")).
Exec(kds_verifier.WaitRequest(defaultTimeout, func(rs *envoy_api_v2.DiscoveryRequest) {
Exec(kds_verifier.WaitRequest(defaultTimeout, func(rs *envoy_sd.DiscoveryRequest) {
Expect(rs.VersionInfo).To(Equal("1"))
Expect(rs.ResponseNonce).To(Equal("1"))
Expect(rs.TypeUrl).To(Equal(string(mesh.MeshType)))
Expand Down
18 changes: 9 additions & 9 deletions pkg/kds/client/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package client
import (
"fmt"

envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2"
envoy_core "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
envoy_sd "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/protobuf/types/known/structpb"

Expand All @@ -28,16 +28,16 @@ var _ KDSStream = &stream{}

type stream struct {
streamClient mesh_proto.KumaDiscoveryService_StreamKumaResourcesClient
latestACKed map[string]*envoy.DiscoveryResponse
latestReceived map[string]*envoy.DiscoveryResponse
latestACKed map[string]*envoy_sd.DiscoveryResponse
latestReceived map[string]*envoy_sd.DiscoveryResponse
clientId string
}

func NewKDSStream(s mesh_proto.KumaDiscoveryService_StreamKumaResourcesClient, clientId string) KDSStream {
return &stream{
streamClient: s,
latestACKed: make(map[string]*envoy.DiscoveryResponse),
latestReceived: make(map[string]*envoy.DiscoveryResponse),
latestACKed: make(map[string]*envoy_sd.DiscoveryResponse),
latestReceived: make(map[string]*envoy_sd.DiscoveryResponse),
clientId: clientId,
}
}
Expand All @@ -54,7 +54,7 @@ func (s *stream) DiscoveryRequest(resourceType model.ResourceType) error {
if err != nil {
return err
}
return s.streamClient.Send(&envoy.DiscoveryRequest{
return s.streamClient.Send(&envoy_sd.DiscoveryRequest{
VersionInfo: "",
ResponseNonce: "",
Node: &envoy_core.Node{
Expand Down Expand Up @@ -88,7 +88,7 @@ func (s *stream) ACK(typ string) error {
if latestReceived == nil {
return nil
}
err := s.streamClient.Send(&envoy.DiscoveryRequest{
err := s.streamClient.Send(&envoy_sd.DiscoveryRequest{
VersionInfo: latestReceived.VersionInfo,
ResponseNonce: latestReceived.Nonce,
ResourceNames: []string{},
Expand All @@ -109,7 +109,7 @@ func (s *stream) NACK(typ string, err error) error {
return nil
}
latestACKed := s.latestACKed[typ]
return s.streamClient.Send(&envoy.DiscoveryRequest{
return s.streamClient.Send(&envoy_sd.DiscoveryRequest{
VersionInfo: latestACKed.GetVersionInfo(),
ResponseNonce: latestReceived.Nonce,
ResourceNames: []string{},
Expand Down
7 changes: 5 additions & 2 deletions pkg/kds/mux/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,11 @@ func (c *client) Start(stop <-chan struct{}) (errs error) {
}()
muxClient := mesh_proto.NewMultiplexServiceClient(conn)

withClientIDCtx := metadata.AppendToOutgoingContext(c.ctx, "client-id", c.clientID)
stream, err := muxClient.StreamMessage(withClientIDCtx)
withKDSCtx := metadata.AppendToOutgoingContext(c.ctx,
"client-id", c.clientID,
KDSVersionHeaderKey, KDSVersionV3,
)
stream, err := muxClient.StreamMessage(withKDSCtx)
if err != nil {
return err
}
Expand Down
24 changes: 18 additions & 6 deletions pkg/kds/mux/clientstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,38 @@ import (
"context"
"io"

envoy_api_v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2"
envoy_sd "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"google.golang.org/grpc/metadata"

mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
)

type kdsClientStream struct {
MultiplexStream
responses chan *envoy_api_v2.DiscoveryResponse
responses chan *envoy_sd.DiscoveryResponse
}

func (k *kdsClientStream) put(response *envoy_api_v2.DiscoveryResponse) {
func (k *kdsClientStream) put(response *envoy_sd.DiscoveryResponse) {
k.responses <- response
}

func (k *kdsClientStream) Send(request *envoy_api_v2.DiscoveryRequest) error {
return k.MultiplexStream.Send(&mesh_proto.Message{Value: &mesh_proto.Message_Request{Request: request}})
func (k *kdsClientStream) Send(request *envoy_sd.DiscoveryRequest) error {
var msg *mesh_proto.Message

kdsVersion := KDSVersion(k.Context())
switch kdsVersion {
case KDSVersionV2:
msg = &mesh_proto.Message{Value: &mesh_proto.Message_Request{Request: DiscoveryRequestV2(request)}}
case KDSVersionV3:
msg = &mesh_proto.Message{Value: &mesh_proto.Message_RequestV3{RequestV3: request}}
default:
return UnsupportedKDSVersion(kdsVersion)
}

return k.MultiplexStream.Send(msg)
}

func (k *kdsClientStream) Recv() (*envoy_api_v2.DiscoveryResponse, error) {
func (k *kdsClientStream) Recv() (*envoy_sd.DiscoveryResponse, error) {
if r, ok := <-k.responses; ok {
return r, nil
}
Expand Down
Loading