博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
activemq消息重发机制[转]
阅读量:6105 次
发布时间:2019-06-21

本文共 5208 字,大约阅读时间需要 17 分钟。

大家知道,JMS规范中,Message消息头接口中有setJMSRedelivered(boolean redelivered)和getJMSRedelivered()方法,用于设置和获取消息的重发标志,当然set方法主要是MOM来调用的,我们客户端使用的是get方法。

还记得当时阿里的电话面试曾问过我,你知道ActiveMQ中的消息重发时间间隔和重发次数吗?我当时尴尬了,只知道会重发,还真没去了解过其中的细节,所以最后被完美的“淘汰了”。
后来有时间了就去网上看了下官方的文档,所以现在把ActiveMQ中的重发机制和大家一起分享一下。
首先,我们得大概了解下,在哪些情况下,ActiveMQ服务器会将消息重发给消费者,这里为简单起见,假定采用的消息发送模式为队列(即消息发送者和消息接收者)。
1.如果消息接收者在处理完一条消息的处理过程后没有对MOM进行应答,则该消息将由MOM重发。
2.如果我们队某个队列设置了预读参数(consumer.prefetchSize),如果消息接收者在处理第一条消息时(没向MOM发送消息接收确认)就宕机了,则预读数量的所有消息都将被重发。
3.如果Session是事务的,则只要消息接收者有一条消息没有确认,或发送消息期间MOM或客户端某一方突然宕机了,则该事务范围中的所有消息MOM都将重发。
说到这里,大家可能会有疑问,ActiveMQ消息服务器怎么知道消费者客户端到底是消息正在处理中还没来得急对消息进行应答还是已经处理完成了没有应答或是宕机了根本没机会应答呢?其实在所有的客户端机器上,内存中都运行着一套客户端的ActiveMQ环境,该环境负责缓存发来的消息,负责维持着和ActiveMQ服务器的消息通讯,负责失效转移(fail-over)等,所有的判断和处理都是由这套客户端环境来完成的。
我们可以来对ActiveMQ的重发策略(Redelivery Policy)来进行自定义配置,其中的配置参数主要有以下几个:
a) collisionAvoidanceFactor :碰撞躲避因数,默认值是0.15,这个参数是为了躲避高并发的重发带来的问题,我们查看org.apache.activemq.RedeliveryPolicy类的源代码,
// +/-15% for a 30% spread -cgs
protected double collisionAvoidanceFactor = 0.15d;
protected long initialRedeliveryDelay = 1000L;
可以发现,该默认值带来的变动范围是正负百分之15,也就是有30%的范围,也就是说,如果延迟发送时间(也就是initialRedeliveryDelay 默认值)是1000毫秒,则该条消息第一次有可能被拖延850毫秒到1150毫秒之间后被发送,如果有第二次重发,基数就不是1000毫秒了,而是以上一次重发拖延时间为基础来算。源代码如下:
public long getNextRedeliveryDelay(long previousDelay) {
long nextDelay = redeliveryDelay;
if (previousDelay > 0 && useExponentialBackOff && backOffMultiplier > 1) {
nextDelay = (long) (previousDelay * backOffMultiplier);
if(maximumRedeliveryDelay != -1 && nextDelay > maximumRedeliveryDelay) {
// in case the user made max redelivery delay less than redelivery delay for some reason.
nextDelay = Math.max(maximumRedeliveryDelay, redeliveryDelay);
}
}
if (useCollisionAvoidance) {
/*
* First random determines +/-, second random determines how far to
* go in that direction. -cgs
*/
Random random = getRandomNumberGenerator();
double variance = (random.nextBoolean() ? collisionAvoidanceFactor : -collisionAvoidanceFactor) * random.nextDouble();
nextDelay += nextDelay * variance;
}
return nextDelay;
}
b)maximumRedeliveries :最大重发次数,默认值是6,如果你想不限次数重发,可以设置成-1。同样是org.apache.activemq.RedeliveryPolicy类中的代码:
public static final int NO_MAXIMUM_REDELIVERIES = -1;
public static final int DEFAULT_MAXIMUM_REDELIVERIES = 6;
protected int maximumRedeliveries = DEFAULT_MAXIMUM_REDELIVERIES;
我们探究一下maximumRedeliveries 的get方法,可以发现有org.apache.activemq.ActiveMQSession和org.apache.activemq.ActiveMQMessageConsumer两个类中有用到:
其中ActiveMQSession中的代码如下:
// Figure out how long we should wait to resend this message.
long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
for (int i = 0; i < redeliveryCounter; i++) {
// 每次重发拖延时间都是以上一次重发拖延时间来算,所以这里for循环来取得最新的拖延时间
redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
}
// 交给定时任务重发
connection.getScheduler().executeAfterDelay(new Runnable() {
public void run() {
((ActiveMQDispatcher)md.getConsumer()).dispatch(md);
}
}, redeliveryDelay);
ActiveMQMessageConsumer中的代码类似。
c)maximumRedeliveryDelay :重发最大拖延时间,默认为-1,表示没有最大拖延时间,此参数只有当useExponentialBackOff 为true时起效。同样是RedeliveryPolicy中的代码:
protected long maximumRedeliveryDelay = -1;
public long getNextRedeliveryDelay(long previousDelay) {
long nextDelay = redeliveryDelay;
if (previousDelay > 0 && useExponentialBackOff && backOffMultiplier > 1) {
nextDelay = (long) (previousDelay * backOffMultiplier);
if(maximumRedeliveryDelay != -1 && nextDelay > maximumRedeliveryDelay) {
// in case the user made max redelivery delay less than redelivery delay for some reason.
nextDelay = Math.max(maximumRedeliveryDelay, redeliveryDelay);
}
}
。。。。。
}
看源代码就显而易见了。
d)initialRedeliveryDelay :第一次重发的拖延时间基础,默认是1000,单位为毫秒,前面讲collisionAvoidanceFactor 属性时已经提到过,这里不再多说。
e)redeliveryDelay :如果initialRedeliveryDelay 为0,则使用redeliveryDelay ,默认也是1000。RedeliveryPolicy中源代码如下:
protected long initialRedeliveryDelay = 1000L;
protected long redeliveryDelay = initialRedeliveryDelay;
f)useCollisionAvoidance :消息重发时是否采用前面提到的碰撞避免collisionAvoidanceFactor 参数,默认是false,不采用。源代码上面也给出了,这里不再多说。
g)useCollisionAvoidance :是否使用成倍增加拖延,默认为false,如果我们希望重发的拖延时间一次比一次大很多,则可以设置它为true。上面已经给出过源代码,这里再次给出:
protected boolean useExponentialBackOff;
protected double backOffMultiplier = 5.0;
public long getNextRedeliveryDelay(long previousDelay) {
long nextDelay = redeliveryDelay;
if (previousDelay > 0 && useExponentialBackOff && backOffMultiplier > 1) {
nextDelay = (long) (previousDelay * backOffMultiplier);
if(maximumRedeliveryDelay != -1 && nextDelay > maximumRedeliveryDelay) {
// in case the user made max redelivery delay less than redelivery delay for some reason.
nextDelay = Math.max(maximumRedeliveryDelay, redeliveryDelay);
}
}
if (useCollisionAvoidance) {
/*
* First random determines +/-, second random determines how far to
* go in that direction. -cgs
*/
Random random = getRandomNumberGenerator();
double variance = (random.nextBoolean() ? collisionAvoidanceFactor : -collisionAvoidanceFactor) * random.nextDouble();
nextDelay += nextDelay * variance;
}
return nextDelay;
}
可以看出,成倍拖延是将上一次拖延时间乘以backOffMultiplier来实现的,而 backOffMultiplier默认为5.
h)backOffMultiplier :成倍拖延时间的倍率,默认为5,上面已经提到了,这里不再多说。

 

转自:http://www.tuicool.com/articles/iMb2Y3

 

转载于:https://www.cnblogs.com/olmlo/p/4708660.html

你可能感兴趣的文章
使用正态分布变换(Normal Distributions Transform)进行点云配准
查看>>
java 泛型基础问题汇总
查看>>
用Apache+mod_wsgi部署python程序 作者:leven | 日期2010-11-29 00:09:37
查看>>
python的对象复制,深复制和浅复制
查看>>
groovy multimap
查看>>
合并Excel文件
查看>>
Android中文API合集(6) + 开发者指南合集(1) (chm格式)
查看>>
如何定义一个简单的Concurrent Program
查看>>
Java线程池主线程等待子线程执行完成
查看>>
分表处理设计思想和实现
查看>>
iOS_时间相关
查看>>
Win10 UWP系列:关于错误 0x80073CF9及一个小bug的解决
查看>>
虚拟化之lxc
查看>>
django之异常错误3(Student matching query does not exist.)
查看>>
Android图片控件,跟随列表(recyclerView)的上下滚动而同步平移。
查看>>
阿里巴巴开源框架JarsLink
查看>>
亲历一次vscode搭建flutter开发环境
查看>>
CSS水平垂直居中解决方案
查看>>
如此,用dep获取私有库
查看>>
了解HTML5中的MutationObserver
查看>>