AMQP协议——(8)Quorum队列

阿里云教程4个月前发布
36 0 0

AMQP协议——(8)Quorum队列

Quorum队列介绍

Quorum队列(Quorum Queues)是RabbitMQ消息队列系统中的一种现代队列类型,于RabbitMQ 3.8版本引入。它是一种基于Raft共识算法实现的持久化、复制型队列,旨在提供高数据安全性和可靠的领导者选举机制,以实现高可用性。Quorum队列特别适用于数据安全至关重要的场景,例如处理订单、财务交易或关键事件日志等应用。它被推荐作为复制型高可用队列的默认选择,取代了经典镜像队列(Classic Mirrored Queues)。

关键特性

Quorum队列具有以下主要特性:

复制与持久化:消息在多个节点间复制,确保耐用性。队列始终是持久化的,不支持非持久队列。共识机制:采用Raft协议,确保队列状态在多数节点间达成一致。毒消息处理:内置交付尝试计数(默认上限20次,可配置),超过后可丢弃或路由到死信队列,避免无限重试循环。至少一次死信处理:支持配置为“至少一次”语义的死信路由,确保消息不丢失。优先级支持:支持高/正常优先级消息(比例2:1),无需额外策略。TTL和长度限制:支持消息TTL、队列TTL,以及队列长度上限(溢出行为包括drop-head或reject-publish)。策略支持:可通过策略配置如最大长度、溢出行为、死信交换器和交付上限。其他限制:不支持独占队列、瞬态队列或全局QoS(仅支持每个消费者的QoS)。

这些特性使Quorum队列在可靠性上优于传统队列。

如何工作

Quorum队列基于Raft共识协议运行,每个队列有一个领导者(主副本)和多个跟随者(从副本)。所有操作(如发布、消费)通过领导者处理,领导者将变更复制到跟随者,以维护FIFO顺序和一致性。共识要求多数派节点(例如,N个节点中的(N/2)+1)同意队列状态。如果领导者故障,系统会自动选举新领导者,确保快速恢复。

数据持久化通过预写日志(WAL)和段文件实现,消息确认后进行 compaction(压缩)。成员变更(如添加/删除节点)需显式管理或通过连续成员协调(Continuous Membership Reconciliation, CMR)自动化。Quorum队列强调数据安全,但复制会引入延迟。

与经典镜像队列的比较

相比经典镜像队列(已在RabbitMQ 4.0中移除),Quorum队列有显著优势:

数据安全:通过Raft和Jepsen测试,提供更强的网络分区和故障容忍能力。性能:在许多工作负载下,吞吐量更高,延迟变异更低,尤其在使用确认和发布确认时。死信处理:内置至少一次保证和毒消息处理,避免无限循环。复制机制:现代共识算法取代了过时的镜像同步。优先级:原生支持,无需策略配置。

经典镜像队列在性能和可靠性上较弱,已被弃用。迁移时推荐蓝绿部署策略。

配置和创建过程

创建Quorum队列时,需要指定队列类型:

声明队列:在客户端代码中设置x-queue-type: quorum,可选参数如x-quorum-initial-group-size(初始复制因子,默认3,推荐奇数)。成员管理:使用CLI命令如rabbitmq-queues add_member添加节点、delete_member删除、grow/shrink扩展/收缩、rebalance重新平衡领导者。策略配置:通过rabbitmqctl set_policy设置如交付上限(delivery-limit)、死信交换器等。领导者定位:使用queue-leader-locator策略(如client-local或balanced)。
高级配置 :在 rabbitmq.conf 中调整如 quorum_queue.initial_cluster_size = 3 或 Raft 参数(如 raft.segment_max_entries )。

示例:在Python pika库中声明:

Python

 


channel.queue_declare(queue='my_quorum_queue', arguments={'x-queue-type': 'quorum', 'x-quorum-initial-group-size': 3})

 

成员变更示例: rabbitmq-queues add_member / my_quorum_queue node2@hostname

使用场景

Quorum队列适用于:

长生命周期的队列,如处理销售订单或选举投票等关键数据。需要高数据安全的场景,结合发布确认和手动确认。不适合:临时/独占队列、高 churn 场景、低延迟需求、未确认发布、超长积压(推荐使用Streams队列)或大规模扇出。

限制

不支持特性:无瞬态/独占队列、无全局QoS、无某些溢出策略(如reject-publish-dlx)。性能开销:共识引入延迟,吞吐随成员数或消息大小下降;内存占用高(每消息32字节元数据)。故障容忍:至少需3节点,偶数节点降低效益;未确认消息无保证。死信注意:至少一次配置需资源支持,可能导致重复;重试失败可能引起副作用。资源使用:磁盘I/O密集,建议快速磁盘;reject-publish可能短暂超限。

最佳实践和示例

数据安全:始终使用发布确认和手动确认。成员管理:保持奇数成员(至少3),启用CMR自动增长;定期重新平衡领导者。资源优化:设置max-length限制内存/磁盘;清除空闲队列截断段文件;为大消息过配磁盘。性能调优:小消息增加raft.segment_max_entries(如32768),大消息减少(如128);小消息工作负载增加readahead。毒消息处理 :通过策略设置 delivery-limit,如 rabbitmqctl set_policy qq-overrides “^qq.” '{“delivery-limit”: 50}' –apply-to “quorum_queues” 。无限上限 : rabbitmqctl set_policy qq-unlimited “^qq.unlimited” '{“delivery-limit”: -1}' –apply-to “quorum_queues” 。死信配置 :启用至少一次: dead-letter-strategy: at-least- once ,结合 overflow:reject-publish 和 stream_queue 标志。

Quorum队列显著提升了RabbitMQ的可靠性和可用性,是现代分布式系统的首选。

RabbitMQ 中的队列配置示例

RabbitMQ 中的仲裁队列主要通过队列声明参数、策略、CLI 命令和服务器配置文件进行配置。它们必须使用 `x-queue-type: quorum` 参数声明,许多设置(例如副本因子、投递限制和死信机制)可以在创建时或通过策略进行自定义。下面,我将根据 RabbitMQ 官方文档,提供不同方法的实用示例。请注意,仲裁队列始终是持久的,不支持瞬态或排他模式。在生产环境中,为了获得更好的容错能力,请使用奇数副本因子(例如 3 或 5)

1. 通过客户端库声明仲裁队列

使用 AMQP 客户端库声明队列时,需要在参数中指定队列类型。这奠定了基础,还可以包含初始组大小等其他参数。

基本声明(Python 与 Pika)

Python

 



import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
 
channel.queue_declare(
    queue='my_quorum_queue',
    durable=True,  # Always required for Quorum queues
    arguments={'x-queue-type': 'quorum'}  # Default replication: up to 3 members
)
 
# Publish a message for testing
channel.basic_publish(exchange='', routing_key='my_quorum_queue', body='Hello Quorum!')
 
connection.close()

AMQP协议——(8)Quorum队列

 

 

使用自定义复制因子(Java 与 RabbitMQ 客户端)

Java

 



import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
 
import java.util.HashMap;
import java.util.Map;
 
public class QuorumQueueExample {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
 
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
 
            Map<String, Object> arguments = new HashMap<>();
            arguments.put("x-queue-type", "quorum");
            arguments.put("x-quorum-initial-group-size", 5);  // Initial 5-member replication
 
            channel.queueDeclare("my_quorum_queue", true, false, false, arguments);
            System.out.println("Quorum queue declared.");
        }
    }
}

AMQP协议——(8)Quorum队列

 

 

AMQP 0-9-1 通用示例 : 使用哈希表来存储参数:

文本

 


queueDeclare("my_quorum_queue", durable=true, exclusive=false, autoDelete=false, arguments={ "x-queue-type": "quorum" })

 

 

这些示例会在代理服务器上创建队列(如果队列不存在)

2. 通过 CLI 命令管理成员

声明完成后,可以使用 rabbitmq-queues CLI 来添加/删除成员、扩展/缩小组或重新平衡领导者节点。这些操作对于集群管理至关重要。

将成员添加到队列


rabbitmq-queues add_member -p / my_quorum_queue rabbit@node2 # -p specifies vhost (default /)

 

 

删除成员


rabbitmq-queues delete_member -p / my_quorum_queue rabbit@node2

 

 

将队列扩展到新节点


rabbitmq-queues grow rabbit@node3 even # Even distribution across all Quorum queues

 

 

或者使用图案:


rabbitmq-queues grow rabbit@node3 even --vhost-pattern "prod.*" --queue-pattern "qq.*"

 

 

缩减队列(移除节点)


rabbitmq-queues shrink rabbit@node2 --errors-only # Only if no errors

 

 

重新平衡队列领导者


rabbitmq-queues rebalance quorum # All Quorum queues

 

 

或者具体来说:


rabbitmq-queues rebalance quorum --queue-pattern "orders.*" --vhost-pattern "production.*"

 

 

这些命令有助于维持集群的高可用性

3. 策略配置

策略会将运行时设置应用于匹配的队列(例如,通过正则表达式模式)。可以使用 rabbitmqctl set_policy 命令来实现此操作。

设置投递限制(毒信处理)


rabbitmqctl set_policy qq-delivery-limit "^qq." '{"delivery-limit": 50}' --apply-to "quorum_queues" --priority 10

 

 

禁用配送限制


rabbitmqctl set_policy qq-unlimited "^qq.unlimited" '{"delivery-limit": -1}' --apply-to "quorum_queues" --priority 10

 

 

配置死信循环并允许溢出


rabbitmqctl set_policy qq-dlq "^qq." '{"dead-letter-exchange": "my_dlx", "dead-letter-routing-key": "retry", "overflow": "reject-publish", "max-length": 10000}' --apply-to "quorum_queues" --priority 10

 

 

至少一次死信策略


rabbitmqctl set_policy qq-at-least-once "^qq." '{"dead-letter-strategy": "at-least-once", "overflow": "reject-publish", "dead-letter-exchange": "my_dlx"}' --apply-to "quorum_queues" --priority 10

 

 

注意:需要启用 stream_queue 功能标志并拒绝发布溢出。

策略会覆盖受支持键(例如 max-length 、 overflow 和 delivery-limit) 的队列参数

4. 服务器配置(rabbitmq.conf)

编辑 /etc/rabbitmq/rabbitmq.conf (或等效文件)以获取全局默认值和高级调整。

基本设置

 



# Default initial cluster size for new Quorum queues
quorum_queue.initial_cluster_size = 3
 
# Soft limit for queued commands
quorum_queue.commands_soft_limit = 32

 

持续成员资格核对(CMR)

文本

 



# Enable automatic member management
quorum_queue.continuous_membership_reconciliation.enabled = true
 
# Target replication factor
quorum_queue.continuous_membership_reconciliation.target_group_size = 5
 
# Auto-remove from decommissioned nodes
quorum_queue.continuous_membership_reconciliation.auto_remove = true
 
# Reconciliation interval (ms)
quorum_queue.continuous_membership_reconciliation.interval = 3600000  # 1 hour
 
# Trigger delay after events (ms)
quorum_queue.continuous_membership_reconciliation.trigger_interval = 10000  # 10 seconds

AMQP协议——(8)Quorum队列

 

 

领导者定位策略


queue_leader_locator = balanced # Alternatives: client-local (default)

 

提升性能的筏艇调校

 



# For small messages: Increase segment entries
raft.segment_max_entries = 32768  # Default: 4096
 
# For large messages: Decrease
raft.segment_max_entries = 128
 
# WAL size limit
raft.wal_max_size_bytes = 64000000  # 64 MiB; Default: 512 MiB

 

对于更高级的 Raft 设置,请使用 Erlang 格式的 advanced.config 文件 (例如,用于死信预取):

文本

 


[{rabbit, [{dead_letter_worker_consumer_prefetch, 64}]}].  # Default: 32

更改后请重启

5. 高级参数和最佳实践

目标组大小(通过队列参数或策略)

Python

 



arguments = {
    'x-queue-type': 'quorum',
    'x-quorum-target-group-size': 5  # Overrides initial size
}

 

 

或者通过政策途径:


rabbitmqctl set_policy qq-target-size "^qq." '{"target-group-size": 5}' --apply-to "quorum_queues"

 

 

宽松的房产检查


quorum_queue.property_equivalence.relaxed_checks_on_redeclaration = true

 

允许重新声明队列,忽略某些参数。尖端
使用 rabbitmqctl list_queues 来验证配置(例如, rabbitmqctl list_queues name type arguments )。对于至少一次的死信处理,确保有足够的资源来避免重复。在集群中进行测试:从 3 个节点开始,声明一个队列,并使用 CLI 来增加/减少节点数量。使用 RabbitMQ 管理界面监控成员状态和 Raft 日志。

这些示例涵盖常见场景;请根据您的工作负载进行调整(例如,高吞吐量与高持久性)。有关完整详细信息,请参阅 文档。rabbitmq.com

 

© 版权声明

相关文章

暂无评论

none
暂无评论...