Skip to content

Commit

Permalink
feat: add DuplexSteaming support.
Browse files Browse the repository at this point in the history
  • Loading branch information
netcore-jroger committed Apr 14, 2024
1 parent 8be1baa commit ea53c0b
Show file tree
Hide file tree
Showing 14 changed files with 300 additions and 84 deletions.
35 changes: 27 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ A light-weight RPC wrap of google gRPC framework.
## Getting Started

```csharp
// gRPC client side
// gRPC Client side
var configuration = new ConfigurationBuilder()
.AddJsonFile("appsettings.json", false, true)
.Build();
Expand All @@ -25,7 +25,7 @@ var provider = new ServiceCollection()

#### Unary
```csharp
// gRPC client side
// gRPC Client side
var tokenSource = new CancellationTokenSource(1000 * 60 * 2);
var userService = provider.GetService<IUserService>();
var userDto = await userService.GetUserBy(userRequest, tokenSource.Token);
Expand All @@ -47,8 +47,8 @@ Console.WriteLine($"Id: {userDto.Id}, Name: {userDto.Name}, CreateDate: {userDto
```

// gRPC Server side
- see `IUserService.cs` file line: 17-18
- see `UserService.cs` file line: 35-50
- see `IUserService.cs` line: 19-20
- see `UserService.cs` line: 36-50

#### ServerStreaming
```csharp
Expand All @@ -66,11 +66,30 @@ Console.WriteLine($"ServerStreaming: Id: {userDto.Id}, Name: {userDto.Name}, Cre
```

// gRPC Server side
- see `IUserService.cs` file line: 20-21
- see `UserService.cs` file line: 52-78
- see `IUserService.cs` line: 22-23
- see `UserService.cs` line: 52-76

#### DuplexStreaming
> not supported
```csharp
var tokenSource = new CancellationTokenSource(1000 * 60 * 2);
var rpcChannel = provider.GetService<IRpcChannel>();
var call = rpcChannel.AsyncDuplexStreamingCall<UserRequest, UserDto>("greet.Greeter", "TestDuplexStreaming", tokenSource.Token);
await call.ResponseStream.MoveNext(tokenSource.Token).ConfigureAwait(false);
var userDto = call.ResponseStream.Current;
Console.WriteLine($"DuplexStreaming: Id: {userDto.Id}, Name: {userDto.Name}, CreateDate: {userDto.CreateDate:yyyy-MM-dd HH:mm:ss fff}");

await call.ResponseStream.MoveNext(tokenSource.Token).ConfigureAwait(false);
userDto = call.ResponseStream.Current;
Console.WriteLine($"DuplexStreaming: Id: {userDto.Id}, Name: {userDto.Name}, CreateDate: {userDto.CreateDate:yyyy-MM-dd HH:mm:ss fff}");

await call.RequestStream.WriteAsync(new UserRequest { Id = 1, Keyword = $"client[DuplexStreaming]1 - {input}" });
await call.RequestStream.WriteAsync(new UserRequest { Id = 2, Keyword = $"client[DuplexStreaming]2 - {input}" });
await call.RequestStream.CompleteAsync();
```

// gRPC Server side
- see `IUserService.cs` line: 25-26
- see `UserService.cs` line: 78-107

## Roadmap

Expand All @@ -80,4 +99,4 @@ Console.WriteLine($"ServerStreaming: Id: {userDto.Id}, Name: {userDto.Name}, Cre

- [x] ServerStreaming supported.

- [ ] DuplexStreaming supported.
- [x] DuplexStreaming supported.
27 changes: 22 additions & 5 deletions sample/ClientSample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@ async static Task Main(string[] args)
Id = 1
};

while (true)
while ( true )
{
Console.Write("Please input keyword:");
var input = Console.ReadLine();
if (input.Equals("Q", StringComparison.OrdinalIgnoreCase))
if ( input.Equals("Q", StringComparison.OrdinalIgnoreCase) )
{
break;
}

if (input.StartsWith("cs:", StringComparison.OrdinalIgnoreCase))
if ( input.StartsWith("cs:", StringComparison.OrdinalIgnoreCase) )
{
userRequest.Keyword = input;
var tokenSource = new CancellationTokenSource(1000 * 60 * 2);
Expand All @@ -48,7 +48,7 @@ async static Task Main(string[] args)

Console.WriteLine($"ClientStreaming: Id: {userDto.Id}, Name: {userDto.Name}, CreateDate: {userDto.CreateDate:yyyy-MM-dd HH:mm:ss fff}");
}
else if (input.StartsWith(value: "ss:", StringComparison.OrdinalIgnoreCase))
else if ( input.StartsWith(value: "ss:", StringComparison.OrdinalIgnoreCase) )
{
var tokenSource = new CancellationTokenSource(1000 * 60 * 2);
var rpcChannel = provider.GetService<IRpcChannel>();
Expand All @@ -61,12 +61,29 @@ async static Task Main(string[] args)
userDto = call.ResponseStream.Current;
Console.WriteLine($"ServerStreaming: Id: {userDto.Id}, Name: {userDto.Name}, CreateDate: {userDto.CreateDate:yyyy-MM-dd HH:mm:ss fff}");
}
else if ( input.StartsWith(value: "ds:", StringComparison.OrdinalIgnoreCase) )
{
var tokenSource = new CancellationTokenSource(1000 * 60 * 2);
var rpcChannel = provider.GetService<IRpcChannel>();
var call = rpcChannel.AsyncDuplexStreamingCall<UserRequest, UserDto>("greet.Greeter", "TestDuplexStreaming", tokenSource.Token);
await call.ResponseStream.MoveNext(tokenSource.Token).ConfigureAwait(false);
var userDto = call.ResponseStream.Current;
Console.WriteLine($"DuplexStreaming: Id: {userDto.Id}, Name: {userDto.Name}, CreateDate: {userDto.CreateDate:yyyy-MM-dd HH:mm:ss fff}");

await call.ResponseStream.MoveNext(tokenSource.Token).ConfigureAwait(false);
userDto = call.ResponseStream.Current;
Console.WriteLine($"DuplexStreaming: Id: {userDto.Id}, Name: {userDto.Name}, CreateDate: {userDto.CreateDate:yyyy-MM-dd HH:mm:ss fff}");

await call.RequestStream.WriteAsync(new UserRequest { Id = 1, Keyword = $"client[DuplexStreaming]1 - {input}" });
await call.RequestStream.WriteAsync(new UserRequest { Id = 2, Keyword = $"client[DuplexStreaming]2 - {input}" });
await call.RequestStream.CompleteAsync();
}
else
{
userRequest.Keyword = input;
var tokenSource = new CancellationTokenSource(1000 * 60 * 2);
var userService = provider.GetService<IUserService>();
var userDto = await userService.GetUserBy(userRequest, tokenSource.Token);
var userDto = await userService.TestUnary(userRequest, tokenSource.Token);

Console.WriteLine($"Unary: Id: {userDto.Id}, Name: {userDto.Name}, CreateDate: {userDto.CreateDate:yyyy-MM-dd HH:mm:ss fff}");
}
Expand Down
5 changes: 4 additions & 1 deletion sample/InterfaceLib/IUserService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@ namespace InterfaceLib
public interface IUserService : IRpcService
{
[RpcMethod]
Task<UserDto> GetUserBy(UserRequest request, CancellationToken token = default);
Task<UserDto> TestUnary(UserRequest request, CancellationToken token = default);

[RpcMethod(MethodType = MethodType.ClientStreaming, RequestDataType = typeof(UserDto))]
Task<UserDto> TestClientStreaming(CancellationToken token = default);

[RpcMethod(MethodType = MethodType.ServerStreaming, RequestDataType = typeof(UserRequest), ResponseDataType = typeof(UserDto))]
Task TestServerStreaming(UserRequest request, CancellationToken token = default);

[RpcMethod(MethodType = MethodType.DuplexStreaming, RequestDataType = typeof(UserRequest), ResponseDataType = typeof(UserDto))]
Task TestDuplexStreaming(CancellationToken token = default);
}

[ProtoContract]
Expand Down
53 changes: 40 additions & 13 deletions sample/ServerSample/UserService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@ public UserService(ILoggerFactory loggerFactory)
this._logger = loggerFactory.CreateLogger<UserService>();
}

public Task<UserDto> GetUserBy(UserRequest request, CancellationToken token = default)
public Task<UserDto> TestUnary(UserRequest request, CancellationToken token = default)
{
this._logger.LogInformation($"Receive client message:{JsonSerializer.Serialize(request, _options)}");
this._logger.LogInformation($"Receive client Unary message:{JsonSerializer.Serialize(request, _options)}");

return Task.FromResult(new UserDto
{
return Task.FromResult(new UserDto {
Id = (int)DateTime.Now.Ticks / 10000,
Name = Guid.NewGuid().ToString("D") + request.Keyword,
CreateDate = DateTime.Now
Expand All @@ -38,13 +37,12 @@ public async Task<UserDto> TestClientStreaming(CancellationToken token = default
{
var requestStream = this.GetAsyncStreamReader<UserDto>();

while (await requestStream.MoveNext(token).ConfigureAwait(false))
while ( await requestStream.MoveNext(token).ConfigureAwait(false) )
{
this._logger.LogInformation($"Receive client client stream message:{JsonSerializer.Serialize(requestStream.Current)}");
this._logger.LogInformation($"Receive client ClientStreaming message:{JsonSerializer.Serialize(requestStream.Current)}");
}

return new UserDto
{
return new UserDto {
Id = (int)DateTime.Now.Ticks / 10000,
Name = Guid.NewGuid().ToString("D"),
CreateDate = DateTime.Now
Expand All @@ -53,14 +51,13 @@ public async Task<UserDto> TestClientStreaming(CancellationToken token = default

public async Task TestServerStreaming(UserRequest request, CancellationToken token = default)
{
this._logger.LogInformation($"Receive client server streaming message:{JsonSerializer.Serialize(request)}");
this._logger.LogInformation($"Receive client ServerStreaming message:{JsonSerializer.Serialize(request)}");

var responseStream = this.GetServerStreamWriter<UserDto>();

// NOTE: do not use method signature: Task WriteAsync(T message, CancellationToken cancellationToken)
await responseStream.WriteAsync(
new UserDto
{
new UserDto {
Id = (int)DateTime.Now.Ticks / 10000,
Name = Guid.NewGuid().ToString("D"),
CreateDate = DateTime.Now
Expand All @@ -70,12 +67,42 @@ await responseStream.WriteAsync(
await Task.Delay(1000 * 2, token);

await responseStream.WriteAsync(
new UserDto
{
new UserDto {
Id = (int)DateTime.Now.Ticks / 10000,
Name = Guid.NewGuid().ToString("D"),
CreateDate = DateTime.Now
}
);
}

public async Task TestDuplexStreaming(CancellationToken token = default)
{
var responseStream = this.GetServerStreamWriter<UserDto>();

// NOTE: do not use method signature: Task WriteAsync(T message, CancellationToken cancellationToken)
await responseStream.WriteAsync(
new UserDto {
Id = (int)DateTime.Now.Ticks / 10000,
Name = $"From Server --> DuplexStreaming: {Guid.NewGuid():D}",
CreateDate = DateTime.Now
}
);

await Task.Delay(1000 * 2, token);

await responseStream.WriteAsync(
new UserDto {
Id = (int)DateTime.Now.Ticks / 10000,
Name = $"From Server --> DuplexStreaming: {Guid.NewGuid():D}",
CreateDate = DateTime.Now
}
);

var requestStream = this.GetAsyncStreamReader<UserRequest>();

while ( await requestStream.MoveNext(token).ConfigureAwait(false) )
{
this._logger.LogInformation($"Receive client DuplexStreaming message:{JsonSerializer.Serialize(requestStream.Current)}");
}
}
}
4 changes: 2 additions & 2 deletions src/Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<RepositoryUrl>https://github.com/netcore-jroger/SimpleRpc</RepositoryUrl>
<PackageProjectUrl>https://github.com/netcore-jroger/SimpleRpc</PackageProjectUrl>
<PackageTags>gRPC</PackageTags>
<Version>0.6.1</Version>
<Version>0.7.1</Version>
<Copyright>JRoger © 2019-2024</Copyright>
</PropertyGroup>
</Project>
</Project>
11 changes: 11 additions & 0 deletions src/SimpleRpc.Client/DefaultRpcChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,17 @@ public AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TR
return result;
}

public AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(string serviceName, string methodName, CancellationToken token)
where TRequest : class
where TResponse : class
{
var callOptions = new CallOptions(cancellationToken: token).WithWaitForReady();
var methodDefinition = this.GetMethodDefinition<TRequest, TResponse>(MethodType.DuplexStreaming, serviceName, methodName);
var result = this._invoker.AsyncDuplexStreamingCall<TRequest, TResponse>(methodDefinition, this._host, callOptions);

return result;
}

private Method<TRequest, TResponse> GetMethodDefinition<TRequest, TResponse>(MethodType methodType, string serviceName, string methodName)
where TRequest : class
where TResponse : class
Expand Down
4 changes: 4 additions & 0 deletions src/SimpleRpc.Client/IRpcChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,8 @@ AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest,
AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(string serviceName, string methodName, TRequest request, CancellationToken token)
where TRequest : class
where TResponse : class;

AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(string serviceName, string methodName, CancellationToken token)
where TRequest : class
where TResponse : class;
}
9 changes: 9 additions & 0 deletions src/SimpleRpc.Client/Internal/GrpcClientBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,13 @@ protected AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest,

return call;
}

protected AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(string serviceName, string methodName, CancellationToken token)
where TRequest : class
where TResponse : class
{
var call = this._rpcChannel.AsyncDuplexStreamingCall<TRequest, TResponse>(serviceName, methodName, token);

return call;
}
}
44 changes: 39 additions & 5 deletions src/SimpleRpc.Client/Internal/GrpcClientTypeBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ internal class GrpcClientTypeBuilder
{
private static readonly Type _clientBaseType = typeof(GrpcClientBase);
private static readonly ConstructorInfo _ctorToCall = _clientBaseType.GetConstructor(BindingFlags.NonPublic | BindingFlags.Instance, null, new[] { typeof(IRpcChannel) }, null);

private static readonly MethodInfo _unaryMethodToCall = _clientBaseType.GetMethod(nameof(IRpcChannel.CallUnaryMethodAsync), BindingFlags.Instance | BindingFlags.NonPublic);
private static readonly MethodInfo _clientStreamingMethodToCall = _clientBaseType.GetMethod(nameof(IRpcChannel.AsyncClientStreamingCall), BindingFlags.Instance | BindingFlags.NonPublic);
private static readonly MethodInfo _serverStreamingMethodToCall = _clientBaseType.GetMethod(nameof(IRpcChannel.AsyncServerStreamingCall), BindingFlags.Instance | BindingFlags.NonPublic);
private static readonly MethodInfo _duplexStreamingMethodToCall = _clientBaseType.GetMethod(nameof(IRpcChannel.AsyncDuplexStreamingCall), BindingFlags.Instance | BindingFlags.NonPublic);

public TypeInfo Create<TService>() where TService : class, IRpcService
{
Expand Down Expand Up @@ -52,10 +54,10 @@ private static void AddConstructor(TypeBuilder typeBuilder)
private static void AddMethods(TypeBuilder typeBuilder, Type serviceType)
{
var serviceDescription = new RpcServiceDescription(serviceType);
foreach (var methodDescription in serviceDescription.RpcMethods)

foreach ( var methodDescription in serviceDescription.RpcMethods )
{
switch (methodDescription.RpcMethodType)
switch ( methodDescription.RpcMethodType )
{
case MethodType.Unary:
AddUnaryMethod(typeBuilder, serviceDescription, methodDescription);
Expand All @@ -69,6 +71,10 @@ private static void AddMethods(TypeBuilder typeBuilder, Type serviceType)
AddServerStreamingMethod(typeBuilder, serviceDescription, methodDescription);
break;

case MethodType.DuplexStreaming:
AddDuplexStreamingMethod(typeBuilder, serviceDescription, methodDescription);
break;

default:
throw new NotSupportedException($"Not support MethodType: {methodDescription.RpcMethodType}");
}
Expand All @@ -93,7 +99,7 @@ private static void AddUnaryMethod(TypeBuilder typeBuilder, RpcServiceDescriptio

il.Emit(
OpCodes.Call,
_unaryMethodToCall.MakeGenericMethod(new [] {
_unaryMethodToCall.MakeGenericMethod(new[] {
methodDescription.RpcMethod.GetParameters()[0].ParameterType,
methodDescription.RpcMethod.ReturnType.GetGenericArguments()[0]
})
Expand Down Expand Up @@ -151,7 +157,35 @@ private static void AddServerStreamingMethod(TypeBuilder typeBuilder, RpcService
il.Emit(
OpCodes.Call,
_serverStreamingMethodToCall.MakeGenericMethod(new[] {
methodDescription.ResponseDataType,
methodDescription.RequestDataType,
methodDescription.ResponseDataType
})
);

il.Emit(OpCodes.Ret);

typeBuilder.DefineMethodOverride(methodBuilder, methodDescription.RpcMethod);
}

private static void AddDuplexStreamingMethod(TypeBuilder typeBuilder, RpcServiceDescription serviceDescription, RpcMethodDescription methodDescription)
{
var args = methodDescription.RpcMethod.GetParameters();
var methodBuilder = typeBuilder.DefineMethod(
methodDescription.RpcMethodName,
MethodAttributes.Public | MethodAttributes.Virtual,
methodDescription.RpcMethod.ReturnType,
(from arg in args select arg.ParameterType).ToArray()
);
var il = methodBuilder.GetILGenerator();
il.Emit(OpCodes.Ldarg_0);
il.Emit(OpCodes.Ldstr, serviceDescription.RpcServiceName);
il.Emit(OpCodes.Ldstr, methodDescription.RpcMethodName);
il.Emit(OpCodes.Ldarg_1);

il.Emit(
OpCodes.Call,
_duplexStreamingMethodToCall.MakeGenericMethod(new[] {
methodDescription.RequestDataType,
methodDescription.ResponseDataType
})
);
Expand Down
Loading

0 comments on commit ea53c0b

Please sign in to comment.