手把手教你SpringBoot+RabbitMQ实现手动Consumer Ack
你知道的越多,不知道的就越多,业余的像一棵小草!
你来,我们一起精进!你不来,我和你的竞争对手一起精进!
编辑:业余草
blog.csdn.net/LoveLacie
推荐:https://www.xttblog.com/?p=5162
一、Consumer Ack的三种方式
(1)、自动确认:acknowledge = “none”,这是默认的方式,如果不配置的话,默认就是自动确认,消费方从消息队列中拿出消息后,消息队列中都会清除掉这条消息(不安全).
(2)、手动确认:acknowledge = “manual”,手动确认就是当消费者取出来消息其后的操作正常执行后,返回给消息队列,让其清除该条消息;如果后续执行有异常,可以设置requeue=true返回其消息队列,再让其消息队列重新给消费者发送消息.
(3)、根据异常情况确认(很麻烦):acknowledge = “auto”.
二、进入主题:
SpringBoot+RabbitMQ实现手动Consumer Ack
1、pom文件中导入依赖坐标
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、在生产者与消费者工程yml配置文件中开启手动Ack
spring:
rabbitmq:
host: 192.168.253.128 #ip
username: guest
password: guest
virtual-host: /
port: 5672
listener:
simple:
acknowledge-mode: manual #开启手动Ack
3、在生产者工程中创建一个配置类声明队列与交换机的关系
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
//交换机的名称;
public static final String DIRECT_EXCHANGE_NAME = "direct_boot_exchange";
//队列名称;
public static final String DIRECT_QUEUE_NAME = "direct_boot_queue";
/**
* 声明交换机,在以后我们会定义多个交换机,
* 所以给这个注入的Bean起一个名字,同理在绑定的时候用@Qualifier注解;
* durablie:持久化
*/
@Bean("directExchange")
public Exchange directExchange(){
return ExchangeBuilder.directExchange(DIRECT_EXCHANGE_NAME).durable(true).build();
}
//声明队列;
@Bean("directQueue")
public Queue testQueue(){
return QueueBuilder.durable(DIRECT_QUEUE_NAME).build();
}
//绑定交换机和队列,把上述声明的交换机、队列作为参数传入进来;
@Bean
public Binding bindDirectExchangeQueue(@Qualifier("directQueue") Queue queue,
@Qualifier("directExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("info").noargs();
}
}
4、在消费者工程中创建一个组件监听在生产者声明的队列
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class MyAckListener {
/**
*
* @param message 队列中的消息;
* @param channel 当前的消息队列;
* @param tag 取出来当前消息在队列中的的索引,
* 用这个@Header(AmqpHeaders.DELIVERY_TAG)注解可以拿到;
* @throws IOException
*/
@RabbitListener(queues = "direct_boot_queue")
public void myAckListener(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
System.out.println(message);
try {
/**
* 无异常就确认消息
* basicAck(long deliveryTag, boolean multiple)
* deliveryTag:取出来当前消息在队列中的的索引;
* multiple:为true的话就是批量确认,如果当前deliveryTag为5,那么就会确认
* deliveryTag为5及其以下的消息;一般设置为false
*/
channel.basicAck(tag, false);
}catch (Exception e){
/**
* 有异常就绝收消息
* basicNack(long deliveryTag, boolean multiple, boolean requeue)
* requeue:true为将消息重返当前消息队列,还可以重新发送给消费者;
* false:将消息丢弃
*/
channel.basicNack(tag,false,true);
}
}
}
5、在生产者中创建一个测试类来发送消息
import com.itlw.config.RabbitMQConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class ProducedTest {
//从IOC容器中拿模板类;
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test(){
//发送消息;
rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE_NAME,
"info","这是一条测试消息....");
}
}
5、启动消费者工程来接收此队列的消息
可以看到控制台输出了接收到的消息,并且因为已经被确认,所以队列中消息已经为0,要测出效果,手动添加一个异常.
6、手动添加一个异常
try {
/**
* 无异常就确认消息
* basicAck(long deliveryTag, boolean multiple)
* deliveryTag:取出来当前消息在队列中的的索引;
* multiple:为true的话就是批量确认,如果当前deliveryTag为5,那么就会确认
* deliveryTag为5及其以下的消息;一般设置为false
*/
int i = 3 / 0;//手动添加异常
channel.basicAck(tag, false);
} catch (Exception e) {
/**
* 有异常就绝收消息
* basicNack(long deliveryTag, boolean multiple, boolean requeue)
* requeue:true为将消息重返当前消息队列,还可以重新发送给消费者;
* false:将消息丢弃
*/
channel.basicNack(tag, false, true);
}
7、再次运行看结果
我设置了 channel.basicNack(tag, false, true);第三个requeue属性为true由队列又重新发送给消费者,消费者接收到消息后确认之前遇到了错误又重新拒收消息…所以进入了一个死循环
等暂停运行后,可以看到消息队列中还剩一条消息,就是消费者绝收的这条消息,如果把requeue设置为false,那么这个队列中将没有这条消息.