直连交换机虽然可以根据路由键来实现消息分发,但是其分发形式比较死板,必须要路由键完全匹配才行,那有没有可以更方便分发的交换机?有的兄弟,主题(topic)交换机可以解决这个问题
主题交换机可以这样绑定路由键:*.orange.*、*.*.rabbit…..
其中 *表示一个单词,#表示 0 个或更多个单词

直接看代码,当我们的日志系统需要根据日志级别往消息队列中推送,我们可以使用 <设施名>.<日志级别>来作为路由键
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| using RabbitMQ.Client; using System.Text;
var factory = new ConnectionFactory { HostName = "localhost" }; using var connection = await factory.CreateConnectionAsync(); using var channel = await connection.CreateChannelAsync(); await channel.ExchangeDeclareAsync(exchange: "topic_logs", type: ExchangeType.Topic);
var routingKey = (args.Length > 0) ? args[0] : "anonymous.info";
var message = (args.Length > 1) ? string.Join(" ", args.Skip(1).ToArray()) : "Hello World!";
var body = Encoding.UTF8.GetBytes(message);
await channel.BasicPublishAsync(exchange: "topic_logs", routingKey: routingKey, body:body);
Console.WriteLine($" [x] Sent '{routingKey}':'{message}'");
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text;
if (args.Length < 1) { Console.Error.WriteLine("Usage: {0} [binding_key...]", Environment.GetCommandLineArgs()[0]); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); Environment.ExitCode = 1; return; }
var factory = new ConnectionFactory { HostName = "localhost" }; using var connection = await factory.CreateConnectionAsync(); using var channel = await connection.CreateChannelAsync();
await channel.ExchangeDeclareAsync(exchange: "topic_logs", type: ExchangeType.Topic);
var queueDeclareResult = await channel.QueueDeclareAsync();
string queueName = queueDeclareResult.QueueName;
foreach (string? bindingKey in args) { await channel.QueueBindAsync(queue: queueName, exchange: "topic_logs", routingKey: bindingKey); }
Console.WriteLine(" [*] Waiting for messages. To exit press CTRL+C");
var consumer = new AsyncEventingBasicConsumer(channel); consumer.ReceivedAsync += (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); var routingKey = ea.RoutingKey; Console.WriteLine($" [x] Received '{routingKey}':'{message}'"); return Task.CompletedTask; }; await channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);
Console.WriteLine(" Press [enter] to exit."); Console.ReadLine();
|
运行效果:
