直连交换机虽然可以根据路由键来实现消息分发,但是其分发形式比较死板,必须要路由键完全匹配才行,那有没有可以更方便分发的交换机?有的兄弟,主题(topic)交换机可以解决这个问题

主题交换机可以这样绑定路由键:*.orange.**.*.rabbit…..

其中 *表示一个单词,#表示 0 个或更多个单词

直接看代码,当我们的日志系统需要根据日志级别往消息队列中推送,我们可以使用 <设施名>.<日志级别>来作为路由键

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
using RabbitMQ.Client;
using System.Text;


var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = await factory.CreateConnectionAsync();
using var channel = await connection.CreateChannelAsync();
await channel.ExchangeDeclareAsync(exchange: "topic_logs", type: ExchangeType.Topic);// 创建主题交换机

var routingKey = (args.Length > 0) ? args[0] : "anonymous.info"; // 获取控制台输入的路由键

var message = (args.Length > 1) ? string.Join(" ", args.Skip(1).ToArray()) : "Hello World!"; // 要发送的消息

var body = Encoding.UTF8.GetBytes(message);

await channel.BasicPublishAsync(exchange: "topic_logs", routingKey: routingKey, body:body);

Console.WriteLine($" [x] Sent '{routingKey}':'{message}'");
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

if (args.Length < 1) {
Console.Error.WriteLine("Usage: {0} [binding_key...]",
Environment.GetCommandLineArgs()[0]);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
Environment.ExitCode = 1;
return;
}


var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = await factory.CreateConnectionAsync();
using var channel = await connection.CreateChannelAsync();

await channel.ExchangeDeclareAsync(exchange: "topic_logs", type: ExchangeType.Topic);// 创建主题交换机

var queueDeclareResult = await channel.QueueDeclareAsync(); // 创建队列

string queueName = queueDeclareResult.QueueName;

foreach (string? bindingKey in args) {
await channel.QueueBindAsync(queue: queueName, exchange: "topic_logs", routingKey: bindingKey); // 绑定队列与交换机
}

Console.WriteLine(" [*] Waiting for messages. To exit press CTRL+C");


var consumer = new AsyncEventingBasicConsumer(channel); // 绑定消费者与channel
consumer.ReceivedAsync += (model, ea) => {
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
var routingKey = ea.RoutingKey;
Console.WriteLine($" [x] Received '{routingKey}':'{message}'");
return Task.CompletedTask;
};
await channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); // 绑定队列和消费者,开始监听队列

Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();

运行效果: