From ea53c0b480ee5328f5ff5c0eb4ccb05e9d7397b3 Mon Sep 17 00:00:00 2001 From: JRoger Date: Sun, 14 Apr 2024 16:06:34 +0800 Subject: [PATCH] feat: add DuplexSteaming support. --- README.md | 35 ++++-- sample/ClientSample/Program.cs | 27 ++++- sample/InterfaceLib/IUserService.cs | 5 +- sample/ServerSample/UserService.cs | 53 ++++++--- src/Directory.Build.props | 4 +- src/SimpleRpc.Client/DefaultRpcChannel.cs | 11 ++ src/SimpleRpc.Client/IRpcChannel.cs | 4 + .../Internal/GrpcClientBase.cs | 9 ++ .../Internal/GrpcClientTypeBuilder.cs | 44 +++++++- src/SimpleRpc.Server/IRpcHostBuilder.cs | 19 ++++ .../Internal/GrpcHostBuilder.cs | 106 ++++++++++++------ .../Internal/GrpcHostBuilderExtensions.cs | 43 +++++-- .../Internal/MethodHandlerGenerator.cs | 14 +++ src/SimpleRpc.Server/RpcHostedService.cs | 10 +- 14 files changed, 300 insertions(+), 84 deletions(-) diff --git a/README.md b/README.md index d312cb1..5f521a2 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ A light-weight RPC wrap of google gRPC framework. ## Getting Started ```csharp -// gRPC client side +// gRPC Client side var configuration = new ConfigurationBuilder() .AddJsonFile("appsettings.json", false, true) .Build(); @@ -25,7 +25,7 @@ var provider = new ServiceCollection() #### Unary ```csharp -// gRPC client side +// gRPC Client side var tokenSource = new CancellationTokenSource(1000 * 60 * 2); var userService = provider.GetService(); var userDto = await userService.GetUserBy(userRequest, tokenSource.Token); @@ -47,8 +47,8 @@ Console.WriteLine($"Id: {userDto.Id}, Name: {userDto.Name}, CreateDate: {userDto ``` // gRPC Server side -- see `IUserService.cs` file line: 17-18 -- see `UserService.cs` file line: 35-50 +- see `IUserService.cs` line: 19-20 +- see `UserService.cs` line: 36-50 #### ServerStreaming ```csharp @@ -66,11 +66,30 @@ Console.WriteLine($"ServerStreaming: Id: {userDto.Id}, Name: {userDto.Name}, Cre ``` // gRPC Server side -- see `IUserService.cs` file line: 20-21 -- see `UserService.cs` file line: 52-78 +- see `IUserService.cs` line: 22-23 +- see `UserService.cs` line: 52-76 #### DuplexStreaming -> not supported +```csharp +var tokenSource = new CancellationTokenSource(1000 * 60 * 2); +var rpcChannel = provider.GetService(); +var call = rpcChannel.AsyncDuplexStreamingCall("greet.Greeter", "TestDuplexStreaming", tokenSource.Token); +await call.ResponseStream.MoveNext(tokenSource.Token).ConfigureAwait(false); +var userDto = call.ResponseStream.Current; +Console.WriteLine($"DuplexStreaming: Id: {userDto.Id}, Name: {userDto.Name}, CreateDate: {userDto.CreateDate:yyyy-MM-dd HH:mm:ss fff}"); + +await call.ResponseStream.MoveNext(tokenSource.Token).ConfigureAwait(false); +userDto = call.ResponseStream.Current; +Console.WriteLine($"DuplexStreaming: Id: {userDto.Id}, Name: {userDto.Name}, CreateDate: {userDto.CreateDate:yyyy-MM-dd HH:mm:ss fff}"); + +await call.RequestStream.WriteAsync(new UserRequest { Id = 1, Keyword = $"client[DuplexStreaming]1 - {input}" }); +await call.RequestStream.WriteAsync(new UserRequest { Id = 2, Keyword = $"client[DuplexStreaming]2 - {input}" }); +await call.RequestStream.CompleteAsync(); +``` + +// gRPC Server side +- see `IUserService.cs` line: 25-26 +- see `UserService.cs` line: 78-107 ## Roadmap @@ -80,4 +99,4 @@ Console.WriteLine($"ServerStreaming: Id: {userDto.Id}, Name: {userDto.Name}, Cre - [x] ServerStreaming supported. -- [ ] DuplexStreaming supported. +- [x] DuplexStreaming supported. diff --git a/sample/ClientSample/Program.cs b/sample/ClientSample/Program.cs index cc7b721..78653b7 100644 --- a/sample/ClientSample/Program.cs +++ b/sample/ClientSample/Program.cs @@ -26,16 +26,16 @@ async static Task Main(string[] args) Id = 1 }; - while (true) + while ( true ) { Console.Write("Please input keyword:"); var input = Console.ReadLine(); - if (input.Equals("Q", StringComparison.OrdinalIgnoreCase)) + if ( input.Equals("Q", StringComparison.OrdinalIgnoreCase) ) { break; } - if (input.StartsWith("cs:", StringComparison.OrdinalIgnoreCase)) + if ( input.StartsWith("cs:", StringComparison.OrdinalIgnoreCase) ) { userRequest.Keyword = input; var tokenSource = new CancellationTokenSource(1000 * 60 * 2); @@ -48,7 +48,7 @@ async static Task Main(string[] args) Console.WriteLine($"ClientStreaming: Id: {userDto.Id}, Name: {userDto.Name}, CreateDate: {userDto.CreateDate:yyyy-MM-dd HH:mm:ss fff}"); } - else if (input.StartsWith(value: "ss:", StringComparison.OrdinalIgnoreCase)) + else if ( input.StartsWith(value: "ss:", StringComparison.OrdinalIgnoreCase) ) { var tokenSource = new CancellationTokenSource(1000 * 60 * 2); var rpcChannel = provider.GetService(); @@ -61,12 +61,29 @@ async static Task Main(string[] args) userDto = call.ResponseStream.Current; Console.WriteLine($"ServerStreaming: Id: {userDto.Id}, Name: {userDto.Name}, CreateDate: {userDto.CreateDate:yyyy-MM-dd HH:mm:ss fff}"); } + else if ( input.StartsWith(value: "ds:", StringComparison.OrdinalIgnoreCase) ) + { + var tokenSource = new CancellationTokenSource(1000 * 60 * 2); + var rpcChannel = provider.GetService(); + var call = rpcChannel.AsyncDuplexStreamingCall("greet.Greeter", "TestDuplexStreaming", tokenSource.Token); + await call.ResponseStream.MoveNext(tokenSource.Token).ConfigureAwait(false); + var userDto = call.ResponseStream.Current; + Console.WriteLine($"DuplexStreaming: Id: {userDto.Id}, Name: {userDto.Name}, CreateDate: {userDto.CreateDate:yyyy-MM-dd HH:mm:ss fff}"); + + await call.ResponseStream.MoveNext(tokenSource.Token).ConfigureAwait(false); + userDto = call.ResponseStream.Current; + Console.WriteLine($"DuplexStreaming: Id: {userDto.Id}, Name: {userDto.Name}, CreateDate: {userDto.CreateDate:yyyy-MM-dd HH:mm:ss fff}"); + + await call.RequestStream.WriteAsync(new UserRequest { Id = 1, Keyword = $"client[DuplexStreaming]1 - {input}" }); + await call.RequestStream.WriteAsync(new UserRequest { Id = 2, Keyword = $"client[DuplexStreaming]2 - {input}" }); + await call.RequestStream.CompleteAsync(); + } else { userRequest.Keyword = input; var tokenSource = new CancellationTokenSource(1000 * 60 * 2); var userService = provider.GetService(); - var userDto = await userService.GetUserBy(userRequest, tokenSource.Token); + var userDto = await userService.TestUnary(userRequest, tokenSource.Token); Console.WriteLine($"Unary: Id: {userDto.Id}, Name: {userDto.Name}, CreateDate: {userDto.CreateDate:yyyy-MM-dd HH:mm:ss fff}"); } diff --git a/sample/InterfaceLib/IUserService.cs b/sample/InterfaceLib/IUserService.cs index 79f3e35..66e45ee 100644 --- a/sample/InterfaceLib/IUserService.cs +++ b/sample/InterfaceLib/IUserService.cs @@ -14,13 +14,16 @@ namespace InterfaceLib public interface IUserService : IRpcService { [RpcMethod] - Task GetUserBy(UserRequest request, CancellationToken token = default); + Task TestUnary(UserRequest request, CancellationToken token = default); [RpcMethod(MethodType = MethodType.ClientStreaming, RequestDataType = typeof(UserDto))] Task TestClientStreaming(CancellationToken token = default); [RpcMethod(MethodType = MethodType.ServerStreaming, RequestDataType = typeof(UserRequest), ResponseDataType = typeof(UserDto))] Task TestServerStreaming(UserRequest request, CancellationToken token = default); + + [RpcMethod(MethodType = MethodType.DuplexStreaming, RequestDataType = typeof(UserRequest), ResponseDataType = typeof(UserDto))] + Task TestDuplexStreaming(CancellationToken token = default); } [ProtoContract] diff --git a/sample/ServerSample/UserService.cs b/sample/ServerSample/UserService.cs index 74ba438..e56bcd4 100644 --- a/sample/ServerSample/UserService.cs +++ b/sample/ServerSample/UserService.cs @@ -22,12 +22,11 @@ public UserService(ILoggerFactory loggerFactory) this._logger = loggerFactory.CreateLogger(); } - public Task GetUserBy(UserRequest request, CancellationToken token = default) + public Task TestUnary(UserRequest request, CancellationToken token = default) { - this._logger.LogInformation($"Receive client message:{JsonSerializer.Serialize(request, _options)}"); + this._logger.LogInformation($"Receive client Unary message:{JsonSerializer.Serialize(request, _options)}"); - return Task.FromResult(new UserDto - { + return Task.FromResult(new UserDto { Id = (int)DateTime.Now.Ticks / 10000, Name = Guid.NewGuid().ToString("D") + request.Keyword, CreateDate = DateTime.Now @@ -38,13 +37,12 @@ public async Task TestClientStreaming(CancellationToken token = default { var requestStream = this.GetAsyncStreamReader(); - while (await requestStream.MoveNext(token).ConfigureAwait(false)) + while ( await requestStream.MoveNext(token).ConfigureAwait(false) ) { - this._logger.LogInformation($"Receive client client stream message:{JsonSerializer.Serialize(requestStream.Current)}"); + this._logger.LogInformation($"Receive client ClientStreaming message:{JsonSerializer.Serialize(requestStream.Current)}"); } - return new UserDto - { + return new UserDto { Id = (int)DateTime.Now.Ticks / 10000, Name = Guid.NewGuid().ToString("D"), CreateDate = DateTime.Now @@ -53,14 +51,13 @@ public async Task TestClientStreaming(CancellationToken token = default public async Task TestServerStreaming(UserRequest request, CancellationToken token = default) { - this._logger.LogInformation($"Receive client server streaming message:{JsonSerializer.Serialize(request)}"); + this._logger.LogInformation($"Receive client ServerStreaming message:{JsonSerializer.Serialize(request)}"); var responseStream = this.GetServerStreamWriter(); // NOTE: do not use method signature: Task WriteAsync(T message, CancellationToken cancellationToken) await responseStream.WriteAsync( - new UserDto - { + new UserDto { Id = (int)DateTime.Now.Ticks / 10000, Name = Guid.NewGuid().ToString("D"), CreateDate = DateTime.Now @@ -70,12 +67,42 @@ await responseStream.WriteAsync( await Task.Delay(1000 * 2, token); await responseStream.WriteAsync( - new UserDto - { + new UserDto { Id = (int)DateTime.Now.Ticks / 10000, Name = Guid.NewGuid().ToString("D"), CreateDate = DateTime.Now } ); } + + public async Task TestDuplexStreaming(CancellationToken token = default) + { + var responseStream = this.GetServerStreamWriter(); + + // NOTE: do not use method signature: Task WriteAsync(T message, CancellationToken cancellationToken) + await responseStream.WriteAsync( + new UserDto { + Id = (int)DateTime.Now.Ticks / 10000, + Name = $"From Server --> DuplexStreaming: {Guid.NewGuid():D}", + CreateDate = DateTime.Now + } + ); + + await Task.Delay(1000 * 2, token); + + await responseStream.WriteAsync( + new UserDto { + Id = (int)DateTime.Now.Ticks / 10000, + Name = $"From Server --> DuplexStreaming: {Guid.NewGuid():D}", + CreateDate = DateTime.Now + } + ); + + var requestStream = this.GetAsyncStreamReader(); + + while ( await requestStream.MoveNext(token).ConfigureAwait(false) ) + { + this._logger.LogInformation($"Receive client DuplexStreaming message:{JsonSerializer.Serialize(requestStream.Current)}"); + } + } } diff --git a/src/Directory.Build.props b/src/Directory.Build.props index 7a10b63..ad0ddf4 100644 --- a/src/Directory.Build.props +++ b/src/Directory.Build.props @@ -5,7 +5,7 @@ https://github.com/netcore-jroger/SimpleRpc https://github.com/netcore-jroger/SimpleRpc gRPC - 0.6.1 + 0.7.1 JRoger © 2019-2024 - \ No newline at end of file + diff --git a/src/SimpleRpc.Client/DefaultRpcChannel.cs b/src/SimpleRpc.Client/DefaultRpcChannel.cs index a25f49c..e98a042 100644 --- a/src/SimpleRpc.Client/DefaultRpcChannel.cs +++ b/src/SimpleRpc.Client/DefaultRpcChannel.cs @@ -63,6 +63,17 @@ public AsyncServerStreamingCall AsyncServerStreamingCall AsyncDuplexStreamingCall(string serviceName, string methodName, CancellationToken token) + where TRequest : class + where TResponse : class + { + var callOptions = new CallOptions(cancellationToken: token).WithWaitForReady(); + var methodDefinition = this.GetMethodDefinition(MethodType.DuplexStreaming, serviceName, methodName); + var result = this._invoker.AsyncDuplexStreamingCall(methodDefinition, this._host, callOptions); + + return result; + } + private Method GetMethodDefinition(MethodType methodType, string serviceName, string methodName) where TRequest : class where TResponse : class diff --git a/src/SimpleRpc.Client/IRpcChannel.cs b/src/SimpleRpc.Client/IRpcChannel.cs index 6969ffe..928daae 100644 --- a/src/SimpleRpc.Client/IRpcChannel.cs +++ b/src/SimpleRpc.Client/IRpcChannel.cs @@ -19,4 +19,8 @@ AsyncClientStreamingCall AsyncClientStreamingCall AsyncServerStreamingCall(string serviceName, string methodName, TRequest request, CancellationToken token) where TRequest : class where TResponse : class; + + AsyncDuplexStreamingCall AsyncDuplexStreamingCall(string serviceName, string methodName, CancellationToken token) + where TRequest : class + where TResponse : class; } diff --git a/src/SimpleRpc.Client/Internal/GrpcClientBase.cs b/src/SimpleRpc.Client/Internal/GrpcClientBase.cs index a672888..c82bf9c 100644 --- a/src/SimpleRpc.Client/Internal/GrpcClientBase.cs +++ b/src/SimpleRpc.Client/Internal/GrpcClientBase.cs @@ -43,4 +43,13 @@ protected AsyncServerStreamingCall AsyncServerStreamingCall AsyncDuplexStreamingCall(string serviceName, string methodName, CancellationToken token) + where TRequest : class + where TResponse : class + { + var call = this._rpcChannel.AsyncDuplexStreamingCall(serviceName, methodName, token); + + return call; + } } diff --git a/src/SimpleRpc.Client/Internal/GrpcClientTypeBuilder.cs b/src/SimpleRpc.Client/Internal/GrpcClientTypeBuilder.cs index ea2d1a7..4242dbc 100644 --- a/src/SimpleRpc.Client/Internal/GrpcClientTypeBuilder.cs +++ b/src/SimpleRpc.Client/Internal/GrpcClientTypeBuilder.cs @@ -14,9 +14,11 @@ internal class GrpcClientTypeBuilder { private static readonly Type _clientBaseType = typeof(GrpcClientBase); private static readonly ConstructorInfo _ctorToCall = _clientBaseType.GetConstructor(BindingFlags.NonPublic | BindingFlags.Instance, null, new[] { typeof(IRpcChannel) }, null); + private static readonly MethodInfo _unaryMethodToCall = _clientBaseType.GetMethod(nameof(IRpcChannel.CallUnaryMethodAsync), BindingFlags.Instance | BindingFlags.NonPublic); private static readonly MethodInfo _clientStreamingMethodToCall = _clientBaseType.GetMethod(nameof(IRpcChannel.AsyncClientStreamingCall), BindingFlags.Instance | BindingFlags.NonPublic); private static readonly MethodInfo _serverStreamingMethodToCall = _clientBaseType.GetMethod(nameof(IRpcChannel.AsyncServerStreamingCall), BindingFlags.Instance | BindingFlags.NonPublic); + private static readonly MethodInfo _duplexStreamingMethodToCall = _clientBaseType.GetMethod(nameof(IRpcChannel.AsyncDuplexStreamingCall), BindingFlags.Instance | BindingFlags.NonPublic); public TypeInfo Create() where TService : class, IRpcService { @@ -52,10 +54,10 @@ private static void AddConstructor(TypeBuilder typeBuilder) private static void AddMethods(TypeBuilder typeBuilder, Type serviceType) { var serviceDescription = new RpcServiceDescription(serviceType); - - foreach (var methodDescription in serviceDescription.RpcMethods) + + foreach ( var methodDescription in serviceDescription.RpcMethods ) { - switch (methodDescription.RpcMethodType) + switch ( methodDescription.RpcMethodType ) { case MethodType.Unary: AddUnaryMethod(typeBuilder, serviceDescription, methodDescription); @@ -69,6 +71,10 @@ private static void AddMethods(TypeBuilder typeBuilder, Type serviceType) AddServerStreamingMethod(typeBuilder, serviceDescription, methodDescription); break; + case MethodType.DuplexStreaming: + AddDuplexStreamingMethod(typeBuilder, serviceDescription, methodDescription); + break; + default: throw new NotSupportedException($"Not support MethodType: {methodDescription.RpcMethodType}"); } @@ -93,7 +99,7 @@ private static void AddUnaryMethod(TypeBuilder typeBuilder, RpcServiceDescriptio il.Emit( OpCodes.Call, - _unaryMethodToCall.MakeGenericMethod(new [] { + _unaryMethodToCall.MakeGenericMethod(new[] { methodDescription.RpcMethod.GetParameters()[0].ParameterType, methodDescription.RpcMethod.ReturnType.GetGenericArguments()[0] }) @@ -151,7 +157,35 @@ private static void AddServerStreamingMethod(TypeBuilder typeBuilder, RpcService il.Emit( OpCodes.Call, _serverStreamingMethodToCall.MakeGenericMethod(new[] { - methodDescription.ResponseDataType, + methodDescription.RequestDataType, + methodDescription.ResponseDataType + }) + ); + + il.Emit(OpCodes.Ret); + + typeBuilder.DefineMethodOverride(methodBuilder, methodDescription.RpcMethod); + } + + private static void AddDuplexStreamingMethod(TypeBuilder typeBuilder, RpcServiceDescription serviceDescription, RpcMethodDescription methodDescription) + { + var args = methodDescription.RpcMethod.GetParameters(); + var methodBuilder = typeBuilder.DefineMethod( + methodDescription.RpcMethodName, + MethodAttributes.Public | MethodAttributes.Virtual, + methodDescription.RpcMethod.ReturnType, + (from arg in args select arg.ParameterType).ToArray() + ); + var il = methodBuilder.GetILGenerator(); + il.Emit(OpCodes.Ldarg_0); + il.Emit(OpCodes.Ldstr, serviceDescription.RpcServiceName); + il.Emit(OpCodes.Ldstr, methodDescription.RpcMethodName); + il.Emit(OpCodes.Ldarg_1); + + il.Emit( + OpCodes.Call, + _duplexStreamingMethodToCall.MakeGenericMethod(new[] { + methodDescription.RequestDataType, methodDescription.ResponseDataType }) ); diff --git a/src/SimpleRpc.Server/IRpcHostBuilder.cs b/src/SimpleRpc.Server/IRpcHostBuilder.cs index 1cd6853..026f5dd 100644 --- a/src/SimpleRpc.Server/IRpcHostBuilder.cs +++ b/src/SimpleRpc.Server/IRpcHostBuilder.cs @@ -66,6 +66,25 @@ string methodName where TRequest : class where TResponse : class; + /// + /// 添加 DuplexStreamingMethod RPC 方法 + /// + /// + /// + /// + /// + /// + /// + /// + IRpcHostBuilder AddDuplexStreamingMethod( + Func handler, + string serviceName, + string methodName + ) + where TService : class, IRpcService + where TRequest : class + where TResponse : class; + /// /// 创建 Rpc 宿主服务. /// diff --git a/src/SimpleRpc.Server/Internal/GrpcHostBuilder.cs b/src/SimpleRpc.Server/Internal/GrpcHostBuilder.cs index caf512d..d7f6754 100644 --- a/src/SimpleRpc.Server/Internal/GrpcHostBuilder.cs +++ b/src/SimpleRpc.Server/Internal/GrpcHostBuilder.cs @@ -36,7 +36,7 @@ public GrpcHostBuilder(IServiceProvider serviceProvider, ISerializer serializer, this._logger = loggerFactory.CreateLogger(); } - public IRpcHostBuilder AddUnaryMethod ( + public IRpcHostBuilder AddUnaryMethod( Func> handler, string serviceName, string methodName @@ -45,28 +45,31 @@ string methodName where TRequest : class where TResponse : class { - this._builder.AddMethod( - MethodDefinitionGenerator.CreateMethodDefinition(MethodType.Unary, serviceName, methodName, this._serializer), - (request, context) => { - using (var scope = this._serviceProvider.CreateScope()) - { - var service = scope.ServiceProvider.GetServices().First(s => !s.GetType().Name.EndsWith("GrpcClientProxy", StringComparison.OrdinalIgnoreCase)); - if (service is RpcServiceBase baseService) - { - baseService.Context = context; - } + var method = MethodDefinitionGenerator.CreateMethodDefinition(MethodType.Unary, serviceName, methodName, this._serializer); - this._logger.LogInformation($"Request gRPC endpoint: {context.Method}"); + this._builder.AddMethod(method, UnaryServerMethodDelegate); - return handler(service, request, context.CancellationToken); + // See delegate: UnaryServerMethod + Task UnaryServerMethodDelegate(TRequest request, ServerCallContext context) + { + using ( var scope = this._serviceProvider.CreateScope() ) + { + var service = scope.ServiceProvider.GetServices().First(s => !s.GetType().Name.EndsWith("GrpcClientProxy", StringComparison.OrdinalIgnoreCase)); + if ( service is RpcServiceBase baseService ) + { + baseService.Context = context; } + + this._logger.LogInformation($"Request gRPC endpoint: {context.Method}"); + + return handler(service, request, context.CancellationToken); } - ); + } return this; } - public IRpcHostBuilder AddClientStreamingMethod ( + public IRpcHostBuilder AddClientStreamingMethod( Func> handler, string serviceName, string methodName @@ -77,24 +80,25 @@ string methodName { var method = MethodDefinitionGenerator.CreateMethodDefinition(MethodType.ClientStreaming, serviceName, methodName, this._serializer); - this._builder.AddMethod( - method, - (requestStream, context) => { - using (var scope = this._serviceProvider.CreateScope()) + this._builder.AddMethod(method, ClientStreamingServerMethodDelegate); + + // See delegate: ClientStreamingServerMethod + Task ClientStreamingServerMethodDelegate(IAsyncStreamReader requestStream, ServerCallContext context) + { + using ( var scope = this._serviceProvider.CreateScope() ) + { + var service = scope.ServiceProvider.GetServices().First(s => !s.GetType().Name.EndsWith("GrpcClientProxy", StringComparison.OrdinalIgnoreCase)); + if ( service is RpcServiceBase baseService ) { - var service = scope.ServiceProvider.GetServices().First(s => !s.GetType().Name.EndsWith("GrpcClientProxy", StringComparison.OrdinalIgnoreCase)); - if (service is RpcServiceBase baseService) - { - baseService.Context = context; - baseService.SetAsyncStreamReader(requestStream); - } + baseService.Context = context; + baseService.SetAsyncStreamReader(requestStream); + } - this._logger.LogInformation($"Request gRPC endpoint: {context.Method}"); + this._logger.LogInformation($"Request gRPC endpoint: {context.Method}"); - return handler(service, context.CancellationToken); - } + return handler(service, context.CancellationToken); } - ); + } return this; } @@ -110,14 +114,15 @@ string methodName { var method = MethodDefinitionGenerator.CreateMethodDefinition(MethodType.ServerStreaming, serviceName, methodName, this._serializer); - this._builder.AddMethod(method, LocalServerStreamingServerMethod); + this._builder.AddMethod(method, ServerStreamingServerMethodDelegate); - Task LocalServerStreamingServerMethod(TRequest request, IServerStreamWriter responseStream, ServerCallContext context) + // See delegate: ServerStreamingServerMethod + Task ServerStreamingServerMethodDelegate(TRequest request, IServerStreamWriter responseStream, ServerCallContext context) { - using (var scope = this._serviceProvider.CreateScope()) + using ( var scope = this._serviceProvider.CreateScope() ) { var service = scope.ServiceProvider.GetServices().First(s => !s.GetType().Name.EndsWith("GrpcClientProxy", StringComparison.OrdinalIgnoreCase)); - if (service is RpcServiceBase baseService) + if ( service is RpcServiceBase baseService ) { baseService.Context = context; baseService.SetServerStreamWriter(responseStream); @@ -132,6 +137,41 @@ Task LocalServerStreamingServerMethod(TRequest request, IServerStreamWriter( + Func handler, + string serviceName, + string methodName + ) + where TService : class, IRpcService + where TRequest : class + where TResponse : class + { + var method = MethodDefinitionGenerator.CreateMethodDefinition(MethodType.DuplexStreaming, serviceName, methodName, this._serializer); + + this._builder.AddMethod(method, DuplexStreamingServerMethodDelegate); + + // See delegate: DuplexStreamingServerMethod + Task DuplexStreamingServerMethodDelegate(IAsyncStreamReader requestStream, IServerStreamWriter responseStream, ServerCallContext context) + { + using ( var scope = this._serviceProvider.CreateScope() ) + { + var service = scope.ServiceProvider.GetServices().First(s => !s.GetType().Name.EndsWith("GrpcClientProxy", StringComparison.OrdinalIgnoreCase)); + if ( service is RpcServiceBase baseService ) + { + baseService.Context = context; + baseService.SetAsyncStreamReader(requestStream); + baseService.SetServerStreamWriter(responseStream); + } + + this._logger.LogInformation($"Request gRPC endpoint: {context.Method}"); + + return handler(service, context.CancellationToken); + } + } + + return this; + } + public IRpcHost Build() { var healthService = new HealthServiceImpl(); diff --git a/src/SimpleRpc.Server/Internal/GrpcHostBuilderExtensions.cs b/src/SimpleRpc.Server/Internal/GrpcHostBuilderExtensions.cs index e66e761..0ad3cb9 100644 --- a/src/SimpleRpc.Server/Internal/GrpcHostBuilderExtensions.cs +++ b/src/SimpleRpc.Server/Internal/GrpcHostBuilderExtensions.cs @@ -27,20 +27,24 @@ internal static class GrpcHostBuilderExtensions private static readonly MethodInfo _serverStreamingHandlerGenerator = typeof(MethodHandlerGenerator).GetMethod(nameof(MethodHandlerGenerator.GenerateServerStreamingMethodHandler)); private static readonly MethodInfo _addServerStreamingMethod = typeof(GrpcHostBuilder).GetMethod(nameof(GrpcHostBuilder.AddServerStreamingMethod), BindingFlags.Public | BindingFlags.Instance); + // DuplexStreaming + private static readonly MethodInfo _duplexStreamingHandlerGenerator = typeof(MethodHandlerGenerator).GetMethod(nameof(MethodHandlerGenerator.GenerateDuplexStreamingMethodHandler)); + private static readonly MethodInfo _addDuplexStreamingMethod = typeof(GrpcHostBuilder).GetMethod(nameof(GrpcHostBuilder.AddDuplexStreamingMethod), BindingFlags.Public | BindingFlags.Instance); + public static IRpcHostBuilder RegisterRpcService(this IRpcHostBuilder builder, List rpcServiceDescriptions) { - if (rpcServiceDescriptions.Count == 0) + if ( rpcServiceDescriptions.Count == 0 ) { return builder; } - foreach (var rpcServiceDescription in rpcServiceDescriptions) + foreach ( var rpcServiceDescription in rpcServiceDescriptions ) { - foreach (var rpcMethodDescription in rpcServiceDescription.RpcMethods) + foreach ( var rpcMethodDescription in rpcServiceDescription.RpcMethods ) { var requestType = rpcMethodDescription.RpcMethod.GetParameters()[0].ParameterType; - switch (rpcMethodDescription.RpcMethodType) + switch ( rpcMethodDescription.RpcMethodType ) { case MethodType.Unary: var unaryResponseType = rpcMethodDescription.RpcMethod.ReturnType.GenericTypeArguments[0]; @@ -70,6 +74,15 @@ public static IRpcHostBuilder RegisterRpcService(this IRpcHostBuilder builder, L addServerStreamingMethod.Invoke(builder, new[] { serverStreamingHandler, rpcServiceDescription.RpcServiceName, rpcMethodDescription.RpcMethodName }); break; + case MethodType.DuplexStreaming: + // Func + var duplexStreamingHandlerGenerator = _duplexStreamingHandlerGenerator.MakeGenericMethod(rpcServiceDescription.RpcServiceType, rpcMethodDescription.ResponseDataType); + var duplexStreamingHandler = duplexStreamingHandlerGenerator.Invoke(null, new[] { rpcMethodDescription.RpcMethod }); + + var addDuplexStreamingMethod = _addDuplexStreamingMethod.MakeGenericMethod(rpcServiceDescription.RpcServiceType, rpcMethodDescription.RequestDataType, rpcMethodDescription.ResponseDataType); + addDuplexStreamingMethod.Invoke(builder, new[] { duplexStreamingHandler, rpcServiceDescription.RpcServiceName, rpcMethodDescription.RpcMethodName }); + break; + default: throw new NotSupportedException($"unsupport gRPC MethodType: {rpcMethodDescription.RpcMethodType}"); } @@ -82,12 +95,12 @@ public static IRpcHostBuilder RegisterRpcService(this IRpcHostBuilder builder, L public static IRpcHostBuilder AddUnaryMethods(this IRpcHostBuilder builder, Type serviceType) { var serviceName = ((RpcServiceAttribute)serviceType.GetCustomAttribute(typeof(RpcServiceAttribute)))?.Name; - if (string.IsNullOrWhiteSpace(serviceName)) + if ( string.IsNullOrWhiteSpace(serviceName) ) { serviceName = serviceType.Name; } - foreach (var method in serviceType.GetMethods().Where(_ => _.GetCustomAttribute(typeof(RpcMethodAttribute), true) != null)) + foreach ( var method in serviceType.GetMethods().Where(_ => _.GetCustomAttribute(typeof(RpcMethodAttribute), true) != null) ) { CheckRpcMethodParameterType(method); @@ -109,9 +122,12 @@ public static IRpcHostBuilder AddUnaryMethods(this IRpcHostBuilder builder, Type public static IRpcHostBuilder AddClientStreamingMethods(this IRpcHostBuilder builder, Type serviceType) { var serviceName = ((RpcServiceAttribute)serviceType.GetCustomAttribute(typeof(RpcServiceAttribute)))?.Name; - if (string.IsNullOrWhiteSpace(serviceName)) serviceName = serviceType.Name; + if ( string.IsNullOrWhiteSpace(serviceName) ) + { + serviceName = serviceType.Name; + } - foreach (var method in serviceType.GetMethods().Where(_ => _.GetCustomAttribute(typeof(RpcMethodAttribute), true) != null)) + foreach ( var method in serviceType.GetMethods().Where(_ => _.GetCustomAttribute(typeof(RpcMethodAttribute), true) != null) ) { CheckRpcMethodParameterType(method); @@ -134,7 +150,10 @@ public static IRpcHostBuilder AddClientStreamingMethods(this IRpcHostBuilder bui private static string GetMethodName(MethodInfo method) { var methodName = ((RpcMethodAttribute)method.GetCustomAttribute(typeof(RpcMethodAttribute)))?.Name; - if (string.IsNullOrWhiteSpace(methodName)) methodName = method.Name; + if ( string.IsNullOrWhiteSpace(methodName) ) + { + methodName = method.Name; + } return methodName; } @@ -142,17 +161,17 @@ private static string GetMethodName(MethodInfo method) [MethodImpl(MethodImplOptions.AggressiveInlining)] private static void CheckRpcMethodParameterType(MethodInfo method) { - if (method.ReturnType.GenericTypeArguments.Length != 1) + if ( method.ReturnType.GenericTypeArguments.Length != 1 ) { throw new RpcDefineException("The return value type of RPC method must be Task."); } - if (method.GetParameters().Length != 2) + if ( method.GetParameters().Length != 2 ) { throw new RpcDefineException("The RPC method can only contain two parameters, the first one is the generic TRequest and the other is the System.Threading.CancellationToken type."); } - if (method.GetParameters()[1].ParameterType != typeof(CancellationToken)) + if ( method.GetParameters()[1].ParameterType != typeof(CancellationToken) ) { throw new RpcDefineException("The second parameter of the RPC method must be the System.Threading.CancellationToken type."); } diff --git a/src/SimpleRpc.Server/Internal/MethodHandlerGenerator.cs b/src/SimpleRpc.Server/Internal/MethodHandlerGenerator.cs index e851f57..424df61 100644 --- a/src/SimpleRpc.Server/Internal/MethodHandlerGenerator.cs +++ b/src/SimpleRpc.Server/Internal/MethodHandlerGenerator.cs @@ -53,4 +53,18 @@ public static Func GenerateServerSt return func; } + + public static Func GenerateDuplexStreamingMethodHandler(MethodInfo method) + where TResponse : class + { + var serviceParameter = Expression.Parameter(typeof(TService)); + var ctParameter = Expression.Parameter(typeof(CancellationToken)); + var invocation = Expression.Call(serviceParameter, method, new[] { ctParameter }); + var func = Expression.Lambda>( + invocation, false, new[] { serviceParameter, ctParameter } + ) + .Compile(); + + return func; + } } diff --git a/src/SimpleRpc.Server/RpcHostedService.cs b/src/SimpleRpc.Server/RpcHostedService.cs index b6bf37f..a76bcc4 100644 --- a/src/SimpleRpc.Server/RpcHostedService.cs +++ b/src/SimpleRpc.Server/RpcHostedService.cs @@ -26,22 +26,22 @@ public RpcHostedService(IRpcHost host, IOptions options, IRpcS public async Task StartAsync(CancellationToken cancellationToken) { - this._logger.LogInformation("------ Starting RPC hosted service. ------"); + this._logger.LogInformation("------ Starting RPC host service. ------"); await this._host.StartAsync().ConfigureAwait(false); - this._logger.LogInformation($"------ RPC hosted service started at 0.0.0.0:{this._options.Port}. ------"); - + this._logger.LogInformation($"------ RPC host service started at 0.0.0.0:{this._options.Port}. ------"); + await this._rpcServiceRegister.RegisterAsync(); } public async Task StopAsync(CancellationToken cancellationToken) { - this._logger.LogInformation("------ Stopping RPC hosted service. ------"); + this._logger.LogInformation("------ Stopping RPC host service. ------"); await this._host.StopAsync().ConfigureAwait(false); - this._logger.LogInformation("------ RPC hosted service stoped. ------"); + this._logger.LogInformation("------ RPC host service stoped. ------"); await this._rpcServiceRegister.UnregisterAsync(); }