diff --git a/sonic-ycabled/tests/test_y_cable_helper.py b/sonic-ycabled/tests/test_y_cable_helper.py index 3f0d78ce2b0c..528acbffbe38 100644 --- a/sonic-ycabled/tests/test_y_cable_helper.py +++ b/sonic-ycabled/tests/test_y_cable_helper.py @@ -4962,10 +4962,18 @@ def test_setup_grpc_channel_for_port(self): with patch('ycable.ycable_utilities.y_cable_helper.y_cable_platform_sfputil') as patched_util: patched_util.get_asic_id_for_logical_port.return_value = 0 - rc = setup_grpc_channel_for_port("Ethernet0", "192.168.0.1") + (channel, stub) = setup_grpc_channel_for_port("Ethernet0", "192.168.0.1") - assert(rc == (None, None)) + assert(stub == True) + assert(channel != None) + def test_connect_channel(self): + + with patch('grpc.channel_ready_future') as patched_util: + + patched_util.result.return_value = 0 + rc = connect_channel(patched_util, None, None) + assert(rc == None) def test_setup_grpc_channels(self): diff --git a/sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py b/sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py index 9cb099dfd832..ffe82915ee23 100644 --- a/sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py +++ b/sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py @@ -332,7 +332,6 @@ def wrapper(*args, **kwargs): return wrapper - def retry_setup_grpc_channel_for_port(port, asic_index): global grpc_port_stubs @@ -409,34 +408,61 @@ def get_grpc_credentials(type, kvp): return credential -def create_channel(type,level, kvp, soc_ip): +def connect_channel(channel, stub, port): + channel_ready = grpc.channel_ready_future(channel) retries = 3 + for _ in range(retries): + try: + channel_ready.result(timeout=2) + except grpc.FutureTimeoutError: + helper_logger.log_warning("gRPC port {} state changed to SHUTDOWN".format(port)) + else: + break - if type == "secure": - credential = get_grpc_credentials(level, kvp) - target_name = kvp.get("grpc_ssl_credential", None) - if credential is None or target_name is None: - return (None, None) +def create_channel(type, level, kvp, soc_ip, port): - GRPC_CLIENT_OPTIONS.append(('grpc.ssl_target_name_override', '{}'.format(target_name))) - channel = grpc.secure_channel("{}:{}".format(soc_ip, GRPC_PORT), credential, options=GRPC_CLIENT_OPTIONS) - else: - channel = grpc.insecure_channel("{}:{}".format(soc_ip, GRPC_PORT), options=GRPC_CLIENT_OPTIONS) + #Helper callback to get an channel connectivity state + def wait_for_state_change(channel_connectivity): + if channel_connectivity == grpc.ChannelConnectivity.TRANSIENT_FAILURE: + helper_logger.log_notice("gRPC port {} state changed to TRANSIENT_FAILURE".format(port)) + if channel_connectivity == grpc.ChannelConnectivity.CONNECTING: + helper_logger.log_notice("gRPC port {} state changed to CONNECTING".format(port)) + if channel_connectivity == grpc.ChannelConnectivity.READY: + helper_logger.log_notice("gRPC port {} state changed to READY".format(port)) + if channel_connectivity == grpc.ChannelConnectivity.IDLE: + helper_logger.log_notice("gRPC port {} state changed to IDLE".format(port)) + if channel_connectivity == grpc.ChannelConnectivity.SHUTDOWN: + helper_logger.log_notice("gRPC port {} state changed to SHUTDOWN".format(port)) - stub = linkmgr_grpc_driver_pb2_grpc.DualToRActiveStub(channel) - channel_ready = grpc.channel_ready_future(channel) + if type == "secure": + credential = get_grpc_credentials(level, kvp) + target_name = kvp.get("grpc_ssl_credential", None) + if credential is None or target_name is None: + return (None, None) - try: - channel_ready.result(timeout=2) - except grpc.FutureTimeoutError: - channel = None - stub = None - else: - break + GRPC_CLIENT_OPTIONS.append(('grpc.ssl_target_name_override', '{}'.format(target_name))) + + channel = grpc.secure_channel("{}:{}".format(soc_ip, GRPC_PORT), credential, options=GRPC_CLIENT_OPTIONS) + else: + channel = grpc.insecure_channel("{}:{}".format(soc_ip, GRPC_PORT), options=GRPC_CLIENT_OPTIONS) + + + stub = linkmgr_grpc_driver_pb2_grpc.DualToRActiveStub(channel) + + + if channel is not None: + channel.subscribe(wait_for_state_change) + + #connect_channel(channel, stub, port) + """ + Comment the connect channel call for now, since it is not required for normal gRPC I/O + and all use cases work without it. + TODO: check if this subroutine call can be ommitted for all use cases in future enhancements + """ return channel, stub @@ -490,12 +516,12 @@ def setup_grpc_channel_for_port(port, soc_ip): kvp = dict(fvs) - channel, stub = create_channel(type, level, kvp, soc_ip) + channel, stub = create_channel(type, level, kvp, soc_ip, port) if stub is None: helper_logger.log_warning("stub was not setup for gRPC soc ip {} port {}, no gRPC soc server running ?".format(soc_ip, port)) if channel is None: - helper_logger.log_warning("channel was not setup for gRPC soc ip {} port {}, no gRPC server running ?".format(soc_ip, port)) + helper_logger.log_warning("channel was not setup for gRPC soc ip {} port {}, no gRPC soc server running ?".format(soc_ip, port)) return channel, stub