b2c信息网

您现在的位置是:首页 > 金融新闻 > 正文

金融新闻

activemq作用(activemq能做什么)

hacker2022-06-09 19:23:25金融新闻69
本文目录一览:1、如何使用ActiveMQ+MQTT实现Android点对点消息通知?

本文目录一览:

如何使用ActiveMQ+MQTT实现Android点对点消息通知?

2013-12-20

实现点对点消息通知的关键问题

ActiveMQ使用MQTT协议,加上android上的paho包,即可简单实现消息通知功能,但是mqtt协议只支持topic,而且不能用selector,使得点对点的消息投递变成问题。

有两个解决思路:

1、每个clientId,建一个topic...这个办法对解决消息点对点投递非常有效,但是有两个大问题:

随着用户数增多,topic数量增多,对管理性要求增大,对内存的管理也有问题。

消息广播操作也变得非常麻烦,只能一个个的发送了。

2、另一个思路,就是在消息广播的基础上,进行点对点控制,实现某些特征的消息投递到指定的订阅者。

这个的实现比较简单,而且没有上面方案的大问题。代码稍微改下即可:

其实就只添加了一个新的类: ClientIdFilterDispatchPolicy

可以git clone所在版本源码,然后加上这个类,mvn package以后使用。

使用说明

本修改实现mqtt协议使用单个topic,来做消息广播和点对点的投递。

1、将本文件夹下的activemq-broker-5.9.0.jar、activemq-spring-5.9.0.jar换掉apache-activemq-5.9.0\lib下的jar。

2、参考本文件夹下activemq.xml,在topic上配置

dispatchPolicy

clientIdFilterDispatchPolicy /

/dispatchPolicy

3、对于此配置下的所有名称以.PTP结尾的队列,

如果要投递消息的properties里包含PTP_CLIENTID,则系统只会将此消息发给clientId为此值的订阅者;如果当前没有此clientId的订阅者,消息不会被任何人接收到。

如果投递消息的properties里不包含PTP_CLIENTID,则消息广播给所有的订阅者。 跟正常消息投递一致。

其中后缀.PTP和键值PTP_CLIENTID,是可以配置的:

dispatchPolicy

clientIdFilterDispatchPolicy ptpSuffix="" ptpClientId="clientId"/

/dispatchPolicy

如上配置,使得此policy下的所有topic都起作用,且消息的properties里获取clientId的key变成clientId。

消息发布者,如果要对所有人广播消息,直接发送消息即可。

如果要对指定的消息订阅者发消息,请在消息里设置接收者的clientId:

message.setStringProperty(PTP_CLIENTID, clientId);则此消息只有指定的订阅者可以拿到。

简单测试

两台android设备使用MQTT协议订阅到ActiveMQ的同一个topic,clientId分别为mqtt-1001和mqtt1002;

写代码发两条消息,设置消息属性中PTP_CLIENTID分别为mqtt-1001和mqtt1002;

两个设备分别接收到自己的消息通知,相互之间没有影响。还可以测试下如果消息没有PTP_CLIENTID,两个都能收到。

请用白话讲解ActiveMQ的用途

用途就是用来处理消息,也就是处理JMS的。消息队列在大型电子商务类网站,如京东、淘宝、去哪儿等网站有着深入的应用,队列的主要作用是消除高并发访问高峰,加快网站的响应速度。

在不使用消息队列的情况下,用户的请求数据直接写入数据库,高发的情况下,会对数据库造成巨大的压力,同时也使得系统响应延迟加剧,但使用队列后,用户的请求发给队列后立即返回。

例如:不能直接给用户提示订单提交成功,京东上提示:“您提交了订单,请等待系统确认”再由消息队列的消费者进程从消息队列中获取数据,异步写入数据库。

由于消息队列的服务处理速度远快于数据库,因此用户的响应延迟可得到有效改善。

扩展资料:

ActiveMQ主要有以下几种使用场景

1、异步调用。

2、一对多通信。

3、做多个系统的集成、同构、异构。

4、作为RPC的替代。

5、多个应用相互解耦。

6、作为事件驱动架构的幕后支撑。

7、为了提高系统的可伸缩性。

activemq为什么需要zookeeper

如何使用 Zookeeper 作为一个分布式的服务框架,主要用来解决分布式集群中应用系统的一致性问题,它能提供基于类似于文件系统的目录节点树方式的数据存储,但是 Zookeeper 并不是用来专门存储数据的,它的作用主要是用来维护和监控你存储的数据。

关于ActiveMQ的配备怎么解决

关于ActiveMQ的配置

目前常用的消息队列组建无非就是MSMQ和ActiveMQ,至于他们的异同,这里不想做过多的比较。简单来说,MSMQ内置于微软操作系统之中,在部署上包含一个隐性条件:Server需要是微软操作系统。(对于这点我并去调研过MSMQ是否可以部署在非微软系统,比如:Linux,只是拍脑袋想了想,感觉上是不可以)。对于ActiveMQ,微软系统和Linux都是可以部署的。从功能方面来说,一般最常用的就是:消息的收/发,感觉差异不大。从性能上来说,一般的说法是ActiveMQ略高。在稳定性上,个人感觉MSMQ更好。如果这两种常用队列都用过的同学,应该来说最大的差异在于:MSMQ如果要访问远程队列(比如机器A上的程序访问机器B上的队列),会比较恶心。在数据量比较大的情况之下,一般来说队列服务器会专门的一台或者多台(多台的话,用程序去做热备+负载比较方便,也不需要额外的硬件成本。简单来说做法可以这样:消息发送的时候随机向着多台队列服务器上发送消息;接受的时候开多个线程去分别监听;热备方面,可以维护一个带状态的队列连接池,如果消息收发失败那么将状态置为不可用,然后起一个线程去定时监测坏的连接是否可用,这个过程一般情况下可以不用加锁,为什么,大家根据各自需要去取舍吧)。最近搞完了短彩信的网关连接服务,这两种队列我均使用了。大致的过程是这样的:上层应用如果要发端彩信,那么将消息直接发送至ActiveMQ(目前用的就是上面说的多台热备+负载,因为实际中下行量非常大5千万条/天以上),然后端彩信网关连接服务部署多套,每套均依赖本机的MSMQ。为什么呢?用ActiveMQ的原因是:上层应用程序和网关连接服务彼此独立,消息需要跨机访问。用MSMQ的原因是:ActiveMQ中的数据是一条不分省的大队列,网关连接服务需要按省流控,所以端彩信网关连接服务:首先把消息从ActiveMQ取出来,然后存至本机上的分省MSMQ,这样做另外的一个好处就是:ActiveMQ不至于过多挤压,他的数据会分摊到N台短彩信网关连接服务所部署的机器上的MSMQ之中,也就说MSMQ可以起到分摊数据和缓冲的作用。

在之前的随笔中,已经介绍过MSMQ,现在先介绍一下ActiveMQ一些配置,目前好像ActiveMQ配置上的介绍还比较少。以下是自己总结一些相关资料,贴出来给大家共享

一)问题分析和解决

1)KahaDb和AMQ Message Store两种持久方式如何选择?

官方:

From 5.3 onwards - we recommend you use KahaDB - which offers improved scalability and recoverability over the AMQ Message Store.

The AMQ Message Store which although faster than KahaDB - does not scales as well as KahaDB and recovery times take longer.

非官方:

kaha文件系统实际上上是一个文件索引系统,有两部分组成,一个是数据文件系统,由一个个独立的文件组成,缺省文件大小是32M大(可配置),另外一个是索引文件系统,记录消息在数据文件中的位置信息以及数据文件中的空闲块信息。数据文件是存储到硬盘上的,索引文件是缓存在内存中的。所以这个存储系统对大消息存储有利,象我们的memberId之类的文本消息,实际上是浪费,索引比消息还大,哈。

我方分析:

推荐: Amq持久方式

理由:虽然官方推荐使用KahaDB持久方式,但其提到的优势:可伸缩性和恢复性较好,对于我们实际的应用意义不大。从我们自己的使用经验来看,KahaDB持久方式,Data文件是一个大文件(感觉文件过大后,造成队列服务瘫死的可能性会增大),从官网的相关配置(附录1)也找不到哪里可以设置数据的文件的最大Size。)而Amq持久方式可以设置Data文件最大Size,这样可以保证即时消息积压很多,Data文件也不至于过大。

2)错误:Channel was inactive for too long

解决方法:

在建立连接的Uri中加入: wireFormat.maxInactivityDuration=0

参考资源:

You can do the following to fix the issues:

1) Append max inactivity duration to your Uri in the format below: wireFormat.maxInactivityDuration=0

2) Use the same Uri at the client side as well as at the server side

Regards,

如果不这样设置,对应的错误会出现:

2008-05-07 09:22:56,343 [org.apache.activemq.ActiveMQConnection]-[WARN] Async exception with no exception listener: org.apache.activemq.transport.InactivityIOException: Channel was inactive for too long: localhost/127.0.0.1:61616

org.apache.activemq.transport.InactivityIOException: Channel was inactive for too long: localhost/127.0.0.1:61616

ActiveMQ的tcp url:tcp://localhost:61616后面要加入?wireFormat.maxInactivityDuration=0 这样的参数,否则当一段时间没有消息发送时会抛出 "Channel was inactive for too long"异常

3)错误:Wire format negotiation timeout: peer did not send his wire format.

解决方法:

1)关闭ActiveMqLog4j

打开:conf/log4j.properties

将:log4j.rootLogger=INFO, console, logfile

修改为:log4j.rootLogger=OFF

2)在建立连接的Uri中加入: maxInactivityDurationInitalDelay=30000

例如北京的测试环境连接Uri:

tcp://192.168.199.80:61616?wireFormat.maxInactivityDuration=0maxInactivityDurationInitalDelay=30000connection.AsyncSend=true

参考资源:

If you get exception like this,it can mean one of three things:

1. You're connecting to the port not used by ActiveMQ TCP transport

Make sure to check that you're connecting to the appropriate host:port

2. You're using log4j JMS appender and doesn't filter out ActiveMQ log messages

Be sure to read How do I use log4j JMS appender with ActiveMQ and more importantly to never send ActiveMQ log messages to JMS appender

3. Your broker is probably under heavy load (or network connection is unreliable), so connection setup cannot be completed in a reasonable time

If you experience sporadic exceptions like this, the best solution is to use failover transport, so that your clients can try connecting again if the first attempt fails. If you're getting these kind of exceptions more frequently you can also try extending wire format negotiation period (default 10 sec). You can do that by using wireFormat.maxInactivityDurationInitalDelay property on the connection URL in your client.

For example

tcp://localhost:61616?wireFormat.maxInactivityDurationInitalDelay=30000

will use 30 sec timeout.(貌似有问题!!!)

4)错误:Out of memory

解决方法:

1) 设置Java最大内存限制为合适大小:

Bin/activemq.bat 中ACTIVEMQ_OPTS=-Xmx512M(默认是512)

2)Activemq.xml配置节:systemUsage/ systemUsage配置大小合适,并且特别注意:大于所有durable desitination设置的memoryUsage之和。

备注:

1)尖括号:“”代表通配符

2)ACTIVEMQ_OPTS的配置〉=memoryUsage中配置〉=所有durable desitination设置之和

3)SystemUsage配置设置了一些系统内存和硬盘容量,当系统消耗超过这些容量设置时,amq会“slow down producer”,还是很重要的。

参考资料:

参考--

对于MQ的内容实用是可管理和可配置的。首先需要判断的是MQ的哪部分系统因内存不足而导致泄漏,是JVM,broker还是消费者、生产者?

一、内存管理

JVM内存管理:

1. 用bin/activemq命令在独立JVM中运行broker。用-Xmx和-Xss命令即可(activemq.bat文件中修改ACTIVEMQ_OPTS选项参数即可);

2. 默认情况下,MQ用512M的JVM;

broker内存管理:

1. broker使用的内存并不是由JVM的内存决定的。虽然受到JVM的限制,但broker确实独立管理器内存;

2. systemUsage和destination的内存限制与broker内存息息相关;

3. MQ中内存的关系是:JVM-Broker-broker features;

4. 所有destination的内存总量不能超过broker的总内存;

消费者:

1. 由于消息大小可以配置,prefetch limit往往是导致内存溢出的主要原因;

2. 减少prefetch limit的大小,会减少消费者内存中存储的消息数量;

生产者:

1. 除非消息数量超过了broker资源的限制,否则生产者不会导致内存溢出;

2. 当内存溢出后,生产者会收到broker的阻塞信息提示;

二、其他

将消息缓冲之硬盘:

1. 只有当消息在内存中存储时,才允许消息的快速匹配与分发,而当消费者很慢或者离开时,内存可能会耗尽;

2. 当destination到达它的内存临界值时,broker会用消息游标来缓存非持久化的消息到硬盘。

3. 临界值在broker中通过memoryUsage和systemUsage两个属性配置,请参考activemq.xml;

4. 对于缓慢的消费者,当尚未耗尽内存或者转变为生产者并发控制模式前,这个特性允许生产者继续发送消息到broker;

5. 当有多个destination的时候,默认的内存临界值可能被打破,而这种情况将消息缓存到硬盘就显得很有意义;

6. precentUsage配置:使用百分比来控制内存使用情况;

多个线程:

1. 默认情况下,MQ每个destination都对应唯一的线程;

2. -Dorg.apache.activema.UseDedicatedTaskRunner=false(activemq.bat文件中修改ACTIVEMQ_OPTS选项参数即可),用线程池来限制线程的数量,从而减少内存消耗;

大数据传输:

1. destination policies--maxPageSize:控制进入内存中的消息数量;lazyDispatch:增加控制使用当前消费者列表的预取值;

2. 使用blogMessage或者streamsMessage类型来进行大量文件的传输;

泄漏JMS资源:

1. 当session或者producer或者consumer大量存在而没有关闭的时候;

2. 使用PooledConnectionFactory;

ActiveMQ是什么是干什么用的

1、ActiviMq消息队列,解决了服务解耦合的动作,缓解了服务并发量很大,造成服务器无法处理的状况。(kafka、rabbitMQ、activiMQ)

其他作用:异步处理、消息通讯、流量消峰、应用解耦

应用场景:

1、用户注册的时候,重点内容是将用户信息保存到数据库中,发短信验证或者是发邮件增加了业务的复杂度。这时使用MQ将发短信、发邮件通知MQ由另外的服务平台完成。

2、搜索平台、缓存平台

查询数据,建立缓存、索引,不从数据库查询,从缓存或者索引库查询,当数据库发生增加、修改、删除操作时发消息给MQ,缓存平台或者是索引平台从MQ获取到这个消息,更新缓存或者索引。

ActiveMQ使用的是标准的生产者(完成生产消息并发送消息)和消费者(获取消息,完成自己的业务逻辑)模型

有两种数据结构

Topic(发布订阅) 一个生产者对应多个消费者,消息默认不会持久化,需要手动配置持久化。如果A服务器挂了,再生产一条消息的话,会被B服务器拿去使用,就算重新启动,A服务器也不会再拿到消息了

商品系统、库存系统、生成商品详情页面的系统,现在要添加一个商品信息,消息肯定是需要让库存系统以及商品信息详情页面系统知道的。

Queue(点对点)一个生产者对应一个消费者,默认消息持久化

StringMessage

mapMessage

byteMessage

objectMessage

要完成topic模式的消息持久化,需要保证每个消费者有唯一的clientID(本文来自北大青鸟)

ActiveMQ stomp的协议包和其作用

回答1 : -------------------------------------------------

支持多种传送协议:in-VM,TCP,SSL,Peer,NIO,UDP,http,https

In-VM允许在VM内部通信,从而避免网络传输的开销。

TCP:允许客户端通过TCP socket连接到远程broker

SSL:允许客户端使用SSL协议通过TCP socket连接到远程broker

Peer:提供了一个P2P的传输方式

NIO:和TCP类似,

UDP:允许通过UDP进行通信

http/https:允许客户端和broker在http/https上建立连接通道

回答2 : -------------------------------------------------

STOMP协议工作于TCP协议之上,使用了下列命令:

* SEND 发送

* SUBSCRIBE 订阅

* UNSUBSCRIBE 退订

* BEGIN 开始

* COMMIT 提交

* ABORT 取消

* ACK 确认

* DISCONNECT 断开

回答3 : -------------------------------------------------

暂无...

发表评论

评论列表

  • 柔侣酒废(2022-06-10 05:52:08)回复取消回复

    true 参考资源: If you get exception like this,it can mean one of three things:1. You're connecting to the port not used by ActiveMQ TCP transportMake

  • 忿咬聊慰(2022-06-10 07:18:12)回复取消回复

    nd ActiveMQ log messages to JMS appender 3. Your broker is probably under heavy load (or ne

  • 俗野嘤咛(2022-06-10 00:59:00)回复取消回复

    息数量超过了broker资源的限制,否则生产者不会导致内存溢出; 2. 当内存溢出后,生产者会收到broker的阻塞信息提示; 二、其他 将消

  • 澄萌偶亦(2022-06-10 05:14:39)回复取消回复

    paho包,即可简单实现消息通知功能,但是mqtt协议只支持topic,而且不能用selector,使得点对点的消息投递变成问题。有两个解决思路:1、每个clientId,建一个topic...这个办法对解决消息点对点投递非常有效,但

  • 北槐戏侃(2022-06-09 20:12:50)回复取消回复

    推荐使用KahaDB持久方式,但其提到的优势:可伸缩性和恢复性较好,对于我们实际的应用意义不大。从我们自己的使用经验来看,KahaDB持久方式,Data文件是一个大文件(感觉文件过大后,造成队列服务瘫死的可能性会增大),从官网的相关配置(附录1)也找不到