博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQ中的事务与confirmSelect模式
阅读量:3559 次
发布时间:2019-05-20

本文共 17118 字,大约阅读时间需要 57 分钟。

好久没写技术文章了,由于公司马上要做消息相关的业务,所以最近在Docker上搭了一台RabbitMQ并研究研究。

从网易蜂巢上拉取的镜像:

docker pull hub.c.163.com/library/rabbitmq:latest

启动容器:

docker run -d --hostname my-rabbit --name some-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management

查看容器启动情况:

docker psCONTAINER ID        IMAGE                   COMMAND                  CREATED             STATUS              PORTS                                                                                        NAMES05fb983beef4        rabbitmq:3-management   "docker-entrypoint.s…"   3 days ago          Up About an hour    4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, 15671/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp   my-rabbit1

我们可以看到, 主机的15672映射到docker的15672端口,主机的5672映射到docker的5672端口.

在浏览器中输入网址:http://ip:5672/,输入用户名/密码:guest/guest,即可进入RabbitMQ的主界面。

简单的搭建过程就是这样,废话不多说,接下来介绍RabbitMQ事务方面的问题(本文的部分截图来自于网络)。

让我们先看一个RabbitMQ的小例子:

import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class ProducerDemo {    public static void main(String[] args)throws Exception {        ConnectionFactory factory = new ConnectionFactory();        factory.setUsername("guest");        factory.setPassword("guest");        factory.setVirtualHost("/");        factory.setHost("172.16.41.232");        factory.setPort(5672);        Connection conn = factory.newConnection();        Channel channel = conn.createChannel();        try{            String exchangeName = "exchangeName";            String routingKey = "routingKey";            String queueName = "queueName";            channel.exchangeDeclare(exchangeName,"direct",true);            channel.queueDeclare(queueName,true,false,false,null);            channel.queueBind(queueName,exchangeName,routingKey);            byte [] messageBodyBytes = "Hello World!" .getBytes();            for(int i = 0;i<100;i++) {                channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);            }        }catch (Exception e){            e.printStackTrace();        }finally {            channel.close();            conn.close();        }    }}

通过上面的代码,消息生产者Producer向Broker发送100条消息(什么是Broker本文暂不做解释,请自行百度),然而生产环境异常复杂,我们怎么确定Broker收到Producer的消息了呢??类似于JDBC中的事务:①开启事务--> ②update/insert/delete-->3成功commit失败rollback,我们来看RabbitMQ对事务的控制。

1、txSelect()、txCommit()与txRollback()

import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class ProducerDemo {    public static void main(String[] args)throws Exception {        ConnectionFactory factory = new ConnectionFactory();        factory.setUsername("guest");        factory.setPassword("guest");        factory.setVirtualHost("/");        factory.setHost("172.16.41.232");        factory.setPort(5672);        Connection conn = factory.newConnection();        Channel channel = conn.createChannel();        try{            String exchangeName = "exchangeName";            String routingKey = "routingKey";            String queueName = "queueName";            channel.exchangeDeclare(exchangeName,"direct",true);            channel.queueDeclare(queueName,true,false,false,null);            channel.queueBind(queueName,exchangeName,routingKey);            byte [] messageBodyBytes = "Hello World!" .getBytes();            channel.txSelect(); //开启事务            channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);            channel.txCommit(); //提交事务        }catch (Exception e){            e.printStackTrace();            channel.txRollback();   //回滚        }finally {            channel.close();            conn.close();        }    }}

在通过txSelect开启事务之后,我们便可以发布消息给broker服务器了,如果txCommit提交成功了,则消息一定到达了broker了,如果在txCommit执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback回滚事务了。

通过wireshark抓包,我们可以看到事务对RabbitMQ性能的影响。

è¿éåå¾çæè¿°

在事务中,整个过程如下:

Tx.Select-->Tx.Select-OK-->Basic.Publish-->Tx.Commit-->Tx.Commit-OK(注意这里的Tx.Commit与Tx.Commit-Ok之间的时间间隔294ms,由此可见事务还是很耗时的。)

我们再来看看没有事务时的通信是怎样的:

只有Basic.Publish

最后我们看看事务回滚时的通信:

try{        channel.txSelect(); //开启事务        channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);         int i = 1/0;             channel.txCommit(); //提交事务 }catch (Exception e){        e.printStackTrace();        channel.txRollback();   //回滚 }finally {        channel.close();        conn.close();}

è¿éåå¾çæè¿°

Tx.Select-->Tx.Select-OK-->Basic.Publish-->Tx.Rollback-->Tx.Rollback-OK

事务确实可以判断producer向Broker发送消息是否成功,只有Broker接受到消息,才会commit,但是使用事务机制的话会降低RabbitMQ的性能,那么有没有更好的方法既能保障producer知道消息已经正确送到,又能基本上不带来性能上的损失呢?从AMQP协议的层面看是没有更好的方法,但是RabbitMQ提供了一个更好的方案,即将channel信道设置成confirm模式。

2、Comfirm模式

 生产者将Channel设置成confirm模式,一旦Channel进入confirm模式,所有在该Channel上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker回传给生产者的确认消息中delivery-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理;

       confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等Channel返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息;

       开启Comfire模式的方法:

channel.confirmSelect();

这里注意一下:txSelect与Confirm模式不能共存。

Confirm模式的三种编程方式:

  1. 串行confirm模式:peoducer每发送一条消息后,调用waitForConfirms()方法,等待broker端confirm。
  2. 批量confirm模式:producer每发送一批消息后,调用waitForConfirms()方法,等待broker端confirm。
  3. 异步confirm模式:提供一个回调方法,broker confirm了一条或者多条消息后producer端会回调这个方法。

我们分别来看看这三种confirm模式

1、串行confirm模式

import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class ProducerDemo {    public static void main(String[] args)throws Exception {        ConnectionFactory factory = new ConnectionFactory();        factory.setUsername("guest");        factory.setPassword("guest");        factory.setVirtualHost("/");        factory.setHost("172.16.41.232");        factory.setPort(5672);        Connection conn = factory.newConnection();        Channel channel = conn.createChannel();        String exchangeName = "exchangeName";        String routingKey = "routingKey";        String queueName = "queueName";        channel.exchangeDeclare(exchangeName,"direct",true);        channel.queueDeclare(queueName,true,false,false,null);        channel.queueBind(queueName,exchangeName,routingKey);        byte [] messageBodyBytes = "Hello World!" .getBytes();        channel.confirmSelect();    //开启confirm模式        try{            for(int i = 0;i<50;i++) {                channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);                if (channel.waitForConfirms()) {  //broker confirm后producer调用                    System.out.println("发送成功");                } else {                    System.out.println("发送失败");                }            }        }catch (Exception e){            e.printStackTrace();        }finally {            channel.close();            conn.close();        }    }}

通过循环,发送了50条消息,在channel.waitForConfirms()等待broker发送ack或nack,这种模式每发送一条消息就会等待broker代理服务器返回消息,通过抓包我们可以看到:

2、批量confirm模式:

import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class ProducerDemo {    public static void main(String[] args)throws Exception {        ConnectionFactory factory = new ConnectionFactory();        factory.setUsername("guest");        factory.setPassword("guest");        factory.setVirtualHost("/");        factory.setHost("172.16.41.232");        factory.setPort(5672);        Connection conn = factory.newConnection();        Channel channel = conn.createChannel();        String exchangeName = "exchangeName";        String routingKey = "routingKey";        String queueName = "queueName";        channel.exchangeDeclare(exchangeName,"direct",true);        channel.queueDeclare(queueName,true,false,false,null);        channel.queueBind(queueName,exchangeName,routingKey);        byte [] messageBodyBytes = "Hello World!" .getBytes();        channel.confirmSelect();    //开启confirm模式        try{            for(int i = 0;i<50;i++) {                channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);            }            if (channel.waitForConfirms()) {  //broker confirm后producer调用                System.out.println("发送成功");            } else {                System.out.println("发送失败");            }        }catch (Exception e){            e.printStackTrace();        }finally {            channel.close();            conn.close();        }    }}

通过循环批量发送50条消息,但只在控制台输出了一行“发送成功”,该方法会等到最后一条消息得到ack或者得到nack才会结束,也就是说在waitForConfirms处会造成当前程序的阻塞,这点我们看出broker端默认情况下是进行批量回复的,并不是针对每条消息都发送一条ack消息;

3、异步confirm模式:

通过添加监听器,如果broker返回ack,producer回调handleAck,返回nack,producer回调handleNack

import com.rabbitmq.client.*;import java.io.IOException;public class Demo01_ConnectionMQ_Provider {    public static void main(String[] args)throws Exception {        ConnectionFactory factory = new ConnectionFactory();        factory.setUsername("guest");        factory.setPassword("guest");        factory.setVirtualHost("/");        factory.setHost("172.16.41.232");        factory.setPort(5672);        Connection conn = factory.newConnection();        Channel channel = conn.createChannel();        String exchangeName = "exchangeNamex";        String routingKey = "routingKeyx";        String queueName = "queueNamex";        channel.exchangeDeclare(exchangeName,"direct",true);        channel.queueDeclare(queueName,true,false,false,null);        channel.queueBind(queueName,exchangeName,routingKey);        byte [] messageBodyBytes = "你好,世界!" .getBytes();        try{            channel.confirmSelect();    // 开启confirm模式            long start  = System.currentTimeMillis();            //设置监听器            channel.addConfirmListener(new ConfirmListener() {                public void handleAck(long deliveryTag, boolean multiple) throws IOException {                    System.out.println("ack:deliveryTag:"+deliveryTag+",multiple:"+multiple);                }                public void handleNack(long deliveryTag, boolean multiple) throws IOException {                    System.out.println("nack:deliveryTag:"+deliveryTag+",multiple:"+multiple);                }            });                        for(int i = 0;i<100;i++) {   //循环发消息                channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);            }        }catch (Exception e){            e.printStackTrace();        }finally {            channel.close();            conn.close();        }    }}

在控制台输出结果:

ack:deliveryTag:8,multiple:true
ack:deliveryTag:1,multiple:falseack:deliveryTag:3,multiple:trueack:deliveryTag:6,multiple:trueProcess finished with exit code 0

可以看到,发送100条消息,收到的ack个数不一样。你多次运行程序会发现每次发送回来的ack消息中的deliveryTag域的值并不是一样的,说明broker端批量回传给发送者的ack消息并不是以固定的批量大小回传的;

由于是异步的,producer不需要等待broker返回ack任可以继续发送消息,比channel.waitForConfirms()速度快很多。

3、性能测试

Client端机器和RabbitMQ机器配置:CPU:24核,2600MHZ, 64G内存,1TB硬盘。 

Client端发送消息体大小10B,线程数为1即单线程,消息都持久化处理(deliveryMode:2)。 
分别采用事务模式、普通confirm模式,批量confirm模式和异步confirm模式进行producer实验,比对各个模式下的发送性能。 

è¿éåå¾çæè¿°

发送平均速率:

  • 事务模式(tx):1637.484
  • 普通confirm模式(common):1936.032
  • 批量confirm模式(batch):10432.45
  • 异步confirm模式(async):10542.06

可以看到事务模式性能是最差的,普通confirm模式性能比事务模式稍微好点,但是和批量confirm模式还有异步confirm模式相比,还是小巫见大巫。批量confirm模式的问题在于confirm之后返回false之后进行重发这样会使性能降低,异步confirm模式(async)编程模型较为复杂,至于采用哪种方式,看情况喽。

4、Consumer端的消息确认

与producer端类似,为了保证消息从队列可靠地到达消费者,RabbitMQ提供消息确认机制。consumer在声明队列时,可以指定noAck参数,当noAck=false时,RabbitMQ会等待消费者显式发回ack信号后才从内存(和磁盘,如果是持久化消息的话)中移去消息。否则,RabbitMQ会在队列中消息被消费后立即删除它。

采用消息确认机制后,只要令noAck=false,消费者就有足够的时间处理消息(任务),不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为RabbitMQ会一直持有消息直到消费者显式调用basicAck为止。

当noAck=false时,对于RabbitMQ服务器端而言,队列中的消息分成了两部分:一部分是等待投递给消费者的消息;一部分是已经投递给消费者,但是还没有收到消费者ack信号的消息。如果服务器端一直没有收到消费者的ack信号,并且消费此消息的消费者已经断开连接,则服务器端会安排该消息重新进入队列,等待投递给下一个消费者(也可能还是原来的那个消费者)。

让我们来看代码:

1、consumer自动向broker发送ack

import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.QueueingConsumer;public class Consumer {    public static void main(String[] args)throws Exception {        ConnectionFactory factory = new ConnectionFactory();        factory.setUsername("guest");        factory.setPassword("guest");        factory.setVirtualHost("/");        factory.setHost("172.16.41.190");        factory.setPort(5672);        Connection conn = factory.newConnection();        Channel channel = conn.createChannel();        String queueName = "queueNamex";        QueueingConsumer consumer = new QueueingConsumer(channel);        //设置为true,consumer自动向broker发送ack        channel.basicConsume(queueName, true, consumer);        for(int i=0;i<100;i++){            QueueingConsumer.Delivery delivery = consumer.nextDelivery();            String msg = new String(delivery.getBody());            System.out.println(msg);    //打印消息        }        channel.close();        conn.close();    }}

        假设有100条消息,consumer 调用

channel.basicConsume(queueName, true, consumer);

设置为true自动向broker发送ack,最后关闭链接。读者可以在rabbitmq的管理界面看到消息从100条减少到0条。

2、consumer手动向broker发送ack

import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.QueueingConsumer;public class Consumer {    public static void main(String[] args)throws Exception {        ConnectionFactory factory = new ConnectionFactory();        factory.setUsername("guest");        factory.setPassword("guest");        factory.setVirtualHost("/");        factory.setHost("172.16.41.190");        factory.setPort(5672);        Connection conn = factory.newConnection();        Channel channel = conn.createChannel();        String queueName = "queueNamex";        QueueingConsumer consumer = new QueueingConsumer(channel);        //设置为false,consumer手动向broker发送ack        channel.basicConsume(queueName, false, consumer);        for(int i=0;i<100;i++){            QueueingConsumer.Delivery delivery = consumer.nextDelivery();            String msg = new String(delivery.getBody());            //consumer手动向broker发送ack            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);            System.out.println(msg);        }        channel.close();        conn.close();    }}

在consumer端,调用

channel.basicConsume(queueName, false, consumer);和 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

手动向broker发送ack确认消息被接受,随后关闭链接。

3、consumer不发送ack并且consumer断开链接:这一点要注意让我们来看下面的代码和rabbitmq的管理界面

我用producer发送了100条消息,可以看到,Ready=100,Unacked=0,Total=100;

  如果我在Consumer端,设置为手动发送ack方式但最后一直没有发送ack,并且在读取消息后立刻关闭链接

import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.QueueingConsumer;public class Consumer {    public static void main(String[] args)throws Exception {        ConnectionFactory factory = new ConnectionFactory();        factory.setUsername("guest");        factory.setPassword("guest");        factory.setVirtualHost("/");        factory.setHost("172.16.41.190");        factory.setPort(5672);        Connection conn = factory.newConnection();        Channel channel = conn.createChannel();        String queueName = "queueNamex";        QueueingConsumer consumer = new QueueingConsumer(channel);        //设置为false,consumer手动向broker发送ack        channel.basicConsume(queueName, false, consumer);        for(int i=0;i<100;i++){            QueueingConsumer.Delivery delivery = consumer.nextDelivery();            String msg = new String(delivery.getBody());            //不发送ack给broker        //    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);            System.out.println(msg);        }        //关闭连接        channel.close();        conn.close();    }}

我们再来运行Consumer,来看看输出的结果和rabbitmq的管理界面:

打印了100条消息,但是从rabbitmq的管理界面来看,消息数任仍为100条,并没有被消费掉,这就验证了我前面的话:

如果服务器端一直没有收到消费者的ack信号,并且消费此消息的消费者已经断开连接,则服务器端会安排该消息重新进入队列,等待投递给下一个消费者(也可能还是原来的那个消费者)。

4、consumer不发送ack,并且没有关闭连接

import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.QueueingConsumer;public class Consumer {    public static void main(String[] args)throws Exception {        ConnectionFactory factory = new ConnectionFactory();        factory.setUsername("guest");        factory.setPassword("guest");        factory.setVirtualHost("/");        factory.setHost("172.16.41.190");        factory.setPort(5672);        Connection conn = factory.newConnection();        Channel channel = conn.createChannel();        String queueName = "queueNamex";        QueueingConsumer consumer = new QueueingConsumer(channel);        //设置为false,consumer手动向broker发送ack        channel.basicConsume(queueName, false, consumer);        for(int i=0;i<100;i++){            QueueingConsumer.Delivery delivery = consumer.nextDelivery();            String msg = new String(delivery.getBody());            //不发送ack给broker        //    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);            System.out.println(msg);        }    }}

我们可以看到,Ready变为0,而Unacked变为100,表示consumer没有向broker发送ack,前面我们说过,只有consumer向broker发送了ack,broker才会删除消息,所以此时broker并没有删除消息,如果消费者再次正常消费,依然可以获得消息。

 

这就是我这几天来对RabbitMQ事务方面的理解,谢谢大家,欢迎转载。

你可能感兴趣的文章
Scala-面向对象后章
查看>>
iOS蓝牙原生封装,助力智能硬件开发
查看>>
iOS 代码的Taste(品位)
查看>>
iOS开发代码规范
查看>>
iOS组件化实践(基于CocoaPods)
查看>>
【iOS学习】RxSwift从零入手 - 介绍
查看>>
数据结构之栈
查看>>
Elastic Stack简介
查看>>
关于deepin系统安装design compiler的问题解答
查看>>
Java Agent简介及使用Byte Buddy和AspectJ LTW监控方法执行耗时
查看>>
记录一下最近的学习经历
查看>>
hadoop3.0+spark2.0两台云服务器集群环境配置。
查看>>
网站实现qq登录(springboot后台)
查看>>
简单的用户头像修改功能(springboot后台)
查看>>
springboot+mybatis实现分页
查看>>
leetcode332. 重新安排行程
查看>>
为什么局域网网段不同不能通信?
查看>>
itchat微信助手,kaggle 电影数据集分析,基于内容的电影推荐
查看>>
认识和使用JWT
查看>>
通过springboot框架,自己动手实现oauth2.0授权码模式认证
查看>>