ActiveMQ有人用过荃芬没有没

使用ActiveMQ时遇到的坑 - u的专栏 - CSDN博客
使用ActiveMQ时遇到的坑
最近项目要做一个消息系统,选来选去,最后锁定到了ActiveMQ上,但是在使用中遇到了一个坑,分享给大家。
在使用PooledConnectionFactory做连接池来优化的时候,我原先按照官方API以及网上其他的相关资料引用了下面的包:
geronimo-j2ee-management_1.1_spec-1.0.1.jar
geronimo-jms_1.1_spec-1.1.1.jar
slf4j-api-1.7.5.jar
activemq-all-5.9.0.jar
activemq-client-5.9.0.jar
activemq-core.jar
activemq-protobuf-1.1.jar
activemq-pool-5.9.0.jar
在最开始的时候,使用每次都获取一个新的连接,使用完成后就立即关闭。这样在创建连接的时候是会消耗很多资源的,所以采用了连接池
好了,开始坑了,不管是网上的哪一篇帖子(我所浏览的)都使用的相同的jar包,在使用下面语句的时候都没说会报错:
//设置最大连接数
pooledConnectionFactory.setMaxConnections(200);
//设置最小
pooledConnectionFactory.setMaximumActiveSessionPerConnection(50);
//后台对象清理时,休眠时间超过了3000毫秒的对象为过期
pooledConnectionFactory.setTimeBetweenExpirationCheckMillis(3000);但是在我实际使用的时候,都是错的。表示没有相关的方法,仔细查阅文档后才发现:
PooledConnectionFactory的全限定名为:org.apache.activemq.jms.pool.PooledConnectionFactory
但是我目前应用的包都是找不到这个这个地方的,使用的是org.apache.activemq.pool.PooledConnectionFactory这个,而这个里面是没有相关的方法的,然后回去找所以的jar包,才发现,我们应该使用的是activemq-jms-pool-5.9.0.jar而不是activemq-pool-5.9.0.jar。
除此之外,这里还需要另一个依赖包:commons-pool-1.6.jar,只有将这个包也引入之后才能正常使用。
OK,这就是在使用是遇到的一个问题,但是在网上却没有找到资源解决,撰此文以记之,同时也分享给大家。
我的热门文章博客分类:
转载自:http://windows9834./blog/static//
这是官方推荐要看的原文的翻译:
异步性(asynchronicity),是高伸缩性系统的
,对Java来说,这意味着,然后又意味着。但是?一个人很有可能会不知所措,在谈论容器、框架、和一堆选项时。那么让我们分开来探讨下。
ActiveMQ文档中提及两个框架:和。那么要做决定就取决于简单性和功能性的对比。Camel支持大量的(Enterprise Integration Patterns),它可以大大简化集成组件间的大量服务和复杂的消息流。如果你的系统需要这样的功能,它当然是非常好的选择。然而,如果你寻找简单性,仅仅支持基本的最佳实践,那么Spring是好的选择。
JCA (使用它或放弃它)
阅读ActiveMQ的时,开发者立刻被介绍了JCA容器和ActiveMQ很多的代理和选项的概念。然而这些都不适应。JCA是规范的一部分,对于大部分EJB规范,Spring都没有提供很好的支持。这时,就提到了,”为Spring提供的轻量级JCA容器”,它可以作为ActiveMQ的。 乍看上去,这是个非常好的解决方案,但是我在这里告诉你不要这样做。Jencks最后更新时间是2007年1月。那时ActiveMQ的版本是 4.1x,Spring的版本是2.0.x,但是时间过了很多,事情也变化了很大。甚至是试图从Maven库中获取ActiveMQ4.1的依赖包 Jencks都会失败,因为它已经不存在了。简单的事实是有更好和更简单的方式来缓存资源。
Spring消息发送的核心架构是。在传统的Spring模板方式中,JmsTemplate隔离了像打开、关闭Session和Producer的繁琐操作,因此应用开发人员仅仅需要关注实际的业务逻辑。然而ActiveMQ快速的指出了,大多是因为JmsTemplate被设计在每次调用时都打开和关闭session及producer。JmsTemplate损害了ActiveMQ的对session和消息producer的缓存机制而带来的性能提升。然而,这个太过时了。从Spring2.5.3开始,它开始提供自己的,我相信该类在缓存方面更加强大。然而,这里有一个点需要注意。默认情况下,CachingConnectionFactory只缓存一个session,在它的JavaDoc中,它。与之相反,PooledConnectionFactory的。这些设置,在很多情况下,需要亲自去测试并验证。我将其设置为100,对我来说还是很不错。
可能你已经注意了,强 烈的不建议你使用JmsTemplate的receive()调用,同样也是因为没有session和consumer的池机制。更多的原因是在 JmsTemplate上的所有调用都是同步的,这意味着调用线程需要被阻塞,直到方法返回。这在使用JmsTemplate发送消息时,没有任何问题, 因为该方法几乎是立即返回。然而,当调用receive方法时,线程将阻塞,直到接收到一个消息为止,这对性能影响很大。不幸的是,和中都没有对该问题提供简单的解决方案。实际上,它们均提倡使用,我们之前已经说过。实际的解决方案,使用,已经写在了文 档中。DefaultMessageListenerContainer允许异步接收消息并缓存session和消息consumer。更有趣的 是,DefaultMessageListenerContainer可以根据消息数量动态的增加或缩减监听器的数量。简而言之,我们可以完全忽视 JCA。
Spring Context配置文件
&? xml version ="1.0" encoding ="UTF-8"?&xmlns="http://www.springframework.org/schema/beans"xmlns:amq="http://activemq.apache.org/schema/core"xmlns:jms="http://www.springframework.org/schema/jms"xmlns:context="http://www.springframework.org/schema/context"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.2.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-2.5.xsd"&!-- enables annotation based configuration --&&!-- scans for annotated classes in pany package --&base-package="net.javasight"&!--
allows for ${} replacement in the spring xml configuration from the
system.properties file on the classpath
--&location="classpath:system.properties"&!-- creates an activemq connection factory using the amq namespace --&id="amqConnectionFactory"brokerURL="${jms.url}"userName="${jms.username}"password="${jms.password}"&!--
CachingConnectionFactory Definition, sessionCacheSize property is the
number of sessions to cache
--&id="connectionFactory"class="org.springframework.jms.connection.CachingConnectionFactory"ref="amqConnectionFactory"name="exceptionListener"ref="jmsExceptionListener"name="sessionCacheSize"value="100"&!-- JmsTemplate Definition --&id="jmsTemplate"class="org.springframework.jms.core.JmsTemplate"ref="connectionFactory"&!--
listener container definition using the jms namespace, concurrency is
the max number of concurrent listeners that can be started
--&concurrency="10"id="QueueListener"destination="Queue.Name"ref="queueListener"
这里有两点需要注意。首先,我添加了amq和jms的命名空间。第二,我使用了Spring2.5的。通过使用基于注解的配置,我可以简单的添加注解到我的Java类上,而不是使用XML文件显式的在spring配置文件中定义它们。另外,我可以在我的构造方法中使用来将像JmsTemplate这样的对象注入到我的对象中。
QueueSender
package net.javasight;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
@Component
publicclassQueueSender{
privatefinalJmsTemplate jmsTemplate;
@Autowired
publicQueueSender(finalJmsTemplate jmsTemplate){
this.jmsTemplate = jmsTemplate;
publicvoid send(finalString message){
jmsTemplate.convertAndSend("Queue.Name", message);
Queue Listener
package net.javasight;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageListener;import javax.jms.TextMessage;import org.springframework.stereotype.Component;@ComponentpublicclassQueueListenerimplementsMessageListener{publicvoid onMessage(finalMessage message){if(message instanceofTextMessage){finalTextMessage textMessage =(TextMessage) message;try{System.out.println(textMessage.getText());}catch(finalJMSException e){
e.printStackTrace();}}}}
JmsExceptionListener
package net.javasight;import javax.jms.ExceptionListener;import javax.jms.JMSException;import org.springframework.stereotype.Component;@ComponentpublicclassJmsExceptionListenerimplementsExceptionListener{publicvoid onException(finalJMSException e){
e.printStackTrace();}}
浏览: 26956 次
来自: 北京一、本文目的
& & & & 介绍如何在同一台虚拟机上搭建高可用的Activemq服务,集群数量包含3个Activemq,当Activemq可用数&=2时,整个集群可用。
& & & & 本文Activemq的集群数量为3个,分别命名为mq1,mq2,mq3
二、概念介绍
& & & 集群搭建在同一台虚拟机上,3个Activemq分别使用不同的端口提供服务,启用1个为Master,其它2个为Slaver,同一时间仅Master队列提供服务
& & & 3个Activemq服务,同一时间仅Master队列提供服务,当Master队列挂掉后,其它2个Slaver自动选举出1个成为Master,整个队列服务依然可用。当挂掉的队列重新恢复后,自动加入集群。当集群仅剩下1个队列时,整个队列不可用。
3、Activemq集群数据存储方式
& & & a)&kahaDB:文件共享,默认方式
& & & b) JDBC:数据库共享
& & & c)&LevelDB:数据共享,本文使用方式
三、Activemq伪集群的搭建
1、Activemq的端口介绍
& & &&Activemq默认主要使用2个端口,8161(控制台使用)、61616(提供服务的端口),如果需要搭建集群,还需要开放集群间通讯的端口(主要用于选举Master)
2、Activemq集群端口的分配
集群通讯接口
服务接口没有使用默认的61611是因为activemq默认还会使用等端口,如何开放端口及配置控制台端口请自行百度
3、修改activemq配置
a) 安装activemq,本文使用Activemq版本为5.11.1,安装过程略
b) 修改配置文件activemq.xml,路径为conf/activemq.xml
& & &1、broker(所有activemq的brokerName必须一致,才能加入同一个集群)
&broker xmlns="" brokerName="V1MQ" dataDirectory="${activemq.data}"&
& & 2、配置levelDB,加载&broker&节点内
& & & & & &bind:集群间通讯的ip和端口
& & & & & &zkAddress:ZooKeeper地址,多个可用,逗号分隔
& & & & & &hostname:主机名,可在/etc/hosts中进行配置
& & & & & &zkPath:zkPath目录,可在ZooInspetor中进行查看
&persistenceAdapter&
&&&&&&&&&&replicatedLevelDB
&&&&&&&&&&&&&directory="${activemq.data}/leveldb"
&&&&&&&&&&&&&replicas="3"
&&&&&&&&&&&&&bind=""
&&&&&&&&&&&&&zkAddress="192.168.146.130:2181"
&&&&&&&&&&&&&hostname="V1"
&&&&&&&&&&&&&zkPath="/activemq/leveldb-stores"
&&&&&&&&&&&&&/&
&&&&&&/persistenceAdapter&
4、启动activemq
& & &&/usr/local/src/activemq1/bin/activemq start
& & & 可通过/usr/local/src/activemq1/data/activemq.log查看启动日志
5、关于管控台
& & & &虽然3个activemq都启动了,但是同一时间只有Master对应的管控台可用,Slaver对应的管控台不可用
四、结合ZooInspector测试
1、打开ZooInspector(可自行搜索下载或从群中下载),输入ZooKeeper地址进行监控,如果3个activemq都启动成功,则显示如下:
2、Java测试代码
& & & &代码可参考:
& & & &a) 配置集群IP(这里3个activemq的端口分别是,51513)
&bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"&
&&&&&property name="brokerURL" value="failover:(,,)" /&
&&&&&property name="trustAllPackages" value="true"/&
&&&&&property name="useAsyncSend" value="true" /&
& & & & b) &测试代码
&&public void produceMsg_DefaultQueue() {
&&&&&&for (int i = 0; i & 10000; i++) {
&&&&&&&&&&final String msg ="序号:"+String.valueOf(i) + " " +"这里是向默认队列发送的消息" + new Date().toString();
&&&&&&&&&&System.out.println(msg);
&&&&&&&&&&String destination = jmsTemplate.getDefaultDestination().toString();
&&&&&&&&&&jmsTemplate.send(new MessageCreator() {
&&&&&&&&&&&&&&public Message createMessage(Session session) throws JMSException {
&&&&&&&&&&&&&&&&&&return session.createTextMessage(msg);
&&&&&&&&&&&&&&}
&&&&&&&&&&});
&&&&&&&&&&try {
&&&&&&&&&&&&&&Thread.sleep(300);
&&&&&&&&&&} catch (InterruptedException e) {
&&&&&&&&&&&&&&e.printStackTrace();
&&&&&&&&&&}
3、启动Java程序并发送消息
4、关闭Master队列(注意此时的Master队列为mq2)
a) & 此时的ZooInspetor
5、重启mq2队列(重启后mq2加入队列成为Slaver,但是mq1还是Master,队列不受影响)
6、关闭mq1、mq2,仅剩mq3(由于只有一个队列,无法进行选举,所以整个队列都无法提供服务)
7、重启mq1(重启mq1后,mq1和mq3选举了Master队列,从而重新对外提供服务)
a) ZooInspecto显示mq1被选举为Master
b) 程序重新向队列消息,数据并没有中断&
& & & & 本文演示了activemq伪集群的搭建过程及高可用的测试过程,通过举一反三可以将activemq部署到不同的机器上,从而实现相同的功能。
& & & & 遗留问题:ZooInspetor的activemq编号是自动生成的,很难发现其对应的队列,请问有没有可以自定义编号的方法,谢谢。
六、参考资料
阅读排行榜ActiveMQ使用问题整理 - 简书
ActiveMQ使用问题整理
producer的复用
一开始研究ActiveMQ的资料时,官网只说了connection和session最好复用,在我认识里,producer和consumer是由session创建出来的,session复用没问题,producer应该资源占用没那么大吧?
所以在一开始对项目组暴露的封装好的api接口中,client每发送一条消息,都会自动创建一个producer,随着发送消息的越来越多,在mq控制台上看到的生产者数量就越来越多,mq内存占用越来越大,从而导致mq的宕机。
最后的解决方法是在程序里复用producer,由于client需要持续往三个队列发送消息,因此使用了hashmap来存放建好的producer,map里键是目的地,值则是MessageProducer对象。
每次发送消息时,需要检查是否producer可用,主要是检查会话是否正常。
failover机制的问题
在项目上线后,需求方要求Agent可以在故障时自动切换到另一个MQ集群以实现容错,且在原集群正常后可以自动切换回来。
这个需求直觉上就是ActiveMQ的failover机制,只需要在建立连接时把randomize设置为false,然后加个priorityBackup=true,failover中配置上两个MQ集群的F5就OK,实际上我们也是这么做的。
上线后才发现了一个问题,只要有一点网络抖动,或者因为inactivityMonitor导致的连接断开,那么Agent就会自动连接到另一个集群,而后再尝试连接回来。也就是说,不会尝试多次第一个地址,而是直接尝试第二个可用的地址,就算将配置写成
ip1,ip1,ip2
的形式也不行。因为源码中在识别uri添加uri到重连列表时有一个去重的机制,在FailoverTransport中:
public void add(boolean rebalance, URI u[]) {
boolean newURI =
for (URI uri : u) {
if (!contains(uri)) {
uris.add(uri);
if (newURI) {
reconnect(rebalance);
由于我们使用的是MQ上自带的系统消息来判断Agent是否在线,这样就会出现两个集群的ActiveMQ都会上送一条agent连接到该集群的提示,如果离线消息处理不及时,上层就会出现一个agent连接两个mq的情况。
实际解决方法还没有想到,目前想到的就是先把InactivityMonitor给关了,再看看效果吧
Channel was inactive for too long
这个问题出现在使用了上述的failover机制后,由于InactivityMonitor机制,一般情况下如果没有数据包,那么MQ和Agent会互相发送keepAlive Info数据包,是不会出现问题的。但是在MQ负载较高的情况下,会出现无法发送KeepAlive Info的问题(推测),因此InactivityMonitor就会断开连接,而后自动重连到另一个集群再连接回来。
在这种情况下,考虑两种方案
1.彻底关闭该功能
2.延长检测时间,比如10分钟内没数据交互才关闭。
目前暂定方案2具体测试情况待上线后检验
无限重连导致连接暴增
在测试环境上发现一个场景,agent不停发起对mq的重连,但是连接未被关闭,一段时间后,火墙就被这些连接占满了。
现在初步判断原因是mq被连接撑爆,导致mq不停关闭连接,agent又不停尝试重连,具体需要再进行测试。
这个问题的解决方法有两种
1.增加mq集群数量,限制每台mq的连接数
2.放弃failover机制,而是改用自己写的逻辑来进行重连。
具体解决方法待确定,后续更新
KahaDB无法及时清理db-*.log
问题原因已经找到,是ActiveMQ的一个bug:
ActiveMQ使用KahaDB进行消息的持久化存储,在KahaDB的db-.log文件中,存放的是消息和消息被消费者接收的ack数据(messages and acknowledgement),只有一个db-.log文件中所有的消息都被消费(都存在ack)后,这个db-*.log才会被清理。
那么现在存在一种情况,如果MQ上堆积了一条消息,许久没被消费。那么这条消息所在的db-1.log就不会被清理,也就是说db-1.log中所有其他已经被消费的消息也都不会被清理。如果db-2.log,db-3.log中存在db-1.log中的已经被消费的消息的ack数据,这个ack数据也是不能被清理的。原因是如果清理了ack数据,那么在MQ宕机重启后,db-1.log中的被消费的消息又会被置为未消费。因此就会产生一个连锁反应,只要有一条消息卡在MQ上,就可能后续的数据文件均无法被删除。当然连锁反应也有可能在某个环节被打破,比如某个文件刚好包含了自己所有消息的ack和上一个文件的消息的ack数据,但是如果一直连锁下去,文件系统的确是会被撑爆。
在测试环境已重现这个问题
这个bug在ActiveMQ-5.14.0版本中被解决,解决方法是把第一个数据文件中消息的ack给迁移到某个数据文件中。我也在测试环境搭了5.14.1版本的ActiveMQ进行了测试。
可惜我们用的版本是ActiveMQ-5.11.1……………………
解决方法是:
1、尽量减少MQ上的消息堆积的情况,比如设置消息的超时时间,定时清理等。如果堆积了要及时处理,不能让MQ上长期堆积相同的消息。
2、减小db-*.log的文件大小,理论上可以使脏数据的影响范围降低
3、设置KahaDB多实例,让队列之间的数据文件互不干扰。设置方法
亲测,在分别对两个队列设置KahaDB多实例后,生产者生产持久化消息的TPS提升了
MQ服务器上出现大量CLOSE_WAIT连接
CLOSE_WAIT表示一个连接被被动关闭了,服务器没有检测到,因此程序忽略了继续进行连接的关闭。最麻烦的是CLOSE_WAIT的连接会占用amq的总连接数,因此如果要清理,只能重启mq来解决。有一篇文章对TIME_WAIT和CLOSE_WAIT解释得很详细。出现这个问题后,可能直接导致activemq报出timer already cancelled,连接超限,无法建立线程OOM等报错。(推测)该问题在JIRA上已经被关闭,见AMQ-5543和AMQ-5251。
经过一段时间的研究,发现是我的代码触发了MQ 的一个bug代码中建立了过多的JMX连接,导致服务器无法建立新的线程,从而导致连接无法被正常处理,而后连接被客户端关闭,就出现了close_wait。
这个bug连接的是InactivityMonitor中建立ReadChecker和WriteCheck的bug,出现问题时会导致MQ上报出大量的Timer Already Cancelled报错,因为无法建立新的checker线程了,但是代码中仍然进行了checker的Timer类的配置,所以抛出了timer already cancelled的异常。在AMQ 5.13及以后的版本已修复该问题。
ActiveMQ重度用户,分享一些随笔,技术,阅读等。时常发点絮叨。
还有,我爱你10174人阅读
处理事务性的消息
ActiveMQ的此种特性主要管理消息的事务,以及消息持久化,即使在出错时也不会漏掉一条消息。消息服务器需要进行信息持久化,一个服务器集群可以提高其可用性,ActiveMQ正式这样的一个高可用性的消息服务器,典型的情况就是当一个Server Node掉线的时候,它上面的所有消息都会被保存下来,以便在它重新上线时继续处理。
高性能的数据分发
ActiveMQ的这个特性主要关注的是消息的吞吐率以及高效的消息投递路由,中心思想就是在一个大的网络中尽可能快的传递大量的并且快速改变的消息数据。
鉴于大量的数据和频繁的数据数据交换负荷很高,所以这种情况下很少使用数据持久化,在失败时丢失几条数据也是可以接受的因为老的数据通常都不再被需要了,最新的数据才是真正我们关心的。
集群和通用的异步消息模型
这种特性重点在网络延迟和速度,当实现一个web或者EJB集群的时候,目的是维护一个node集群,典型的是使用多点广播来discovery&keep-alive然后使用socket直接连接这些node来进行高效的通信。
这和使用JMS provider在EJB-Style或者WS-style的服务中作为RMI层是很相似的,都能使用多点广播来discovery&keep-alive并且使用socket直接连接通信以减少延迟。所以与其使用不同的服务器来协调client之间的通信,不如让client直接和彼此通信来减少延迟。
Ps: 此段主要讲的是activeMQ的node之间会有高效的异步通信机制,网络延迟小并且高效
网络数据流
这种特性关注点是activeMQ的ajax支持,越来越多的人希望数据流能实时的传递到网络浏览器中,例如金融行业的股票价格数据,实时的在展示IM会话,实时拍卖并且动态更新内容和消息。
鉴于这种情况,我们把ActiveMQ集成到了web容器中来提供封闭的网络集成,使用HTTP POSTS来发布消息并且在js中通过HTTP GET来接受并展示消息。
简易的使用HTTP来传递消息的API
ActiveMQ这种特性主要关注跨语言跨技术的连接能力,我们为message broker提供了一个HTTP接口允许跨语言或者技术来进行简单的发送和接受消息。使用HTTP POST将消息发送到message broker,使用HTTP GET从message broker获取消息,使用URI并且指定参数来决定接受/发送的目的地。
&&相关文章推荐
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:103794次
积分:1078
积分:1078
排名:千里之外
原创:16篇
(1)(1)(3)(1)(1)(1)(4)(4)(1)(1)(2)
(window.slotbydup = window.slotbydup || []).push({
id: '4740881',
container: s,
size: '200,200',
display: 'inlay-fix'

我要回帖

更多关于 有人用过馥美人没 的文章

 

随机推荐