Skip to content

Commit

Permalink
RPC test cleanup
Browse files Browse the repository at this point in the history
Addresses test flakes due to using not-quite-unique queue names.
  • Loading branch information
lukebakken committed Dec 17, 2024
1 parent afa3c82 commit a230f01
Showing 1 changed file with 136 additions and 101 deletions.
237 changes: 136 additions & 101 deletions Tests/Rpc/RpcServerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,46 @@ namespace Tests.Rpc
{
public class RpcServerTests(ITestOutputHelper testOutputHelper) : IntegrationTest(testOutputHelper)
{
private string _requestQueueName = string.Empty;
private string _replyToName = $"queueReplyTo-{Now}-{Guid.NewGuid()}";
private string _correlationId = $"my-correlation-id-{Guid.NewGuid()}";

public override async Task InitializeAsync()
{
await base.InitializeAsync();

Assert.NotNull(_management);

IQueueInfo requestQueueInfo = await _management.Queue(_requestQueueName)
.Exclusive(true)
.AutoDelete(true)
.DeclareAsync();

Assert.Equal(_requestQueueName, requestQueueInfo.Name());
_requestQueueName = requestQueueInfo.Name();
}

[Fact]
public async Task MockRpcServerPingPong()
{
Assert.NotNull(_connection);
Assert.NotNull(_management);
await _management.Queue(_queueName).Exclusive(true).AutoDelete(true).DeclareAsync();
TaskCompletionSource<IMessage> tcs = CreateTaskCompletionSource<IMessage>();
IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) =>

Task<IMessage> RpcHandler(IRpcServer.IContext context, IMessage request)
{
var reply = context.Message("pong");
IMessage reply = context.Message("pong");
tcs.SetResult(reply);
return Task.FromResult(reply);
}).RequestQueue(_queueName).BuildAsync();
Assert.NotNull(rpcServer);
IPublisher p = await _connection.PublisherBuilder().Queue(_queueName).BuildAsync();
}

IRpcServer rpcServer = await _connection.RpcServerBuilder()
.Handler(RpcHandler)
.RequestQueue(_requestQueueName)
.BuildAsync();

IPublisher p = await _connection.PublisherBuilder()
.Queue(_requestQueueName)
.BuildAsync();

await p.PublishAsync(new AmqpMessage("test"));
IMessage m = await WhenTcsCompletes(tcs);
Expand All @@ -41,15 +66,21 @@ public async Task MockRpcServerPingPong()
public async Task RpcServerValidateStateChange()
{
Assert.NotNull(_connection);
Assert.NotNull(_management);

List<(State, State)> states = [];
await _management.Queue(_queueName).Exclusive(true).AutoDelete(true).DeclareAsync();
TaskCompletionSource<int> tcs = CreateTaskCompletionSource<int>();
IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) =>

static Task<IMessage> RpcHandler(IRpcServer.IContext context, IMessage request)
{
var m = context.Message(request.Body());
IMessage m = context.Message(request.Body());
return Task.FromResult(m);
}).RequestQueue(_queueName).BuildAsync();
}

IRpcServer rpcServer = await _connection.RpcServerBuilder()
.Handler(RpcHandler)
.RequestQueue(_requestQueueName)
.BuildAsync();

rpcServer.ChangeState += (sender, fromState, toState, e) =>
{
states.Add((fromState, toState));
Expand All @@ -58,8 +89,9 @@ public async Task RpcServerValidateStateChange()
tcs.SetResult(states.Count);
}
};
Assert.NotNull(rpcServer);

await rpcServer.CloseAsync();

int count = await WhenTcsCompletes(tcs);
Assert.Equal(2, count);
Assert.Equal(State.Open, states[0].Item1);
Expand All @@ -76,33 +108,38 @@ public async Task SimulateRpcCommunicationWithAPublisherShouldSuccess()
{
Assert.NotNull(_connection);
Assert.NotNull(_management);
string requestQueue = _queueName;
await _management.Queue(requestQueue).Exclusive(true).AutoDelete(true).DeclareAsync();
IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) =>
{
var reply = context.Message("pong");
return Task.FromResult(reply);
}).RequestQueue(requestQueue).BuildAsync();

Assert.NotNull(rpcServer);
string queueReplyTo = $"queueReplyTo-{Now}";
IQueueSpecification spec = _management.Queue(queueReplyTo).Exclusive(true).AutoDelete(true);
await spec.DeclareAsync();
IRpcServer rpcServer = await _connection.RpcServerBuilder()
.Handler(PongRpcHandler)
.RequestQueue(_requestQueueName)
.BuildAsync();

IQueueSpecification replyQueueSpec = _management.Queue(_replyToName)
.Exclusive(true)
.AutoDelete(true);
await replyQueueSpec.DeclareAsync();

TaskCompletionSource<IMessage> tcs = CreateTaskCompletionSource<IMessage>();

IConsumer consumer = await _connection.ConsumerBuilder().Queue(queueReplyTo).MessageHandler(
(context, message) =>
{
context.Accept();
tcs.SetResult(message);
return Task.CompletedTask;
}).BuildAndStartAsync();
Task MessageHandler(IContext context, IMessage message)
{
context.Accept();
tcs.SetResult(message);
return Task.CompletedTask;
}

IPublisher publisher = await _connection.PublisherBuilder().Queue(requestQueue).BuildAsync();
Assert.NotNull(publisher);
AddressBuilder addressBuilder = new();
IConsumer consumer = await _connection.ConsumerBuilder()
.Queue(replyQueueSpec)
.MessageHandler(MessageHandler)
.BuildAndStartAsync();

IPublisher publisher = await _connection.PublisherBuilder()
.Queue(_requestQueueName)
.BuildAsync();

IMessage message = new AmqpMessage("test").ReplyTo(addressBuilder.Queue(queueReplyTo).Address());
AddressBuilder addressBuilder = new();
string replyToAddress = addressBuilder.Queue(replyQueueSpec).Address();
IMessage message = new AmqpMessage("test").ReplyTo(replyToAddress);
PublishResult pr = await publisher.PublishAsync(message);
Assert.Equal(OutcomeState.Accepted, pr.Outcome.State);

Expand All @@ -123,17 +160,15 @@ public async Task RpcServerClientPingPongWithDefault()
{
Assert.NotNull(_connection);
Assert.NotNull(_management);
string requestQueue = _queueName;
await _management.Queue(requestQueue).Exclusive(true).AutoDelete(true).DeclareAsync();
IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) =>
{
var reply = context.Message("pong");
return Task.FromResult(reply);
}).RequestQueue(_queueName).BuildAsync();
Assert.NotNull(rpcServer);

IRpcClient rpcClient = await _connection.RpcClientBuilder().RequestAddress()
.Queue(requestQueue)
IRpcServer rpcServer = await _connection.RpcServerBuilder()
.Handler(PongRpcHandler)
.RequestQueue(_requestQueueName)
.BuildAsync();

IRpcClient rpcClient = await _connection.RpcClientBuilder()
.RequestAddress()
.Queue(_requestQueueName)
.RpcClient()
.BuildAsync();

Expand All @@ -153,27 +188,22 @@ public async Task RpcServerClientPingPongWithCustomReplyToQueueAndCorrelationIdS
{
Assert.NotNull(_connection);
Assert.NotNull(_management);
string requestQueue = _queueName;
await _management.Queue(requestQueue).Exclusive(true).AutoDelete(true).DeclareAsync();
IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) =>
{
var reply = context.Message("pong");
return Task.FromResult(reply);
}).RequestQueue(_queueName)
.BuildAsync();
Assert.NotNull(rpcServer);

// custom replyTo queue
IQueueInfo replyTo =
await _management.Queue($"replyTo-{Now}").Exclusive(true).AutoDelete(true).DeclareAsync();
IRpcServer rpcServer = await _connection.RpcServerBuilder()
.Handler(PongRpcHandler)
.RequestQueue(_requestQueueName)
.BuildAsync();

// custom correlationId supplier
const string correlationId = "my-correlation-id";
IQueueInfo replyTo = await _management.Queue(_replyToName)
.Exclusive(true)
.AutoDelete(true)
.DeclareAsync();

IRpcClient rpcClient = await _connection.RpcClientBuilder().RequestAddress()
.Queue(requestQueue)
IRpcClient rpcClient = await _connection.RpcClientBuilder()
.RequestAddress()
.Queue(_requestQueueName)
.RpcClient()
.CorrelationIdSupplier(() => correlationId)
.CorrelationIdSupplier(() => _correlationId)
.CorrelationIdExtractor(message => message.CorrelationId())
.ReplyToQueue(replyTo.Name())
.BuildAsync();
Expand All @@ -182,7 +212,7 @@ public async Task RpcServerClientPingPongWithCustomReplyToQueueAndCorrelationIdS

IMessage response = await rpcClient.PublishAsync(message);
Assert.Equal("pong", response.Body());
Assert.Equal(correlationId, response.CorrelationId());
Assert.Equal(_correlationId, response.CorrelationId());
await rpcClient.CloseAsync();
await rpcServer.CloseAsync();
}
Expand All @@ -199,34 +229,29 @@ public async Task RpcServerClientOverridingTheRequestAndResponsePostProcessor()
{
Assert.NotNull(_connection);
Assert.NotNull(_management);
string requestQueue = _queueName;
await _management.Queue(requestQueue).Exclusive(true).AutoDelete(true).DeclareAsync();
IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) =>
{
var reply = context.Message("pong");
return Task.FromResult(reply);
}).RequestQueue(_queueName)
//come from the client

IRpcServer rpcServer = await _connection.RpcServerBuilder()
.Handler(PongRpcHandler)
.RequestQueue(_requestQueueName)
.CorrelationIdExtractor(message => message.Property("correlationId"))
// replace the correlation id location with Application properties
.ReplyPostProcessor((reply, replyCorrelationId) => reply.Property("correlationId",
replyCorrelationId.ToString() ?? throw new InvalidOperationException()))
.BuildAsync();
Assert.NotNull(rpcServer);

IQueueInfo replyTo =
await _management.Queue($"replyTo-{Now}").Exclusive(true).AutoDelete(true).DeclareAsync();
IQueueInfo replyTo = await _management.Queue(_replyToName)
.Exclusive(true)
.AutoDelete(true)
.DeclareAsync();

// custom correlationId supplier
const string correlationId = "my-correlation-id";
int correlationIdCounter = 0;

IRpcClient rpcClient = await _connection.RpcClientBuilder().RequestAddress()
.Queue(requestQueue)
IRpcClient rpcClient = await _connection.RpcClientBuilder()
.RequestAddress()
.Queue(_requestQueueName)
.RpcClient()
.ReplyToQueue(replyTo.Name())
// replace the correlation id creation with a custom function
.CorrelationIdSupplier(() => $"{correlationId}_{Interlocked.Increment(ref correlationIdCounter)}")
.CorrelationIdSupplier(() => $"{_correlationId}_{Interlocked.Increment(ref correlationIdCounter)}")
// The server will reply with the correlation id in application properties
.CorrelationIdExtractor(message => message.Property("correlationId"))
// The client will use application properties to set the correlation id
Expand All @@ -244,8 +269,8 @@ public async Task RpcServerClientOverridingTheRequestAndResponsePostProcessor()
IMessage response = await rpcClient.PublishAsync(message);
Assert.Equal("pong", response.Body());
// the server replies with the correlation id in the application properties
Assert.Equal($"{correlationId}_{i}", response.Property("correlationId"));
Assert.Equal($"{correlationId}_{i}", response.Properties()["correlationId"]);
Assert.Equal($"{_correlationId}_{i}", response.Property("correlationId"));
Assert.Equal($"{_correlationId}_{i}", response.Properties()["correlationId"]);
Assert.Single(response.Properties());
i++;
}
Expand All @@ -259,18 +284,16 @@ public async Task RpcClientMultiThreadShouldBeSafe()
{
Assert.NotNull(_connection);
Assert.NotNull(_management);

string requestQueue = _queueName;

await _management.Queue(requestQueue).Exclusive(true).AutoDelete(true).DeclareAsync();
const int messagesToSend = 99;

TaskCompletionSource<bool> tcs = CreateTaskCompletionSource();
List<IMessage> messagesReceived = [];
IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) =>

Task<IMessage> RpcHandler(IRpcServer.IContext context, IMessage request)
{
try
{
var reply = context.Message("pong");
IMessage reply = context.Message("pong");
messagesReceived.Add(request);
return Task.FromResult(reply);
}
Expand All @@ -281,17 +304,19 @@ public async Task RpcClientMultiThreadShouldBeSafe()
tcs.SetResult(true);
}
}
}).RequestQueue(requestQueue).BuildAsync();
}

Assert.NotNull(rpcServer);
IRpcServer rpcServer = await _connection.RpcServerBuilder()
.Handler(RpcHandler)
.RequestQueue(_requestQueueName)
.BuildAsync();

IRpcClient rpcClient = await _connection.RpcClientBuilder().RequestAddress()
.Queue(requestQueue)
.Queue(_requestQueueName)
.RpcClient()
.BuildAsync();

List<Task> tasks = [];

// we simulate a multi-thread environment
// where multiple threads send messages to the server
// and the server replies to each message in a consistent way
Expand Down Expand Up @@ -332,25 +357,29 @@ public async Task RpcClientShouldRaiseTimeoutError()
{
Assert.NotNull(_connection);
Assert.NotNull(_management);
string requestQueue = _queueName;
await _management.Queue(requestQueue).Exclusive(true).AutoDelete(true).DeclareAsync();
IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler(async (context, request) =>

static async Task<IMessage> RpcHandler(IRpcServer.IContext context, IMessage request)
{
IMessage reply = context.Message("pong");
object millisecondsToWait = request.Property("wait");
await Task.Delay(TimeSpan.FromMilliseconds((int)millisecondsToWait));
return reply;
}).RequestQueue(_queueName).BuildAsync();
Assert.NotNull(rpcServer);
}

IRpcClient rpcClient = await _connection.RpcClientBuilder().RequestAddress()
.Queue(requestQueue)
IRpcServer rpcServer = await _connection.RpcServerBuilder()
.Handler(RpcHandler)
.RequestQueue(_requestQueueName)
.BuildAsync();

IRpcClient rpcClient = await _connection.RpcClientBuilder()
.RequestAddress()
.Queue(_requestQueueName)
.RpcClient()
.Timeout(TimeSpan.FromMilliseconds(300))
.BuildAsync();

IMessage reply = await rpcClient.PublishAsync(
new AmqpMessage("ping").Property("wait", 1));
IMessage msg = new AmqpMessage("ping").Property("wait", 1);
IMessage reply = await rpcClient.PublishAsync(msg);
Assert.Equal("pong", reply.Body());

await Assert.ThrowsAsync<TimeoutException>(() => rpcClient.PublishAsync(
Expand All @@ -359,5 +388,11 @@ await Assert.ThrowsAsync<TimeoutException>(() => rpcClient.PublishAsync(
await rpcClient.CloseAsync();
await rpcServer.CloseAsync();
}

private static Task<IMessage> PongRpcHandler(IRpcServer.IContext context, IMessage request)
{
IMessage reply = context.Message("pong");
return Task.FromResult(reply);
}
}
}

0 comments on commit a230f01

Please sign in to comment.