Skip to content

Commit

Permalink
feat: Make all gRPC streams implement IDisposable
Browse files Browse the repository at this point in the history
This is done virtually, to make mocking easier where necessary, but we won't override the implementation in production code.
  • Loading branch information
jskeet committed Apr 24, 2023
1 parent daa2e0b commit e51f919
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 11 deletions.
23 changes: 19 additions & 4 deletions Google.Api.Gax.Grpc/BidirectionalStreamingBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,25 @@

using Grpc.Core;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Google.Api.Gax.Grpc
{
/// <summary>
/// Base class for bidirectional streaming RPC methods.
/// Base class for bidirectional streaming RPC methods. This wraps an underlying call returned by gRPC,
/// in order to provide a wrapper for the async response stream, allowing users to take advantage
/// of <code>await foreach</code> support from C# 8 onwards. Additionally, it wraps the
/// request stream in a buffer, allowing multiple requests to be written without waiting for them
/// to be transmitted.
/// </summary>
/// <remarks>
/// To avoid memory leaks, users must dispose of gRPC streams.
/// Additionally, you are strongly advised to read the whole response stream, even if the data
/// is not required - this avoids effectively cancelling the call.
/// </remarks>
/// <typeparam name="TRequest">RPC request type</typeparam>
/// <typeparam name="TResponse">RPC response type</typeparam>
public abstract class BidirectionalStreamingBase<TRequest, TResponse>
public abstract class BidirectionalStreamingBase<TRequest, TResponse> : IDisposable
{
/// <summary>
/// The underlying gRPC duplex streaming call.
Expand Down Expand Up @@ -123,5 +130,13 @@ public virtual Task WriteCompleteAsync()
/// </remarks>
public virtual AsyncResponseStream<TResponse> GetResponseStream() =>
new AsyncResponseStream<TResponse>(GrpcCall.ResponseStream);

/// <summary>
/// Disposes of the underlying gRPC call. There is no need to dispose of both the wrapper
/// and the underlying call; it's typically simpler to dispose of the wrapper with a
/// <code>using</code> statement as the wrapper is returned by client libraries.
/// </summary>
/// <remarks>The default implementation just calls Dispose on the result of <see cref="GrpcCall"/>.</remarks>
public virtual void Dispose() => GrpcCall.Dispose();
}
}
17 changes: 15 additions & 2 deletions Google.Api.Gax.Grpc/ClientStreamingBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,16 @@
namespace Google.Api.Gax.Grpc
{
/// <summary>
/// Base class for the client-side streaming RPC methods.
/// Base class for the client-side streaming RPC methods. This wraps the
/// request stream in a buffer, allowing multiple requests to be written without waiting for them
/// to be transmitted.
/// </summary>
/// <remarks>
/// To avoid memory leaks, users must dispose of gRPC streams.
/// </remarks>
/// <typeparam name="TRequest">RPC request type</typeparam>
/// <typeparam name="TResponse">RPC response type</typeparam>
public abstract class ClientStreamingBase<TRequest, TResponse>
public abstract class ClientStreamingBase<TRequest, TResponse> : IDisposable
{
/// <summary>
/// The underlying gRPC client streaming call.
Expand Down Expand Up @@ -92,6 +97,14 @@ public virtual Task TryWriteCompleteAsync() =>
public virtual Task WriteCompleteAsync() =>
throw new NotImplementedException();

/// <summary>
/// Disposes of the underlying gRPC call. There is no need to dispose of both the wrapper
/// and the underlying call; it's typically simpler to dispose of the wrapper with a
/// <code>using</code> statement as the wrapper is returned by client libraries.
/// </summary>
/// <remarks>The default implementation just calls Dispose on the result of <see cref="GrpcCall"/>.</remarks>
public virtual void Dispose() => GrpcCall.Dispose();

/// <summary>
/// Asynchronous call result. This task will only complete after
/// <see cref="WriteCompleteAsync"/> has already been called.
Expand Down
22 changes: 17 additions & 5 deletions Google.Api.Gax.Grpc/ServerStreamingBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,21 @@

using Grpc.Core;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Google.Api.Gax.Grpc
{
/// <summary>
/// Base class for server streaming RPC methods.
/// Base class for server streaming RPC methods. This wraps an underlying call returned by gRPC,
/// in order to provide a wrapper for the async response stream, allowing users to take advantage
/// of <code>await foreach</code> support from C# 8 onwards.
/// </summary>
/// <remarks>
/// To avoid memory leaks, users must dispose of gRPC streams.
/// Additionally, you are strongly advised to read the whole response stream, even if the data
/// is not required - this avoids effectively cancelling the call.
/// </remarks>
/// <typeparam name="TResponse">RPC streaming response type</typeparam>
public class ServerStreamingBase<TResponse>
public class ServerStreamingBase<TResponse> : IDisposable
{
/// <summary>
/// The underlying gRPC duplex streaming call.
Expand All @@ -40,5 +44,13 @@ public virtual AsyncServerStreamingCall<TResponse> GrpcCall
/// </remarks>
public virtual AsyncResponseStream<TResponse> GetResponseStream() =>
new AsyncResponseStream<TResponse>(GrpcCall.ResponseStream);

/// <summary>
/// Disposes of the underlying gRPC call. There is no need to dispose of both the wrapper
/// and the underlying call; it's typically simpler to dispose of the wrapper with a
/// <code>using</code> statement as the wrapper is returned by client libraries.
/// </summary>
/// <remarks>The default implementation just calls Dispose on the result of <see cref="GrpcCall"/>.</remarks>
public virtual void Dispose() => GrpcCall.Dispose();
}
}

0 comments on commit e51f919

Please sign in to comment.