Skip to content

Commit

Permalink
refactor: divide subscribeToConfigPod method to manage GRPC server co…
Browse files Browse the repository at this point in the history
…nnection separately inside the modules (#69)

* refactor: divide subscribeToConfigPod method to manage GRPC server connection separately through the modules

refactor:
- add ConnectToGrpcServer method to ConfClient interface
- change signatures of PublishOnConfigChange and subsribeToConfigPod methods

Signed-off-by: gatici <gulsum.atici@canonical.com>

* chore: remove ConfigWatcher method

ConfigWather method is not necessary anymore as all the NFs will call PublishOnConfigChange

Signed-off-by: gatici <gulsum.atici@canonical.com>

* chore: address review comments

Signed-off-by: gatici <gulsum.atici@canonical.com>

* fix: static analysis

Signed-off-by: gatici <gulsum.atici@canonical.com>

* chore: address review comments

Signed-off-by: gatici <gulsum.atici@canonical.com>

* chore: update version

Signed-off-by: gatici <gulsum.atici@canonical.com>

* fix: minor things

Signed-off-by: gatici <gulsum.atici@canonical.com>

---------

Signed-off-by: gatici <gulsum.atici@canonical.com>
  • Loading branch information
gatici authored Oct 17, 2024
1 parent e34b08f commit 4eee5b2
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 93 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.5.2-dev
1.5.2
168 changes: 76 additions & 92 deletions proto/client/gClient.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
// SPDX-FileCopyrightText: 2021 Open Networking Foundation <info@opennetworking.org>
//
// SPDX-FileCopyrightText: 2024 Canonical Ltd.
// SPDX-License-Identifier: Apache-2.0

package client

import (
"context"
"fmt"
"math/rand"
"os"
"time"
Expand Down Expand Up @@ -50,52 +51,46 @@ type ConfigClient struct {
}

type ConfClient interface {
// channel is created on which subscription is done.
// PublishOnConfigChange creates a channel to perform the subscription using it.
// On Receiving Configuration from ConfigServer, this api publishes
// on created channel and returns the channel
PublishOnConfigChange(bool) chan *protos.NetworkSliceResponse
PublishOnConfigChange(metadataRequested bool, stream protos.ConfigService_NetworkSliceSubscribeClient) chan *protos.NetworkSliceResponse

// returns grpc connection object
GetConfigClientConn() *grpc.ClientConn
// getConfigClientConn returns grpc connection object
getConfigClientConn() *grpc.ClientConn

// Client Subscribing channel to ConfigPod to receive configuration
subscribeToConfigPod(commChan chan *protos.NetworkSliceResponse)
subscribeToConfigPod(commChan chan *protos.NetworkSliceResponse, stream protos.ConfigService_NetworkSliceSubscribeClient)

// CheckGrpcConnectivity checks the connectivity status and
// subscribes to a stream of NetworkSlice if connectivity is ready
CheckGrpcConnectivity() (stream protos.ConfigService_NetworkSliceSubscribeClient, err error)
}

// This API is added to control metadata from NF Clients
func ConnectToConfigServer(host string) ConfClient {
confClient := CreateChannel(host, 10000)
// ConnectToConfigServer this API is added to control metadata from NF clients
// Connects to the ConfigServer using host address
func ConnectToConfigServer(host string) (ConfClient, error) {
confClient := CreateConfClient(host)
if confClient == nil {
logger.GrpcLog.Errorln("create grpc channel to config pod failed")
return nil
return nil, fmt.Errorf("create grpc channel to config pod failed")
}
return confClient
return confClient, nil
}

func (confClient *ConfigClient) PublishOnConfigChange(mdataFlag bool) chan *protos.NetworkSliceResponse {
confClient.MetadataRequested = mdataFlag
// PublishOnConfigChange creates a communication channel to publish the messages from ConfigServer to the channel
// then NFs gets the messages
func (confClient *ConfigClient) PublishOnConfigChange(metadataFlag bool, stream protos.ConfigService_NetworkSliceSubscribeClient) chan *protos.NetworkSliceResponse {
confClient.MetadataRequested = metadataFlag
commChan := make(chan *protos.NetworkSliceResponse)
confClient.Channel = commChan
go confClient.subscribeToConfigPod(commChan)
return commChan
}

// pass structr which has configChangeUpdate interface
func ConfigWatcher(webuiUri string) chan *protos.NetworkSliceResponse {
// var confClient *gClient.ConfigClient
// TODO: use port from configmap.
confClient := CreateChannel(webuiUri, 10000)
if confClient == nil {
logger.GrpcLog.Errorf("create grpc channel to config pod failed")
return nil
}
commChan := make(chan *protos.NetworkSliceResponse)
go confClient.subscribeToConfigPod(commChan)
logger.GrpcLog.Debugln("a communication channel is created for ConfigServer")
go confClient.subscribeToConfigPod(commChan, stream)
return commChan
}

func CreateChannel(host string, timeout uint32) ConfClient {
logger.GrpcLog.Infoln("create config client")
// CreateConfClient creates a GRPC client by connecting to GRPC server (host).
func CreateConfClient(host string) ConfClient {
logger.GrpcLog.Debugln("create config client")
// Second, check to see if we can reuse the gRPC connection for a new P4RT client
conn, err := newClientConnection(host)
if err != nil {
Expand Down Expand Up @@ -130,9 +125,9 @@ var retryPolicy = `{
"RetryableStatusCodes": [ "UNAVAILABLE" ]
}}]}`

// newClientConnection opens a GRPC connection to the host
func newClientConnection(host string) (conn *grpc.ClientConn, err error) {
/* get connection */
logger.GrpcLog.Infoln("dial grpc connection:", host)
logger.GrpcLog.Debugln("dial grpc connection:", host)

bd := 1 * time.Second
mltpr := 1.0
Expand All @@ -144,76 +139,65 @@ func newClientConnection(host string) (conn *grpc.ClientConn, err error) {
dialOptions := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithKeepaliveParams(kacp), grpc.WithDefaultServiceConfig(retryPolicy), grpc.WithConnectParams(crt)}
conn, err = grpc.NewClient(host, dialOptions...)
if err != nil {
logger.GrpcLog.Errorln("grpc newclient err:", err)
return nil, err
return nil, fmt.Errorf("grpc newclient creation failed: %v", err)
}
conn.Connect()
// defer conn.Close()
return conn, err
return conn, nil
}

func (confClient *ConfigClient) GetConfigClientConn() *grpc.ClientConn {
// getConfigClientConn exposes the GRPC client connection
func (confClient *ConfigClient) getConfigClientConn() *grpc.ClientConn {
return confClient.Conn
}

func (confClient *ConfigClient) subscribeToConfigPod(commChan chan *protos.NetworkSliceResponse) {
logger.GrpcLog.Infoln("subscribeToConfigPod")
// CheckGrpcConnectivity checks the connectivity status and subscribes to a stream of NetworkSlice
// if connectivity is Ready. It returns a stream if connection is successful else returns nil.
func (confClient *ConfigClient) CheckGrpcConnectivity() (stream protos.ConfigService_NetworkSliceSubscribeClient, err error) {
logger.GrpcLog.Debugln("connectToGrpcServer")
myid := os.Getenv("HOSTNAME")
var stream protos.ConfigService_NetworkSliceSubscribeClient
for {
if stream == nil {
status := confClient.Conn.GetState()
var err error
if status == connectivity.Ready {
logger.GrpcLog.Infoln("connectivity ready")
rreq := &protos.NetworkSliceRequest{RestartCounter: selfRestartCounter, ClientId: myid, MetadataRequested: confClient.MetadataRequested}
if stream, err = confClient.Client.NetworkSliceSubscribe(context.Background(), rreq); err != nil {
logger.GrpcLog.Errorf("failed to subscribe: %v", err)
time.Sleep(time.Second * 5)
// Retry on failure
continue
}
} else if status == connectivity.Idle {
logger.GrpcLog.Errorln("connectivity status idle, trying to connect again")
time.Sleep(time.Second * 5)
continue
} else {
logger.GrpcLog.Errorln("connectivity status not ready")
time.Sleep(time.Second * 5)
continue
}
}
rsp, err := stream.Recv()
if err != nil {
logger.GrpcLog.Errorf("failed to receive message: %v", err)
// Clearing the stream will force the client to resubscribe on next iteration
stream = nil
time.Sleep(time.Second * 5)
// Retry on failure
continue
status := confClient.Conn.GetState()
if status == connectivity.Ready {
logger.GrpcLog.Debugln("connectivity ready")
rreq := &protos.NetworkSliceRequest{RestartCounter: selfRestartCounter, ClientId: myid, MetadataRequested: confClient.MetadataRequested}
if stream, err = confClient.Client.NetworkSliceSubscribe(context.Background(), rreq); err != nil {
return stream, fmt.Errorf("failed to subscribe: %v", err)
}
return stream, nil
} else if status == connectivity.Idle {
return nil, fmt.Errorf("connectivity status idle")
} else {
return nil, fmt.Errorf("connectivity status not ready")
}
}

logger.GrpcLog.Infoln("stream msg received")
logger.GrpcLog.Debugf("network slices %d, RC of configpod %d", len(rsp.NetworkSlice), rsp.RestartCounter)
if configPodRestartCounter == 0 || (configPodRestartCounter == rsp.RestartCounter) {
// first time connection or config update
configPodRestartCounter = rsp.RestartCounter
if len(rsp.NetworkSlice) > 0 {
// always carries full config copy
logger.GrpcLog.Infoln("first time config received", rsp)
commChan <- rsp
} else if rsp.ConfigUpdated == 1 {
// config delete , all slices deleted
logger.GrpcLog.Infoln("complete config deleted")
commChan <- rsp
}
} else if len(rsp.NetworkSlice) > 0 {
logger.GrpcLog.Errorln("config received after config pod restart")
// config received after config pod restart
configPodRestartCounter = rsp.RestartCounter
// subscribeToConfigPod subscribing channel to ConfigPod to receive configuration
// using stream and communication channel as inputs
func (confClient *ConfigClient) subscribeToConfigPod(commChan chan *protos.NetworkSliceResponse, stream protos.ConfigService_NetworkSliceSubscribeClient) {
rsp, err := stream.Recv()
if err != nil {
logger.GrpcLog.Errorf("failed to receive message from stream: %v", err)
return
}

logger.GrpcLog.Infoln("stream message received")
logger.GrpcLog.Debugf("network slices %d, RC of config pod %d", len(rsp.NetworkSlice), rsp.RestartCounter)
if configPodRestartCounter == 0 || (configPodRestartCounter == rsp.RestartCounter) {
// first time connection or config update
configPodRestartCounter = rsp.RestartCounter
if len(rsp.NetworkSlice) > 0 {
// always carries full config copy
logger.GrpcLog.Infoln("first time config received", rsp)
commChan <- rsp
} else if rsp.ConfigUpdated == 1 {
// config delete, all slices deleted
logger.GrpcLog.Infoln("complete config deleted")
commChan <- rsp
} else {
logger.GrpcLog.Errorln("config pod is restarted and no config received")
}
} else if len(rsp.NetworkSlice) > 0 {
logger.GrpcLog.Errorln("config received after config pod restart")
configPodRestartCounter = rsp.RestartCounter
commChan <- rsp
} else {
logger.GrpcLog.Errorln("config pod is restarted and no config received")
}
}

0 comments on commit 4eee5b2

Please sign in to comment.