1、发布者确认用法 开启发布者确认功能 发布者确认是 RabbitMQ 对 AMQP 0.9.1 协议的扩展,因此默认情况下未启用。发布者确认可以通过以下CreateChannelOptions类在通道级别启用:
1 2 3 4 5 var channelOpts = new CreateChannelOptions( publisherConfirmationsEnabled: true , publisherConfirmationTrackingEnabled: true , outstandingPublisherConfirmationsRateLimiter: new ThrottlingRateLimiter(MAX_OUTSTANDING_CONFIRMS) );
发布消息并等待 通过await请求返回的任务来等待消息确认BasicPublishAsync。 await一旦消息被确认,该任务就会立即返回。如果消息被拒绝或被退回(意味着代理由于某种原因无法处理该消息),该方法将抛出异常。异常处理通常包括记录错误消息和/或重试发送消息。
1 2 3 4 5 6 7 8 9 10 11 12 for (int i = 0 ; i < MESSAGE_COUNT; i++){ byte [] body = Encoding.UTF8.GetBytes(i.ToString()); try { await channel.BasicPublishAsync(exchange: string .Empty, routingKey: queueName, body: body, basicProperties: props, mandatory: true ); } catch (Exception ex) { Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack or return, ex: {ex} " ); } }
分批发布消息并统一等待 为了改进之前的示例,我们可以发布一批消息,并等待这批消息全部得到确认。以下示例使用的批次大小等于允许的未确认消息数量的一半:
1 2 3 4 5 6 7 8 9 10 11 12 13 var publishTasks = new List<ValueTask>();for (int i = 0 ; i < MESSAGE_COUNT; i++){ byte [] body = Encoding.UTF8.GetBytes(i.ToString()); ValueTask publishTask = channel.BasicPublishAsync(exchange: string .Empty, routingKey: queueName, body: body, mandatory: true , basicProperties: props); publishTasks.Add(publishTask); await MaybeAwaitPublishes(publishTasks, batchSize); } await MaybeAwaitPublishes(publishTasks, 0 );
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 static async Task MaybeAwaitPublishes (List<ValueTask> publishTasks, int batchSize ){ if (publishTasks.Count >= batchSize) { foreach (ValueTask pt in publishTasks) { try { await pt; } catch (Exception ex) { Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack or return, ex: '{ex} '" ); } } publishTasks.Clear(); } }
在应用程序中处理发布者确认(没看懂,之后再研究吧) 发布确认模式用于确保消息已经被正确地发送到RabbitMQ服务器,并被成功接收和持久化。通过使用发布确认,生产者可以获得对消息的可靠性保证,避免消息丢失。这一机制基于通道(Channel)级别,通过两个阶段的确认来保证消息的可靠性。
作为消息中间件, 都会⾯临消息丢失的问题.
消息丢失⼤概分为三种情况:
⽣产者问题. 因为应⽤程序故障, ⽹络抖动等各种原因, ⽣产者没有成功向broker发送消息.
消息中间件⾃⾝问题. ⽣产者成功发送给了Broker, 但是Broker没有把消息保存好, 导致消息丢失.
消费者问题. Broker 发送消息到消费者, 消费者在消费消息时, 因为没有处理好, 导致broker将消费失败的消息从队列中删除了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 channel.BasicReturnAsync += (sender, ea) => { ulong sequenceNumber = 0 ; IReadOnlyBasicProperties props = ea.BasicProperties; if (props.Headers is not null ) { object ? maybeSeqNum = props.Headers[Constants.PublishSequenceNumberHeader]; if (maybeSeqNum is not null ) { sequenceNumber = BinaryPrimitives.ReadUInt64BigEndian((byte [])maybeSeqNum); } } Console.WriteLine($"{DateTime.Now} [WARNING] message sequence number {sequenceNumber} has been basic.return-ed" ); return CleanOutstandingConfirms(sequenceNumber, false ); }; channel.BasicAcksAsync += (sender, ea) => CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple); channel.BasicNacksAsync += (sender, ea) => { Console.WriteLine($"{DateTime.Now} [WARNING] message sequence number: {ea.DeliveryTag} has been nacked (multiple: {ea.Multiple} )" ); return CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple); };
共有 3 个回调函数:一个用于已确认消息,一个用于未确认消息,一个用于已返回消息。所有回调函数都有一个对应的EventArgs 参数(ea)。对于确认和未确认消息,该参数包含:
投递标签:用于识别已确认或未确认信息的序列号。稍后我们将看到如何将其与已发布的信息关联起来。
multiple:这是一个布尔值。如果为 false,则只确认/否定一条消息;如果为 true,则确认/否定序列号小于或等于该消息的所有消息。
发布消息前可以通过以下方式提前获取到序列号
1 2 var sequenceNumber = await channel.GetNextPublishSequenceNumberAsync();await channel.BasicPublishAsync(exchange, queue, properties, body);
将消息与序列号关联起来的一种高效方法是使用链表。发布代码现在使用链表跟踪出站消息。我们需要在收到确认消息时清理此链表,并在消息被拒绝确认时记录警告信息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 async Task CleanOutstandingConfirms (ulong deliveryTag, bool multiple ){ if (debug) { Console.WriteLine("{0} [DEBUG] confirming message: {1} (multiple: {2})" , DateTime.Now, deliveryTag, multiple); } await semaphore.WaitAsync(); try { if (multiple) { do { LinkedListNode<ulong >? node = outstandingConfirms.First; if (node is null ) { break ; } if (node.Value <= deliveryTag) { outstandingConfirms.RemoveFirst(); } else { break ; } confirmedCount++; } while (true ); } else { confirmedCount++; outstandingConfirms.Remove(deliveryTag); } } finally { semaphore.Release(); } if (outstandingConfirms.Count == 0 || confirmedCount == MESSAGE_COUNT) { allMessagesConfirmedTcs.SetResult(true ); } }
前面的示例包含一个回调函数,用于在收到确认、否定或返回消息时清理链表。请注意,此回调函数可以处理单个和多个确认消息。对于否定或返回消息,回调函数会发出警告。然后,它会重用前面的回调函数来清理链表中未完成的确认消息。
总而言之,异步处理发布者确认通常需要以下步骤:
提供一种将发布序列号与消息关联起来的方法。
注册通道上的确认监听器,以便在发布者收到确认/否定确认时收到通知,从而执行相应的操作,例如记录日志或重新发布已否定确认的消息。在此步骤中,序列号与消息的关联机制可能也需要进行一些清理工作。
发布消息前,请跟踪发布序列号。