Skip to content

Commit

Permalink
Merge pull request #17 from denis-tingaikin/correctly-update-entry-in…
Browse files Browse the repository at this point in the history
…-etcd
  • Loading branch information
haiodo authored Jan 15, 2021
2 parents 74d1cc8 + 61b5027 commit db696f8
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 59 deletions.
66 changes: 35 additions & 31 deletions pkg/registry/etcd/ns_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,34 +41,35 @@ type etcdNSRegistryServer struct {
}

func (n *etcdNSRegistryServer) Register(ctx context.Context, request *registry.NetworkService) (*registry.NetworkService, error) {
resp, err := n.client.NetworkservicemeshV1().NetworkServices(n.ns).Create(
resp, err := next.NetworkServiceRegistryServer(ctx).Register(ctx, request)
if err != nil {
return nil, err
}
apiResp, err := n.client.NetworkservicemeshV1().NetworkServices(n.ns).Create(
ctx,
&v1.NetworkService{
ObjectMeta: metav1.ObjectMeta{
Name: request.Name,
},
Spec: *(*v1.NetworkServiceSpec)(request),
Spec: *(*v1.NetworkServiceSpec)(resp),
},
metav1.CreateOptions{},
)
if apierrors.IsAlreadyExists(err) {
resp, err = n.client.NetworkservicemeshV1().NetworkServices(n.ns).Update(
ctx,
&v1.NetworkService{
ObjectMeta: metav1.ObjectMeta{
Name: request.Name,
},
Spec: *(*v1.NetworkServiceSpec)(request),
},
metav1.UpdateOptions{},
)
var exist *v1.NetworkService
exist, err = n.client.NetworkservicemeshV1().NetworkServices(n.ns).Get(ctx, request.Name, metav1.GetOptions{})
if err == nil {
exist.Spec = *(*v1.NetworkServiceSpec)(request)
apiResp, err = n.client.NetworkservicemeshV1().NetworkServices(n.ns).Update(ctx, exist, metav1.UpdateOptions{})
}
}
if err != nil {
return nil, err
}
resp.Spec.DeepCopyInto((*v1.NetworkServiceSpec)(request))
request.Name = resp.Name
return next.NetworkServiceRegistryServer(ctx).Register(ctx, request)
apiResp.Spec.DeepCopyInto((*v1.NetworkServiceSpec)(request))
request.Name = apiResp.Name

return (*registry.NetworkService)(&apiResp.Spec), nil
}

func (n *etcdNSRegistryServer) watch(query *registry.NetworkServiceQuery, s registry.NetworkServiceRegistry_FindServer) error {
Expand Down Expand Up @@ -97,38 +98,41 @@ func (n *etcdNSRegistryServer) watch(query *registry.NetworkServiceQuery, s regi
}

func (n *etcdNSRegistryServer) Find(query *registry.NetworkServiceQuery, s registry.NetworkServiceRegistry_FindServer) error {
list, err := n.client.NetworkservicemeshV1().NetworkServices(n.ns).List(s.Context(), metav1.ListOptions{})
if err != nil {
return err
}
for i := 0; i < len(list.Items); i++ {
item := (*registry.NetworkService)(&list.Items[i].Spec)
if matchutils.MatchNetworkServices(query.NetworkService, item) {
err := s.Send(item)
if err != nil {
return err
}
}
}
if query.Watch {
if err := n.watch(query, s); err != nil && !errors.Is(err, io.EOF) {
return err
}
} else {
list, err := n.client.NetworkservicemeshV1().NetworkServices(n.ns).List(s.Context(), metav1.ListOptions{})
if err != nil {
return err
}
for i := 0; i < len(list.Items); i++ {
item := (*registry.NetworkService)(&list.Items[i].Spec)
if matchutils.MatchNetworkServices(query.NetworkService, item) {
err := s.Send(item)
if err != nil {
return err
}
}
}
}
return next.NetworkServiceRegistryServer(s.Context()).Find(query, s)
}

func (n *etcdNSRegistryServer) Unregister(ctx context.Context, request *registry.NetworkService) (*empty.Empty, error) {
err := n.client.NetworkservicemeshV1().NetworkServices(n.ns).Delete(
resp, err := next.NetworkServiceRegistryServer(ctx).Unregister(ctx, request)
if err != nil {
return nil, err
}
err = n.client.NetworkservicemeshV1().NetworkServices(n.ns).Delete(
ctx,
request.Name,
metav1.DeleteOptions{},
)
if err != nil {
return nil, err
}
return next.NetworkServiceRegistryServer(ctx).Unregister(ctx, request)
return resp, nil
}

// NewNetworkServiceRegistryServer creates new registry.NetworkServiceRegistryServer that is using etcd to store network services.
Expand Down
60 changes: 32 additions & 28 deletions pkg/registry/etcd/nse_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,70 +41,74 @@ type etcdNSERegistryServer struct {
}

func (n *etcdNSERegistryServer) Register(ctx context.Context, request *registry.NetworkServiceEndpoint) (*registry.NetworkServiceEndpoint, error) {
resp, err := next.NetworkServiceEndpointRegistryServer(ctx).Register(ctx, request)
if err != nil {
return nil, err
}
meta := metav1.ObjectMeta{}
if request.Name == "" {
meta.GenerateName = "nse-"
} else {
meta.Name = request.Name
}
resp, err := n.client.NetworkservicemeshV1().NetworkServiceEndpoints(n.ns).Create(
apiResp, err := n.client.NetworkservicemeshV1().NetworkServiceEndpoints(n.ns).Create(
ctx,
&v1.NetworkServiceEndpoint{
ObjectMeta: meta,
Spec: *(*v1.NetworkServiceEndpointSpec)(request),
Spec: *(*v1.NetworkServiceEndpointSpec)(resp),
},
metav1.CreateOptions{},
)
if apierrors.IsAlreadyExists(err) {
resp, err = n.client.NetworkservicemeshV1().NetworkServiceEndpoints(n.ns).Update(
ctx,
&v1.NetworkServiceEndpoint{
ObjectMeta: meta,
Spec: *(*v1.NetworkServiceEndpointSpec)(request),
},
metav1.UpdateOptions{},
)
var exist *v1.NetworkServiceEndpoint
exist, err = n.client.NetworkservicemeshV1().NetworkServiceEndpoints(n.ns).Get(ctx, request.Name, metav1.GetOptions{})
if err == nil {
exist.Spec = *(*v1.NetworkServiceEndpointSpec)(request)
apiResp, err = n.client.NetworkservicemeshV1().NetworkServiceEndpoints(n.ns).Update(ctx, exist, metav1.UpdateOptions{})
}
}
if err != nil {
return nil, err
}

resp.Spec.DeepCopyInto((*v1.NetworkServiceEndpointSpec)(request))
return next.NetworkServiceEndpointRegistryServer(ctx).Register(ctx, request)
return (*registry.NetworkServiceEndpoint)(&apiResp.Spec), nil
}

func (n *etcdNSERegistryServer) Find(query *registry.NetworkServiceEndpointQuery, s registry.NetworkServiceEndpointRegistry_FindServer) error {
list, err := n.client.NetworkservicemeshV1().NetworkServiceEndpoints(n.ns).List(s.Context(), metav1.ListOptions{})
if err != nil {
return err
}
for i := 0; i < len(list.Items); i++ {
item := (*registry.NetworkServiceEndpoint)(&list.Items[i].Spec)
if matchutils.MatchNetworkServiceEndpoints(query.NetworkServiceEndpoint, item) {
err := s.Send(item)
if err != nil {
return err
}
}
}
if query.Watch {
if err := n.watch(query, s); err != nil && !errors.Is(err, io.EOF) {
return err
}
} else {
list, err := n.client.NetworkservicemeshV1().NetworkServiceEndpoints(n.ns).List(s.Context(), metav1.ListOptions{})
if err != nil {
return err
}
for i := 0; i < len(list.Items); i++ {
item := (*registry.NetworkServiceEndpoint)(&list.Items[i].Spec)
if matchutils.MatchNetworkServiceEndpoints(query.NetworkServiceEndpoint, item) {
err := s.Send(item)
if err != nil {
return err
}
}
}
}
return next.NetworkServiceEndpointRegistryServer(s.Context()).Find(query, s)
}

func (n *etcdNSERegistryServer) Unregister(ctx context.Context, request *registry.NetworkServiceEndpoint) (*empty.Empty, error) {
err := n.client.NetworkservicemeshV1().NetworkServiceEndpoints(n.ns).Delete(
resp, err := next.NetworkServiceEndpointRegistryServer(ctx).Unregister(ctx, request)
if err != nil {
return nil, err
}
err = n.client.NetworkservicemeshV1().NetworkServiceEndpoints(n.ns).Delete(
ctx,
request.Name,
metav1.DeleteOptions{})
if err != nil {
return nil, err
}
return next.NetworkServiceEndpointRegistryServer(ctx).Unregister(ctx, request)
return resp, nil
}

func (n *etcdNSERegistryServer) watch(query *registry.NetworkServiceEndpointQuery, s registry.NetworkServiceEndpointRegistry_FindServer) error {
Expand Down

0 comments on commit db696f8

Please sign in to comment.