消息发布者

消息发布者需要注意:

  • 设置 queue 为持续且关闭自动删除
  • 发送消息时配置属性为持久且设置为强制
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
using RabbitMQ.Client;
using System.Text;

var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = await factory.CreateConnectionAsync();
using var channel = await connection.CreateChannelAsync();

await channel.QueueDeclareAsync(queue: "task_queue", durable: true, exclusive: false,
autoDelete: false, arguments: null);

var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);

var properties = new BasicProperties
{
Persistent = true
};

await channel.BasicPublishAsync(exchange: string.Empty, routingKey: "task_queue", mandatory: true,
basicProperties: properties, body: body);
Console.WriteLine($" [x] Sent {message}");

static string GetMessage(string[] args)
{
return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
}

消息接受者

消息接收者需要注意:

  • 设置 queue 为持续且关闭自动删除
  • 设置预取大小和预取总数(防止单个消费者处理多个消息,其他消费者闲置)
  • 调用 BasicConsumeAsync 时关闭自动应答
  • 事件处理函数中手动应答
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = await factory.CreateConnectionAsync();
using var channel = await connection.CreateChannelAsync();

await channel.QueueDeclareAsync(queue: "task_queue", durable: true, exclusive: false,
autoDelete: false, arguments: null);

await channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 1, global: false);

Console.WriteLine(" [*] Waiting for messages.");

var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += async (model, ea) =>
{
byte[] body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($" [x] Received {message}");

int dots = message.Split('.').Length - 1;
await Task.Delay(dots * 1000);

Console.WriteLine(" [x] Done");

// here channel could also be accessed as ((AsyncEventingBasicConsumer)sender).Channel
await channel.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false);
};

await channel.BasicConsumeAsync("task_queue", autoAck: false, consumer: consumer);

Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();

其他

默认情况下,RabbitMQ 会按顺序将每条消息发送给下一个消费者。平均而言,每个消费者将收到相同数量的消息。这种消息分发方式被称为“轮询”。