ActiveMQ入门之Java操作API 有更新!

  |   0 评论   |   281 浏览

1.项目准备

  1. 创建一个Maven工程,引入相关依赖
    <dependencies>
    	<!-- activemq所需的jar包配置 -->
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-all</artifactId>
                <version>5.15.6</version>
            </dependency>
            <dependency>
                <groupId>org.apache.xbean</groupId>
                <artifactId>xbean-spring</artifactId>
                <version>3.16</version>
            </dependency>
    
            <!-- junit/log4j等基础配置 -->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>1.7.25</version>
            </dependency>
            <dependency>
                <groupId>ch.qos.logback</groupId>
                <artifactId>logback-classic</artifactId>
                <version>1.2.3</version>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.16.18</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
            </dependency>
    </dependencies>
    

2.队列生产者编码

  1. 新建一个JmsProduce类
    package com.fdzang.activemq.queue;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    /**
     * @author tanghu
     * @Date: 2019/7/31 11:12
     */
    public class JmsProduce {
    
        public static final String ACTIVEMQ_URL = "tcp://ip:61616";
        public static final String QUEUE_NAME = "queue01";
    
        public static void main(String[] args) throws JMSException {
            //1. 创建连接工厂,按照给定的URL地址,采用的默认的用户名跟密码
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            //2. 通过连接工厂,获取连接信息,并启动访问
            Connection connection=activeMQConnectionFactory.createConnection();
            connection.start();
    
            //3. 创建会话session
            //两个参数:第一个叫事务/第二个叫签收
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
            //4. 创建目的地(具体是队列还是主题)
            Queue queue = session.createQueue(QUEUE_NAME);
    
            //5.创建消息的生产者
            MessageProducer messageProducer = session.createProducer(queue);
    
            //6. 通过使用messageProducer生产3条消息发送到MQ的队列里面
            for(int i = 0; i < 3; i++){
                //7. 创建消息
                TextMessage textMessage = session.createTextMessage("msg---"+i);
                //8. 通过messageProducer发送到MQ
                messageProducer.send(textMessage);
            }
            //9. 关闭资源
            messageProducer.close();
            session.close();
            connection.close();
    
            System.out.println("*****消息已经发送到MQ*****");
        }
    }
    
  2. 程序运行后打印效果
    produce.JPG
  3. 在ActiveMQ中查看
    activemqqueue.png

3.队列消费者编码

  1. 新建一个JmsConsumer类
    package com.fdzang.activemq.queue;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    import java.io.IOException;
    
    /**
     * @author tanghu
     * @Date: 2019/7/31 14:21
     */
    public class JmsConsumer {
    
        public static final String ACTIVEMQ_URL = "tcp://ip:61616";
        public static final String QUEUE_NAME = "queue01";
    
    
        public static void main(String[] args) throws JMSException,IOException{
            //1. 创建连接工厂,按照给定的URL地址,采用的默认的用户名跟密码
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            //2. 通过连接工厂,获取连接信息,并启动访问
            Connection connection=activeMQConnectionFactory.createConnection();
            connection.start();
    
            //3. 创建会话session
            //两个参数:第一个叫事务/第二个叫签收
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
            //4. 创建目的地(具体是队列还是主题)
            Queue queue = session.createQueue(QUEUE_NAME);
    
            //5. 创建消费者
            MessageConsumer messageConsumer = session.createConsumer(queue);
    
            /*
            同步阻塞方式(receive)
            订阅者调用messageConsumer的receive方法来接收消息
            receive方法在能够接收到消息之前(或超时之前)一直阻塞
            while (true) {
                TextMessage textMessage= (TextMessage)messageConsumer.receive(4000L);
                if(textMessage != null){
                    System.out.println("*****消费者接收到消息:"+textMessage.getText());
                } else {
                    break;
                }
            }
            messageConsumer.close();
            session.close();
            connection.close();*/
    
            //通过监听的方式来消费消息
            messageConsumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    if(null != message && message instanceof TextMessage){
                        TextMessage textMessage = (TextMessage)message;
                        try {
                            System.out.println("*****消费者接收到消息:"+textMessage.getText());
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            System.in.read();//保证控制台不退出
    
            messageConsumer.close();
            session.close();
            connection.close();
        }
    }
    
  2. 查看控制台输出
    consmer.JPG
  3. 在ActiveMQ中查看
    activemqconsumer.JPG
  4. 消费者优化
    在接收消息时[messageConsumer.receive()],应该指定等待时间。[messageConsumer.receive(4000L)]

4.主题生产者编码

  1. 新建一个JmsProduce
    package com.fdzang.activemq.topic;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    /**
     * @author tanghu
     * @Date: 2019/7/31 16:41
     */
    public class JmsProduce {
        public static final String ACTIVEMQ_URL = "tcp://ip:61616";
        public static final String TOPIC_NAME = "topic01";
    
    
        public static void main(String[] args) throws JMSException {
            //1. 创建连接工厂,按照给定的URL地址,采用的默认的用户名跟密码
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            //2. 通过连接工厂,获取连接信息,并启动访问
            Connection connection=activeMQConnectionFactory.createConnection();
            connection.start();
    
            //3. 创建会话session
            //两个参数:第一个叫事务/第二个叫签收
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
            //4. 创建目的地(具体是队列还是主题)
            Topic topic = session.createTopic(TOPIC_NAME);
    
            //5.创建消息的生产者
            MessageProducer messageProducer = session.createProducer(topic);
    
            //6. 通过使用messageProducer生产3条消息发送到MQ的队列里面
            for(int i = 0; i < 3; i++){
                //7. 创建消息
                TextMessage textMessage = session.createTextMessage("TOPIC MSG ---"+i);
                //8. 通过messageProducer发送到MQ
                messageProducer.send(textMessage);
            }
            //9. 关闭资源
            messageProducer.close();
            session.close();
            connection.close();
    
            System.out.println("*****消息已经发送到MQ*****");
        }
    }
    
  2. 查看ActiveMQ控制台
    topic.jpg

5.消费者编码

  1. 新建一个JmsConsumer类
    package com.fdzang.activemq.topic;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    import java.io.IOException;
    
    /**
     * @author tanghu
     * @Date: 2019/8/1 9:50
     */
    public class JmsConsumer {
        public static final String ACTIVEMQ_URL = "tcp://120.78.121.110:61616";
        public static final String TOPIC_NAME = "topic01";
    
    
        public static void main(String[] args) throws JMSException, IOException {
            //1. 创建连接工厂,按照给定的URL地址,采用的默认的用户名跟密码
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            //2. 通过连接工厂,获取连接信息,并启动访问
            Connection connection=activeMQConnectionFactory.createConnection();
            connection.start();
    
            //3. 创建会话session
            //两个参数:第一个叫事务/第二个叫签收
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
            //4. 创建目的地(具体是队列还是主题)
            Topic topic = session.createTopic(TOPIC_NAME);
    
            //5. 创建消费者
            MessageConsumer messageConsumer = session.createConsumer(topic);
    
            //通过监听的方式来消费消息
            messageConsumer.setMessageListener((message) -> {
                if(null != message && message instanceof TextMessage){
                    TextMessage textMessage = (TextMessage)message;
                    try {
                        System.out.println("*****消费者接收到Topic消息:"+textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            System.in.read();//保证控制台不退出
    
            messageConsumer.close();
            session.close();
            connection.close();
        }
    }
    

6.区别

default.JPG

评论

发表评论