一、交换机

1、 交换机的作用

在 RabbitMQ 的发布/订阅模式下,其核心思想是:生产者绝不会直接将消息发送到队列中。实际上,很多时候生产者甚至都不知道消息是否会送达任何队列。 相反,生产者只能向一个交换机发送消息。交换机是一个非常简单的概念。一方面,它接收来自生产者的消息;另一方面,它将这些消息推送到队列中。交换机必须清楚地知道如何处理它接收到的消息。该消息是否应添加到特定的队列中?是否应添加到多个队列中?还是应该将其丢弃?这些规则由交换机的类型来定义。

交换机有:direct、topic、headers、fanout 类型

2、广播型交换机

会将消息进行广播,它会将消息分发到所有绑定此交换机的队列

示意图

消息发布

消息发布者无需知道任何队列,只需要将消息交给交换机,由交换机决定传给哪个队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
using RabbitMQ.Client;
using System.Text;

var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = await factory.CreateConnectionAsync();
using var channel = await connection.CreateChannelAsync();

await channel.ExchangeDeclareAsync(exchange: "logs", type: ExchangeType.Fanout);

var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
await channel.BasicPublishAsync(exchange: "logs", routingKey: string.Empty, body: body);
Console.WriteLine($" [x] Sent {message}");

Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();

static string GetMessage(string[] args)
{
return ((args.Length > 0) ? string.Join(" ", args) : "info: Hello World!");
}

消息接收

消息接收者负责创建队列并将队列绑定交换机

channel.QueueDeclareAsync 默认会创建一个名字随机的队列,使用 QueueBindAysnc 将这个队列与交换机绑定

这样就能多个消费者消费同一个消息了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = await factory.CreateConnectionAsync();
using var channel = await connection.CreateChannelAsync();

await channel.ExchangeDeclareAsync(exchange: "logs",
type: ExchangeType.Fanout);

// declare a server-named queue
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
string queueName = queueDeclareResult.QueueName;
await channel.QueueBindAsync(queue: queueName, exchange: "logs", routingKey: string.Empty);

Console.WriteLine(" [*] Waiting for logs.");

var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += (model, ea) =>
{
byte[] body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($" [x] {message}");
return Task.CompletedTask;
};

await channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);

Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();