-
Notifications
You must be signed in to change notification settings - Fork 41
/
SuperStreamProducer.cs
66 lines (56 loc) · 2.65 KB
/
SuperStreamProducer.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2007-2020 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries..
using System.Text;
using Microsoft.Extensions.Logging;
using RabbitMQ.Stream.Client;
using RabbitMQ.Stream.Client.AMQP;
using RabbitMQ.Stream.Client.Reliable;
namespace SuperStream;
/// <summary>
/// Sample producer that publishes messages to a RabbitMQ super stream
/// through the reliable <see cref="Producer"/>.
/// </summary>
public class SuperStreamProducer
{
    /// <summary>
    /// Connects to RabbitMQ using a default-constructed <see cref="StreamSystemConfig"/>,
    /// creates the super stream named <c>Costants.StreamName</c>, and then sends
    /// up to 1,000,000 messages to it, pausing one second after each send.
    /// </summary>
    /// <returns>A task that completes once every message has been sent.</returns>
    public static async Task Start()
    {
        // Console logging for the sample; the filter is applied to the
        // "RabbitMQ.Stream" category at Information level.
        var loggerFactory = LoggerFactory.Create(builder =>
        {
            builder.AddSimpleConsole();
            builder.AddFilter("RabbitMQ.Stream", LogLevel.Information);
        });

        var logger = loggerFactory.CreateLogger<Producer>();
        var loggerMain = loggerFactory.CreateLogger<SuperStreamProducer>();

        loggerMain.LogInformation("Starting SuperStream Producer");
        var config = new StreamSystemConfig();
        var system = await StreamSystem.Create(config).ConfigureAwait(false);
        loggerMain.LogInformation("Super Stream Producer connected to RabbitMQ");

        // NOTE(review): the second argument (3) is presumably the number of partitions - confirm.
        // tag::super-stream-creation[]
        await system.CreateSuperStream(new PartitionsSuperStreamSpec(Costants.StreamName, 3)).ConfigureAwait(false);
        // end::super-stream-creation[]
        // We define a Producer with the SuperStream name (that is the Exchange name)
        // tag::super-stream-producer[]
        var producer = await Producer.Create(
            new ProducerConfig(system,
                    // Costants.StreamName is the Exchange name
                    // invoices
                    Costants.StreamName) // <1>
                {
                    SuperStreamConfig = new SuperStreamConfig() // <2>
                    {
                        // The super stream is enabled and we define how the routing key
                        // is extracted from each message (here: its MessageId)
                        Routing = msg => msg.Properties.MessageId.ToString() // <3>
                    }
                }, logger).ConfigureAwait(false);
        const int NumberOfMessages = 1_000_000;
        for (var i = 0; i < NumberOfMessages; i++)
        {
            var message = new Message(Encoding.Default.GetBytes($"my_invoice_number{i}")) // <4>
            {
                Properties = new Properties() {MessageId = $"id_{i}"}
            };
            await producer.Send(message).ConfigureAwait(false);
            // end::super-stream-producer[]
            loggerMain.LogInformation("Sent {I} message to {StreamName}, id: {ID}", $"my_invoice_number{i}",
                Costants.StreamName, $"id_{i}");

            // Throttle to one message per second without blocking the thread:
            // Thread.Sleep inside an async method would hold a thread-pool thread
            // for the whole pause, whereas Task.Delay yields it.
            await Task.Delay(TimeSpan.FromMilliseconds(1000)).ConfigureAwait(false);
        }
    }
}