部署
准备JDK环境
JDK下载链接:https://www.oracle.com/java/technologies/downloads/
部署ActiveMQ
下载
官网下载地址:https://activemq.apache.org/
解压并移动到指定目录
unzip apache-activemq-5.16.4-bin.zip
mv apache-activemq-5.16.4 /opt/activemq-5.16.4
配置
修改conf/jetty.xml文件,把admin服务地址127.0.0.1修改成0.0.0.0
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8161"/>
</bean>
启动
./opt/activemq-5.16.4/bin/activemq start
通过部署地址:8161端口访问管理平台,默认账号密码都是admin
High Availability:Master-Slave模式
High Availability是主从之间通过Live和Backup角色切换的方式,Live和Backup之间通过Journal Log同步数据
Master-Slave的部署方式主要分为3种
1.Shared Database Master-Slave
基于共享数据库,跟Filesystem类似,性能会受限于数据库。ActiveMQ的队列数据并未存放到数据库中
参考链接:https://activemq.apache.org/jdbc-master-slave.html
环境准备
相关数据库连接工具,下载好后将其放到ActiveMQ的lib目录
1.mysql connector,示例下载的5.1版本。
2.commons-dbcp2,示例下载的2.9.0版本
3.commons-pool2,示例下载的2.11.1版本
https://downloads.mysql.com/archives/c-j/
https://cdn.mysql.com/archives/mysql-connector-java-5.1/mysql-connector-java-5.1.49.tar.gz
https://commons.apache.org/proper/commons-pool/download_pool.cgi
https://commons.apache.org/proper/commons-dbcp/download_dbcp.cgi
配置
activemq.xml的配置注意点
1.Networks必须在消息存储之前创建
2.Message store必须在传输配置好之前配置完
3.Transports必须在broker配置的最后
主要修改参数
设置broker名称
<broker brokerName="broker_beta" >
设置broker连接
<networkConnectors>
<networkConnector name="local_network" uri="static:(tcp://192.168.31.101:61616,tcp://192.168.31.102:61616)" duplex="true"/>
</networkConnectors>
设置数据库连接参数
<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://192.168.31.109:3306/activemq-db?useUnicode=true"/>
<property name="username" value="root"/>
<property name="password" value="password"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
注意:数据库可指定名称为activemq-db,可能会有数据库报错,需要先创建数据库activemq-db
连接配置示例:mysql://192.168.31.109:3306/activemq-db
activemq.xml配置参考
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- START SNIPPET: example -->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<!-- Allows us to use system properties as variables in this configuration file -->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<value>file:${activemq.conf}/credentials.properties</value>
</property>
</bean>
<!-- Allows accessing the server log -->
<bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
lazy-init="false" scope="singleton"
init-method="start" destroy-method="stop">
</bean>
<!--
The <broker> element is used to configure the ActiveMQ broker.
-->
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="broker_beta" dataDirectory="${activemq.data}">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" >
<!-- The constantPendingMessageLimitStrategy is used to prevent
slow topic consumers to block producers and affect other consumers
by limiting the number of messages that are retained
For more information, see:
http://activemq.apache.org/slow-consumer-handling.html
-->
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<!--
The managementContext is used to configure how ActiveMQ is exposed in
JMX. By default, ActiveMQ uses the MBean server that is started by
the JVM. For more information, see:
http://activemq.apache.org/jmx.html
-->
<managementContext>
<managementContext createConnector="false"/>
</managementContext>
<!-- connector configuration -->
<networkConnectors>
<networkConnector name="local_network" uri="static:(tcp://192.168.31.101:61616,tcp://192.168.31.102:61616)" duplex="true"/>
</networkConnectors>
<!--
Configure message persistence for the broker. The default persistence
mechanism is the KahaDB store (identified by the kahaDB tag).
For more information, see:
http://activemq.apache.org/persistence.html
-->
<persistenceAdapter>
<!-- default kahadb -->
<!-- <kahaDB directory="${activemq.data}/kahadb"/> -->
<!-- mysql -->
<!-- <jdbcPersistenceAdapter useDatabaseLock="true" dataSource="#mysql-ds"/> -->
<jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="true" useDatabaseLock="true" />
</persistenceAdapter>
<!-- <persistenceFactory>
<journalPersistenceAdapterFactory dataSource="#mysql_ds" dataDirectory="activemq-data" />
<spersistenceFactory> -->
<!--
The systemUsage controls the maximum amount of space the broker will
use before disabling caching and/or slowing down producers. For more information, see:
http://activemq.apache.org/producer-flow-control.html
-->
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage percentOfJvmHeap="70" />
</memoryUsage>
<storeUsage>
<storeUsage limit="100 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="50 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
<!--
The transport connectors expose ActiveMQ over a given protocol to
clients and other brokers. For more information, see:
http://activemq.apache.org/configuring-transports.html
-->
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
<!-- destroy the spring context on shutdown to stop jetty -->
<shutdownHooks>
<bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
</shutdownHooks>
</broker>
<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://192.168.31.109:3306/activemq-db?useUnicode=true"/>
<property name="username" value="root"/>
<property name="password" value="password"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
<!--
Enable web consoles, REST and Ajax APIs and demos
The web consoles requires by default login, you can disable this in the jetty.xml file
Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
-->
<import resource="jetty.xml"/>
</beans>
<!-- END SNIPPET: example -->
搭建数据库
启动MySQL数据库或数据库集群,账号密码需与配置文件中的对应,此处略过部署过程,部署完后创建相应的数据库,数据表不用创建,启动ActiveMQ后会自动生成
数据表说明
ACTIVEMQ_ACKS
用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存。
主要的数据库字段如下:
字段 | 说明 |
---|---|
container | 消息的destination |
sub_dest | 如果是使用static集群,这个字段会有集群其他系统的信息 |
client_id | 每个订阅者都必须有一个唯一的客户端id用以区分 |
sub_name | 订阅者名称 |
selector | 选择器,可以选择只消费满足条件的消息。条件可以用自定义属性实现,可支持多属性and和or操作 |
last_acked_id | 记录消费过的消息的id |
ACTIVEMQ_MSGS
用于存储消息,Queue和Topic都存储在这个表中。
主要的数据库字段如下:
字段 | 说明 |
---|---|
id | 自增的数据库主键 |
container | 消息的destination |
msgid_prod | 消息发送者客户端的主键 |
msg_seq | 是发送消息的顺序,msgid_prod+msg_seq可以组成jms的messageid |
expiration | 消息的过期时间,存储的是从1970-01-01到现在的毫秒数 |
msg | 消息本体的java序列化对象的二进制数据 |
priority | 优先级,从0-9,数值越大优先级越高 |
activemq_acks | 用于存储订阅关系。如果是持久化topic,订阅者和服务器的订阅关系在这个表保存 |
启动
在各个节点中分别启动,先启动的即为Master,不同的Broker集群使用不同的数据库即可
./opt/activemq-5.16.4/bin/activemq start
注意:Master节点开启后,Slave节点不会有端口开启,但是会有侦听进程检查数据库是否被Lock,如果一旦Unlock则抢占角色
2.Replicated LevelDB Store
Replicated LevelDB Store方式,基于ZooKeeper + LevelDB,是生产环境常用的方案
参考链接:https://activemq.apache.org/replicated-leveldb-store.html
3.Shared Filesystem Master-Slave
Shared Filesystem Master-Slave方式,如KahaDB,应用灵活、高效且安全
参考链接:http://activemq.apache.org/shared-file-system-master-slave.html
Load Balance:Broker-Cluster模式
Broker-Cluster部署方式中,各个Broker通过网络互相连接,自动分发调用端请求,从而实现集群的负载均衡。Broker-Cluster集群连接到网络代理的方式,主要分为静态网络代理、动态网络代理,不同的网络代理方式对应不同的集群部署方式。
Master-Slave的部署方式虽然解决了高可用的问题,但不支持负载均衡,Broker-Cluster解决了负载均衡,但当其中一个Broker突然宕掉的话,那么存在于该Broker上处于Pending状态的message将会丢失,无法达到高可用的目的。
在Broker模式下,如果一个节点接收到了Consumer的请求该节点没有相应数据则它会将该请求转发到其他节点,此时它相当于一个Consumer,他们之间可以互相消费。除此之外还需配置消息回流,这样 brokerA 的消费者去消费 队列 queueMsg时 , brokerA 没有回去brokerB要,当 brokerB 的消费者去消费 队列 queueMsg 时,brokerB 会从 brokerA中要回来,就这样互相来回消费。
Static Broker-Cluster
Broker Connector配置
<networkConnectors>
<networkConnector name="brokerB" uri="static:(tcp://172.18.0.116:61619,tcp://172.18.0.116:61620,tcp://172.18.0.116:61621)" duplex="true" />
</networkConnectors>
Dynamic Broker-Cluster
Broker Connector配置
<networkConnectors>
<networkConnector uri="multicast://default"/>
</networkConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616" discoveryUri="multicast://default" /></transportConnectors>
Master-Slave和Broker-Cluster两者相结合的6个节点集群部署,也说明了Master-Slave和Broker-Cluster模式:
https://developer.aliyun.com/article/758368
https://blog.csdn.net/lifetragedy/article/details/51869032
动态静态配置参考
https://www.cnblogs.com/allenwas3/p/8872822.html
https://www.cnblogs.com/arjenlee/p/9303229.html#auto_id_17
ActiveMQ高可用+负载均衡集群之功能测试
https://developer.aliyun.com/article/762066?spm=a2c6h.12873639.0.0.375e86f3XWr2v8
客户端连接
针对客户端(Java)会使用failover来连接两个ActiveMQ实例,randomize=true时客户端会随机对BrokerA、BrokerB进行连接(默认为true),当BrokerA挂了,他会将所有的请求都会到BrokerB上 ,randomize=false时所有的客户端连接都会打到ip:port 上,只有第一个挂了,才会到第二个,按前后优先级顺序
Python连接ActiveMQ
import stomp
import time
# 注意带/才能区分queue和topic
queue_name = '/queue/SAMPLE.QUEUE'
topic_name = '/topic/SAMPLE.TOPIC'
listener_name = 'SampleListener'
connections = [('192.168.31.101',61613),('192.168.31.102',61613),('192.168.31.103',61613)]
# 声明Listener类
class SampleListener(object):
def on_error(self, frame):
print('received an error: %s from ActiveMQ' % frame)
def on_message(self, frame):
print(f"headers:{frame.headers['destination']}, message:{frame.body}")
# 推送到队列
def send_to_queue(msg):
conn = stomp.Connection(connections)
conn.connect()
# 可以指定账号密码连接
# conn.connect(username='admin', passcode='admin', wait=True)
conn.send(body=msg, destination=queue_name)
conn.disconnect()
# 从队列接收消息
def receive_from_queue():
conn = stomp.Connection(connections)
conn.set_listener(listener_name, SampleListener())
conn.connect()
# 可以指定账号密码连接
# conn.connect(username='admin', passcode='admin', wait=True)
conn.subscribe(destination=queue_name,id=1)
while 1:
time.sleep(1)
# 推送到 Topic
def send_to_topic(msg):
conn = stomp.Connection(connections)
conn.connect()
# 可以指定账号密码连接
# conn.connect(username='admin', passcode='admin', wait=True)
conn.send(topic_name, msg)
conn.disconnect()
# 从 Topic 接收消息
def receive_from_topic():
conn = stomp.Connection(connections)
conn.set_listener(listener_name, SampleListener())
conn.connect()
# 可以指定账号密码连接
# conn.connect(username='admin', passcode='admin', wait=True)
conn.subscribe(destination=topic_name,id=1)
while 1:
time.sleep(3)
if __name__=='__main__':
while True:
command = input("[Please choice one option to execute]\n[1]:send to queue\n[2]:receive from queue\n[3]:send to topic\n[4]:receive from topic\n")
if command == "1":
get_msg = input("You can input message here or enter to leave it default\n")
if get_msg != "":
msg = get_msg
else:
msg = 'current time is '+time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))
send_to_queue(msg)
print("[send to queue done]")
elif command == "2":
receive_from_queue()
elif command == "3":
get_msg = input("You can input message here or enter to leave it default\n")
if get_msg != "":
msg = get_msg
else:
msg = 'current time is '+time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))
send_to_topic(msg)
print("[send to topic done]")
elif command == "4":
receive_from_topic()
else:
print("[no this option]")
参考链接:
https://blog.csdn.net/five3/article/details/79569587
https://blog.csdn.net/CWG2017/article/details/101169246
http://www.cppcns.com/jiaoben/python/251428.html
参考链接
集群架构说明,很详细
https://blog.csdn.net/yinwenjie/article/details/51124749
https://blog.csdn.net/yinwenjie/article/details/51205822
ZooKeeper集群架构说明
https://blog.csdn.net/yinwenjie/article/details/47361419
Cluster:
https://activemq.apache.org/components/artemis/documentation/latest/clusters.html
High Availability and Failover:
https://activemq.apache.org/components/artemis/documentation/1.0.0/ha.html
https://activemq.apache.org/components/artemis/documentation/
One comment
努力看懂(ó﹏ò。)