本文共 9952 字,大约阅读时间需要 33 分钟。
消息的可靠性投递是保证消息在传输过程中不会丢失或重复的核心机制。在RabbitMQ中,生产者与消费者之间的消息传输可以通过确认机制(Confirm)来实现可靠性投递。
Confirm机制是RabbitMQ提供的消息可靠性投递的核心手段。生产者在向RabbitMQ Broker发送消息时,可以选择启用确认模式(Confirm),这样可以确保消息一旦被Broker接收,就会返回一个确认(ACK)给生产者。生产者通过监听这个确认,可以知道消息是否已经成功投递到Broker中。
在生产端,需要通过RabbitMQ Java客户端API启用确认模式,并添加确认监听器。以下是代码实现示例:
import com.rabbitmq.client.Channel;import com.rabbitmq.client.ConfirmListener;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class Producer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setPort(5672); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.confirmSelect(); String exchangeName = "test_confirm_exchange"; String routingKey = "test_confirm"; String msg = "Hello RabbitMQ!"; for (int i = 0; i < 5; i++) { channel.basicPublish(exchangeName, routingKey, false, null, msg.getBytes()); } channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long deliveryTag, boolean multiple) { System.out.println("消息确认成功,deliveryTag: " + deliveryTag + ", multiple: " + multiple); } @Override public void handleNack(long deliveryTag, boolean multiple) { System.out.println("消息确认失败,deliveryTag: " + deliveryTag + ", multiple: " + multiple); } }); channel.close(); connection.close(); }}public class Consumer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setPort(5672); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String queueName = "test_confirm_queue"; String exchangeName = "test_confirm_exchange"; String exchangeType = "direct"; String routingKey = "test_confirm"; channel.queueDeclare(queueName, true, false, false, null); channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); channel.basicConsume(queueName, true, new DefaultConsumer(channel)); }}class DefaultConsumer extends DefaultConsumer { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { System.out.println("接收到消息,consumerTag: " + consumerTag); }} RabbitMQ提供了Return机制,可以用来处理消息路由错误的情况。通过在生产端启用Return监听器,可以接收到无法路由的消息,并进行相应的处理。
public class Producer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setPort(5672); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int relaycode, String relayText, String exchange, String routingKey, AMQP.BasicProperties basicProperties, byte[] bytes) { System.out.println("收到不可路由的消息,状态码: " + relaycode); System.out.println("信息: " + relayText); } }); String exchangeName = "test_return_exchange"; String routingKey = "test.return.#"; String msg = "Hello RabbitMQ!"; for (int i = 0; i < 5; i++) { channel.basicPublish(exchangeName, routingKey, true, null, msg.getBytes()); } channel.close(); connection.close(); }} 幂等性是指在相同条件下,执行相同操作的次数越多,结果都不会改变。在RabbitMQ中,消息幂等性主要体现在消息的唯一性和不重复消费上。
RabbitMQ本身无法保证消息的幂等性,因为生产者可能会重复发送相同的消息,或者消费者可能会重复消费相同的消息。
为了保证消息的幂等性,可以采用以下方法:
public class MyConsumer extends DefaultConsumer { private Channel channel; public MyConsumer(Channel channel) { super(channel); this.channel = channel; } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { System.out.println("接收到消息,消息内容: " + new String(body)); try { Thread.sleep(1000); channel.basicAck(envelope.getDeliveryTag(), true); } catch (InterruptedException e) { e.printStackTrace(); } }} RabbitMQ支持消息和队列的TTL(Time To Live),可以设置消息的过期时间。消息过期后,Broker会将其清除。
DLX是用来处理死信的队列。当消息在队列中变成死信后,Broker会自动将其发布到DLX交换机,进而路由到指定的队列。
public class Producer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setPort(5672); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_ttl_exchange"; String routingKey = "test.ttl.#"; String msg = "Hello RabbitMQ!"; for (int i = 0; i < 5; i++) { AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .expiration("10000") // 设置消息TTL为10秒 .build(); channel.basicPublish(exchangeName, routingKey, false, properties, msg.getBytes()); } channel.close(); connection.close(); }}public class Consumer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setPort(5672); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String queueName = "test_ttl_queue"; String exchangeName = "test_ttl_exchange"; String exchangeType = "topic"; String routingKey = "test.ttl.#"; Map arguments = new HashMap<>(); arguments.put("x-message-ttl", 15000); arguments.put("x-dead-letter-exchange", "dead-letter-exchange"); channel.queueDeclare(queueName, true, false, false, arguments); channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); channel.basicConsume(queueName, false, new DefaultConsumer(channel)); }}class DefaultConsumer extends DefaultConsumer { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { System.out.println("接收到消息,消息内容: " + new String(body)); try { Thread.sleep(1000); channel.basicAck(envelope.getDeliveryTag(), true); } catch (InterruptedException e) { e.printStackTrace(); } }} 为了防止消费端在高并发场景下无法处理巨量消息,可以通过设置RabbitMQ的QoS来实现消费端限流。
public class Consumer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setPort(5672); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String queueName = "test_qos_queue"; String exchangeName = "test_qos_exchange"; String exchangeType = "topic"; String routingKey = "test.qos.#"; Map arguments = new HashMap<>(); arguments.put("x-message-ttl", 15000); channel.queueDeclare(queueName, true, false, false, arguments); channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); channel.basicQos(0, 4, false); channel.basicConsume(queueName, false, new MyConsumer(channel)); }}class MyConsumer extends DefaultConsumer { private Channel channel; public MyConsumer(Channel channel) { super(channel); this.channel = channel; } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { System.out.println("接收到消息,消息内容: " + new String(body)); try { Thread.sleep(1000); channel.basicAck(envelope.getDeliveryTag(), true); } catch (InterruptedException e) { e.printStackTrace(); } }} 为了提升代码的可复用性,可以实现消费端自定义监听器,通过继承DefaultConsumer类并重写handleDelivery方法。
通过上述技术手段,可以有效保障消息的可靠性投递和幂等性,同时在高并发场景下保障消费端的稳定性。RabbitMQ提供的丰富特性使得消息系统的设计更加灵活和可靠。
转载地址:http://rntfk.baihongyu.com/