消息发布者 消息发布者需要注意:
设置 queue 为持续且关闭自动删除
发送消息时配置属性为持久且设置为强制
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 using RabbitMQ.Client;using System.Text;var factory = new ConnectionFactory { HostName = "localhost" };using var connection = await factory.CreateConnectionAsync();using var channel = await connection.CreateChannelAsync();await channel.QueueDeclareAsync(queue: "task_queue" , durable: true , exclusive: false , autoDelete: false , arguments: null ); var message = GetMessage(args);var body = Encoding.UTF8.GetBytes(message);var properties = new BasicProperties{ Persistent = true }; await channel.BasicPublishAsync(exchange: string .Empty, routingKey: "task_queue" , mandatory: true , basicProperties: properties, body: body); Console.WriteLine($" [x] Sent {message} " ); static string GetMessage (string [] args ){ return ((args.Length > 0 ) ? string .Join(" " , args) : "Hello World!" ); }
消息接受者 消息接收者需要注意:
设置 queue 为持续且关闭自动删除
设置预取大小和预取总数(防止单个消费者处理多个消息,其他消费者闲置)
调用 BasicConsumeAsync 时关闭自动应答
事件处理函数中手动应答
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 using RabbitMQ.Client;using RabbitMQ.Client.Events;using System.Text;var factory = new ConnectionFactory { HostName = "localhost" };using var connection = await factory.CreateConnectionAsync();using var channel = await connection.CreateChannelAsync();await channel.QueueDeclareAsync(queue: "task_queue" , durable: true , exclusive: false , autoDelete: false , arguments: null ); await channel.BasicQosAsync(prefetchSize: 0 , prefetchCount: 1 , global : false );Console.WriteLine(" [*] Waiting for messages." ); var consumer = new AsyncEventingBasicConsumer(channel);consumer.ReceivedAsync += async (model, ea) => { byte [] body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Console.WriteLine($" [x] Received {message} " ); int dots = message.Split('.' ).Length - 1 ; await Task.Delay(dots * 1000 ); Console.WriteLine(" [x] Done" ); await channel.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false ); }; await channel.BasicConsumeAsync("task_queue" , autoAck: false , consumer: consumer);Console.WriteLine(" Press [enter] to exit." ); Console.ReadLine();
其他 默认情况下,RabbitMQ 会按顺序将每条消息发送给下一个消费者。平均而言,每个消费者将收到相同数量的消息。这种消息分发方式被称为“轮询”。