1、发布者确认用法

开启发布者确认功能

发布者确认是 RabbitMQ 对 AMQP 0.9.1 协议的扩展,因此默认情况下未启用。发布者确认可以通过以下CreateChannelOptions类在通道级别启用:

1
2
3
4
5
var channelOpts = new CreateChannelOptions(
publisherConfirmationsEnabled: true,
publisherConfirmationTrackingEnabled: true,
outstandingPublisherConfirmationsRateLimiter: new ThrottlingRateLimiter(MAX_OUTSTANDING_CONFIRMS)
);

发布消息并等待

通过await请求返回的任务来等待消息确认BasicPublishAsync。 await一旦消息被确认,该任务就会立即返回。如果消息被拒绝或被退回(意味着代理由于某种原因无法处理该消息),该方法将抛出异常。异常处理通常包括记录错误消息和/或重试发送消息。

1
2
3
4
5
6
7
8
9
10
11
12
for (int i = 0; i < MESSAGE_COUNT; i++)
{
byte[] body = Encoding.UTF8.GetBytes(i.ToString());
try
{
await channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body, basicProperties: props, mandatory: true);
}
catch (Exception ex)
{
Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack or return, ex: {ex}");
}
}

分批发布消息并统一等待

为了改进之前的示例,我们可以发布一批消息,并等待这批消息全部得到确认。以下示例使用的批次大小等于允许的未确认消息数量的一半:

1
2
3
4
5
6
7
8
9
10
11
12
13
var publishTasks = new List<ValueTask>();
for (int i = 0; i < MESSAGE_COUNT; i++)
{
byte[] body = Encoding.UTF8.GetBytes(i.ToString());
ValueTask publishTask = channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body, mandatory: true, basicProperties: props);
publishTasks.Add(publishTask);

await MaybeAwaitPublishes(publishTasks, batchSize);
}

// Await any remaining tasks in case message count was not
// evenly divisible by batch size.
await MaybeAwaitPublishes(publishTasks, 0);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static async Task MaybeAwaitPublishes(List<ValueTask> publishTasks, int batchSize)
{
if (publishTasks.Count >= batchSize)
{
foreach (ValueTask pt in publishTasks)
{
try
{
await pt;
}
catch (Exception ex)
{
Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack or return, ex: '{ex}'");
}
}
publishTasks.Clear();
}
}

在应用程序中处理发布者确认(没看懂,之后再研究吧)

发布确认模式用于确保消息已经被正确地发送到RabbitMQ服务器,并被成功接收和持久化。通过使用发布确认,生产者可以获得对消息的可靠性保证,避免消息丢失。这一机制基于通道(Channel)级别,通过两个阶段的确认来保证消息的可靠性。

作为消息中间件, 都会⾯临消息丢失的问题.

消息丢失⼤概分为三种情况:

  • ⽣产者问题. 因为应⽤程序故障, ⽹络抖动等各种原因, ⽣产者没有成功向broker发送消息.
  • 消息中间件⾃⾝问题. ⽣产者成功发送给了Broker, 但是Broker没有把消息保存好, 导致消息丢失.
  • 消费者问题. Broker 发送消息到消费者, 消费者在消费消息时, 因为没有处理好, 导致broker将消费失败的消息从队列中删除了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 已返回消息
channel.BasicReturnAsync += (sender, ea) =>
{
ulong sequenceNumber = 0;

IReadOnlyBasicProperties props = ea.BasicProperties;
if (props.Headers is not null)
{
object? maybeSeqNum = props.Headers[Constants.PublishSequenceNumberHeader];
if (maybeSeqNum is not null)
{
sequenceNumber = BinaryPrimitives.ReadUInt64BigEndian((byte[])maybeSeqNum);
}
}

Console.WriteLine($"{DateTime.Now} [WARNING] message sequence number {sequenceNumber} has been basic.return-ed");
return CleanOutstandingConfirms(sequenceNumber, false);
};
// 已确认消息
channel.BasicAcksAsync += (sender, ea) => CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
// 未确认消息
channel.BasicNacksAsync += (sender, ea) =>
{
Console.WriteLine($"{DateTime.Now} [WARNING] message sequence number: {ea.DeliveryTag} has been nacked (multiple: {ea.Multiple})");
return CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
};

共有 3 个回调函数:一个用于已确认消息,一个用于未确认消息,一个用于已返回消息。所有回调函数都有一个对应的EventArgs 参数(ea)。对于确认和未确认消息,该参数包含:

  • 投递标签:用于识别已确认或未确认信息的序列号。稍后我们将看到如何将其与已发布的信息关联起来。
  • multiple:这是一个布尔值。如果为 false,则只确认/否定一条消息;如果为 true,则确认/否定序列号小于或等于该消息的所有消息。

发布消息前可以通过以下方式提前获取到序列号

1
2
var sequenceNumber = await channel.GetNextPublishSequenceNumberAsync();
await channel.BasicPublishAsync(exchange, queue, properties, body);

将消息与序列号关联起来的一种高效方法是使用链表。发布代码现在使用链表跟踪出站消息。我们需要在收到确认消息时清理此链表,并在消息被拒绝确认时记录警告信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
async Task CleanOutstandingConfirms(ulong deliveryTag, bool multiple)
{
if (debug)
{
Console.WriteLine("{0} [DEBUG] confirming message: {1} (multiple: {2})",
DateTime.Now, deliveryTag, multiple);
}

await semaphore.WaitAsync();
try
{
if (multiple)
{
do
{
LinkedListNode<ulong>? node = outstandingConfirms.First;
if (node is null)
{
break;
}
if (node.Value <= deliveryTag)
{
outstandingConfirms.RemoveFirst();
}
else
{
break;
}

confirmedCount++;
} while (true);
}
else
{
confirmedCount++;
outstandingConfirms.Remove(deliveryTag);
}
}
finally
{
semaphore.Release();
}

if (outstandingConfirms.Count == 0 || confirmedCount == MESSAGE_COUNT)
{
allMessagesConfirmedTcs.SetResult(true);
}
}

前面的示例包含一个回调函数,用于在收到确认、否定或返回消息时清理链表。请注意,此回调函数可以处理单个和多个确认消息。对于否定或返回消息,回调函数会发出警告。然后,它会重用前面的回调函数来清理链表中未完成的确认消息。

总而言之,异步处理发布者确认通常需要以下步骤:

  1. 提供一种将发布序列号与消息关联起来的方法。
  2. 注册通道上的确认监听器,以便在发布者收到确认/否定确认时收到通知,从而执行相应的操作,例如记录日志或重新发布已否定确认的消息。在此步骤中,序列号与消息的关联机制可能也需要进行一些清理工作。
  3. 发布消息前,请跟踪发布序列号。