部署

准备JDK环境

JDK下载链接:https://www.oracle.com/java/technologies/downloads/

部署ActiveMQ

下载

官网下载地址:https://activemq.apache.org/

解压并移动到指定目录

unzip apache-activemq-5.16.4-bin.zip
mv apache-activemq-5.16.4 /opt/activemq-5.16.4

配置

修改conf/jetty.xml文件,把admin服务地址127.0.0.1修改成0.0.0.0

<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
    <!-- the default port number for the web console -->
    <property name="host" value="0.0.0.0"/>
    <property name="port" value="8161"/>
</bean>

启动

./opt/activemq-5.16.4/bin/activemq start

通过部署地址:8161端口访问管理平台,默认账号密码都是admin

High Availability:Master-Slave模式

High Availability是主从之间通过Live和Backup角色切换的方式,Live和Backup之间通过Journal Log同步数据

Master-Slave的部署方式主要分为3种

1.Shared Database Master-Slave

基于共享数据库,跟Filesystem类似,性能会受限于数据库。ActiveMQ的队列数据并未存放到数据库中

参考链接:https://activemq.apache.org/jdbc-master-slave.html

环境准备

相关数据库连接工具,下载好后将其放到ActiveMQ的lib目录

1.mysql connector,示例下载的5.1版本。

2.commons-dbcp2,示例下载的2.9.0版本

3.commons-pool2,示例下载的2.11.1版本

https://downloads.mysql.com/archives/c-j/

https://cdn.mysql.com/archives/mysql-connector-java-5.1/mysql-connector-java-5.1.49.tar.gz

https://commons.apache.org/proper/commons-pool/download_pool.cgi

https://commons.apache.org/proper/commons-dbcp/download_dbcp.cgi

配置

activemq.xml的配置注意点

1.Networks必须在消息存储之前创建

2.Message store必须在传输配置好之前配置完

3.Transports必须在broker配置的最后

主要修改参数

设置broker名称

<broker brokerName="broker_beta" >

设置broker连接

<networkConnectors>
<networkConnector name="local_network" uri="static:(tcp://192.168.31.101:61616,tcp://192.168.31.102:61616)" duplex="true"/>
</networkConnectors>

设置数据库连接参数

<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">  
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>  
    <property name="url" value="jdbc:mysql://192.168.31.109:3306/activemq-db?useUnicode=true"/>  
    <property name="username" value="root"/>  
    <property name="password" value="password"/>  
    <property name="poolPreparedStatements" value="true"/>    
</bean> 

注意:数据库可指定名称为activemq-db,可能会有数据库报错,需要先创建数据库activemq-db

连接配置示例:mysql://192.168.31.109:3306/activemq-db

activemq.xml配置参考

<!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements.  See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to You under the Apache License, Version 2.0
    (the "License"); you may not use this file except in compliance with
    the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
-->
<!-- START SNIPPET: example -->
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- Allows us to use system properties as variables in this configuration file -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>file:${activemq.conf}/credentials.properties</value>
        </property>
    </bean>

   <!-- Allows accessing the server log -->
    <bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
          lazy-init="false" scope="singleton"
          init-method="start" destroy-method="stop">
    </bean>

    <!--
        The <broker> element is used to configure the ActiveMQ broker.
    -->
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="broker_beta" dataDirectory="${activemq.data}">

        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" >
                    <!-- The constantPendingMessageLimitStrategy is used to prevent
                         slow topic consumers to block producers and affect other consumers
                         by limiting the number of messages that are retained
                         For more information, see:

                         http://activemq.apache.org/slow-consumer-handling.html

                    -->
                  <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                  </pendingMessageLimitStrategy>
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>


        <!--
            The managementContext is used to configure how ActiveMQ is exposed in
            JMX. By default, ActiveMQ uses the MBean server that is started by
            the JVM. For more information, see:

            http://activemq.apache.org/jmx.html
        -->
        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>


        <!-- connector configuration -->
        <networkConnectors>
            <networkConnector name="local_network" uri="static:(tcp://192.168.31.101:61616,tcp://192.168.31.102:61616)" duplex="true"/>
        </networkConnectors>

        <!--
            Configure message persistence for the broker. The default persistence
            mechanism is the KahaDB store (identified by the kahaDB tag).
            For more information, see:

            http://activemq.apache.org/persistence.html
        -->
        <persistenceAdapter>
            <!-- default kahadb -->
            <!-- <kahaDB directory="${activemq.data}/kahadb"/> -->
            <!-- mysql -->
            <!-- <jdbcPersistenceAdapter  useDatabaseLock="true" dataSource="#mysql-ds"/> -->
            <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="true" useDatabaseLock="true" />
        </persistenceAdapter>
        
        <!-- <persistenceFactory>
          <journalPersistenceAdapterFactory  dataSource="#mysql_ds" dataDirectory="activemq-data"  />
        <spersistenceFactory> -->

          <!--
            The systemUsage controls the maximum amount of space the broker will
            use before disabling caching and/or slowing down producers. For more information, see:
            http://activemq.apache.org/producer-flow-control.html
          -->
          <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage percentOfJvmHeap="70" />
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="100 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="50 gb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

        <!--
            The transport connectors expose ActiveMQ over a given protocol to
            clients and other brokers. For more information, see:

            http://activemq.apache.org/configuring-transports.html
        -->
        <transportConnectors>
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        </transportConnectors>


        <!-- destroy the spring context on shutdown to stop jetty -->
        <shutdownHooks>
            <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
        </shutdownHooks>

    </broker>

    <bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">  
        <property name="driverClassName" value="com.mysql.jdbc.Driver"/>  
        <property name="url" value="jdbc:mysql://192.168.31.109:3306/activemq-db?useUnicode=true"/>  
        <property name="username" value="root"/>  
        <property name="password" value="password"/>  
        <property name="poolPreparedStatements" value="true"/>    
    </bean> 
    <!--
        Enable web consoles, REST and Ajax APIs and demos
        The web consoles requires by default login, you can disable this in the jetty.xml file

        Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
    -->
    <import resource="jetty.xml"/>

</beans>
<!-- END SNIPPET: example -->

搭建数据库

启动MySQL数据库或数据库集群,账号密码需与配置文件中的对应,此处略过部署过程,部署完后创建相应的数据库,数据表不用创建,启动ActiveMQ后会自动生成

数据表说明

ACTIVEMQ_ACKS
用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存。

主要的数据库字段如下:

字段说明
container消息的destination
sub_dest如果是使用static集群,这个字段会有集群其他系统的信息
client_id每个订阅者都必须有一个唯一的客户端id用以区分
sub_name订阅者名称
selector选择器,可以选择只消费满足条件的消息。条件可以用自定义属性实现,可支持多属性and和or操作
last_acked_id记录消费过的消息的id

ACTIVEMQ_MSGS
用于存储消息,Queue和Topic都存储在这个表中。
主要的数据库字段如下:

字段说明
id自增的数据库主键
container消息的destination
msgid_prod消息发送者客户端的主键
msg_seq是发送消息的顺序,msgid_prod+msg_seq可以组成jms的messageid
expiration消息的过期时间,存储的是从1970-01-01到现在的毫秒数
msg消息本体的java序列化对象的二进制数据
priority优先级,从0-9,数值越大优先级越高
activemq_acks用于存储订阅关系。如果是持久化topic,订阅者和服务器的订阅关系在这个表保存

启动

在各个节点中分别启动,先启动的即为Master,不同的Broker集群使用不同的数据库即可

./opt/activemq-5.16.4/bin/activemq start

注意:Master节点开启后,Slave节点不会有端口开启,但是会有侦听进程检查数据库是否被Lock,如果一旦Unlock则抢占角色

2.Replicated LevelDB Store

Replicated LevelDB Store方式,基于ZooKeeper + LevelDB,是生产环境常用的方案

参考链接:https://activemq.apache.org/replicated-leveldb-store.html

3.Shared Filesystem Master-Slave

Shared Filesystem Master-Slave方式,如KahaDB,应用灵活、高效且安全

参考链接:http://activemq.apache.org/shared-file-system-master-slave.html

Load Balance:Broker-Cluster模式

Broker-Cluster部署方式中,各个Broker通过网络互相连接,自动分发调用端请求,从而实现集群的负载均衡。Broker-Cluster集群连接到网络代理的方式,主要分为静态网络代理、动态网络代理,不同的网络代理方式对应不同的集群部署方式。

Master-Slave的部署方式虽然解决了高可用的问题,但不支持负载均衡,Broker-Cluster解决了负载均衡,但当其中一个Broker突然宕掉的话,那么存在于该Broker上处于Pending状态的message将会丢失,无法达到高可用的目的。

在Broker模式下,如果一个节点接收到了Consumer的请求该节点没有相应数据则它会将该请求转发到其他节点,此时它相当于一个Consumer,他们之间可以互相消费。除此之外还需配置消息回流,这样 brokerA 的消费者去消费 队列 queueMsg时 , brokerA 没有回去brokerB要,当 brokerB 的消费者去消费 队列 queueMsg 时,brokerB 会从 brokerA中要回来,就这样互相来回消费。

activemq_cluster

Static Broker-Cluster

Broker Connector配置

<networkConnectors>
    <networkConnector name="brokerB" uri="static:(tcp://172.18.0.116:61619,tcp://172.18.0.116:61620,tcp://172.18.0.116:61621)" duplex="true" />
</networkConnectors>

Dynamic Broker-Cluster

Broker Connector配置

<networkConnectors>
    <networkConnector uri="multicast://default"/>
</networkConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616" discoveryUri="multicast://default" /></transportConnectors>

Master-Slave和Broker-Cluster两者相结合的6个节点集群部署,也说明了Master-Slave和Broker-Cluster模式:

https://developer.aliyun.com/article/758368

https://blog.csdn.net/lifetragedy/article/details/51869032

动态静态配置参考

https://www.cnblogs.com/allenwas3/p/8872822.html

https://www.cnblogs.com/arjenlee/p/9303229.html#auto_id_17

ActiveMQ高可用+负载均衡集群之功能测试

https://developer.aliyun.com/article/762066?spm=a2c6h.12873639.0.0.375e86f3XWr2v8

客户端连接

针对客户端(Java)会使用failover来连接两个ActiveMQ实例,randomize=true时客户端会随机对BrokerA、BrokerB进行连接(默认为true),当BrokerA挂了,他会将所有的请求都会到BrokerB上 ,randomize=false时所有的客户端连接都会打到ip:port 上,只有第一个挂了,才会到第二个,按前后优先级顺序

Python连接ActiveMQ

import stomp
import time

# 注意带/才能区分queue和topic
queue_name = '/queue/SAMPLE.QUEUE'
topic_name = '/topic/SAMPLE.TOPIC'

listener_name = 'SampleListener'

connections = [('192.168.31.101',61613),('192.168.31.102',61613),('192.168.31.103',61613)]


# 声明Listener类
class SampleListener(object):
    def on_error(self, frame):
        print('received an error: %s from ActiveMQ' % frame)
    def on_message(self, frame):
        print(f"headers:{frame.headers['destination']}, message:{frame.body}")

# 推送到队列
def send_to_queue(msg):
    conn = stomp.Connection(connections)
    conn.connect()
    # 可以指定账号密码连接
    # conn.connect(username='admin', passcode='admin', wait=True)
    conn.send(body=msg, destination=queue_name)
    conn.disconnect()

# 从队列接收消息
def receive_from_queue():
    conn = stomp.Connection(connections)
    conn.set_listener(listener_name, SampleListener())
    conn.connect()
    # 可以指定账号密码连接
    # conn.connect(username='admin', passcode='admin', wait=True)
    conn.subscribe(destination=queue_name,id=1)
    while 1:
        time.sleep(1)


# 推送到 Topic
def send_to_topic(msg):
    conn = stomp.Connection(connections)
    conn.connect()
    # 可以指定账号密码连接
    # conn.connect(username='admin', passcode='admin', wait=True)
    conn.send(topic_name, msg)
    conn.disconnect()


# 从 Topic 接收消息
def receive_from_topic():
    conn = stomp.Connection(connections)
    conn.set_listener(listener_name, SampleListener())
    conn.connect()
    # 可以指定账号密码连接
    # conn.connect(username='admin', passcode='admin', wait=True)
    conn.subscribe(destination=topic_name,id=1)
    while 1:
        time.sleep(3) 


if __name__=='__main__':
    while True:
        command = input("[Please choice one option to execute]\n[1]:send to queue\n[2]:receive from queue\n[3]:send to topic\n[4]:receive from topic\n")
        if command == "1":
            get_msg = input("You can input message here or enter to leave it default\n")
            if get_msg != "":
                msg = get_msg
            else:
                msg = 'current time is '+time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))
            send_to_queue(msg)
            print("[send to queue done]")
        elif command == "2":
            receive_from_queue()
        elif command == "3":
            get_msg = input("You can input message here or enter to leave it default\n")
            if get_msg != "":
                msg = get_msg
            else:
                msg = 'current time is '+time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))
            send_to_topic(msg)
            print("[send to topic done]")
        elif command == "4":
            receive_from_topic()
        else:
            print("[no this option]")

参考链接:

https://blog.csdn.net/five3/article/details/79569587

https://blog.csdn.net/CWG2017/article/details/101169246

http://www.cppcns.com/jiaoben/python/251428.html

参考链接

集群架构说明,很详细

https://blog.csdn.net/yinwenjie/article/details/51124749

https://blog.csdn.net/yinwenjie/article/details/51205822

ZooKeeper集群架构说明

https://blog.csdn.net/yinwenjie/article/details/47361419

Cluster:

https://activemq.apache.org/components/artemis/documentation/latest/clusters.html

High Availability and Failover:

https://activemq.apache.org/components/artemis/documentation/1.0.0/ha.html

https://activemq.apache.org/components/artemis/documentation/

Last modification:March 14th, 2022 at 11:46 pm
硬币投入口