当前位置:首页 > 后端开发 > 消息队列--ActiveMQ

消息队列--ActiveMQ

4个月前 (06-01)60

JMS(Java Message Service,Java消息服务)是一组Java应用程序接口(Java API),它提供创建、发送、接收、读取消息的服务。Java消息服务支持两种消息模型:Point-to-Point消息(P2P)和发布订阅消息(Publish Subscribe messaging,简称Pub/Sub)。

消息队列厂商只需要实现这些接口即可,与JDBC的实现过程是类似的(JDBC也是提供一组接口,然后MySQL有MySQL的实现,Oracle有Oracle的实现...)。

 

首先去ActiveMQ官网下载,我目前版本是 ActiveMQ 5.15.0 Release

 消息队列--ActiveMQ _ Java侠

然后解压,我解压到D盘

 消息队列--ActiveMQ _ Java侠

bin目录用来启动MQ,conf是配置文件,那个jar包是MQ的包,一会要用。

打开eclipse,新建一个Java工程,把这个包复制到工程,然后build path。

消息队列--ActiveMQ _ Java侠

然后新建两个类,一个作为生产者,一个作为消费者。

生产者:

package mq.helloworld;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Sender {

    
    public static void main(String[] args) throws Exception {
        
        //1. 建立一个ConnectionFactory. tcp://0.0.0.0:61616
        String userName = ActiveMQConnectionFactory.DEFAULT_USER;
        String password = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
        String brokerURL = "tcp://localhost:61616";
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(userName, password, brokerURL);
        
        //2. 通过ConnectionFactory建立一个Connection连接,并且调用start方法开启
        Connection connection = connectionFactory.createConnection();
        connection.start();
        
        //3. 通过Connection创建Session,用于接收消息[第一个参数:是否启用事务;第二个参数:设置签收模式]
//        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);//手工签收--常用
        
        //4. 通过Session创建Destination对象
        Destination destination = session.createQueue("foodQueue");
        
        //5. 通过Session创建发送或接受对象
        MessageProducer messageProducer = session.createProducer(destination);
        
        //6. 设置持久化特性或非持久化特性
        messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        
        //7. 使用JMS规范里面消息类型之一 TextMessage 来创建数据,用MessageProducer来发送
        for (int i = 0; i < 5; i++) {
            TextMessage message = session.createTextMessage();
            message.setText("大萝卜" + i);
            messageProducer.send(message);
            System.out.println("生产者:" + message.getText());
        }
        
        //使用事务要手动提交
        //session.commit();
        
        //8. 关闭连接
        connection.close();
        
    }// main
}

 

 消费者:

package mq.helloworld;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Receiver {

public static void main(String[] args) throws Exception {
        
        //1. 建立一个ConnectionFactory. tcp://0.0.0.0:61616
        String userName = ActiveMQConnectionFactory.DEFAULT_USER;
        String password = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
        String brokerURL = "tcp://localhost:61616";
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(userName, password, brokerURL);
        
        //2. 通过ConnectionFactory建立一个Connection连接,并且调用start方法开启
        Connection connection = connectionFactory.createConnection();
        connection.start();
        
        //3. 通过Connection创建Session,用于接收消息[第一个参数:是否启用事务;第二个参数:设置签收模式]
//        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        
        //4. 通过Session创建Destination对象
        Destination destination = session.createQueue("foodQueue");
        
        //5. 通过Session创建发送或接受对象
        MessageConsumer messageConsumer = session.createConsumer(destination);
        
        //7. 使用JMS规范里面消息类型之一 TextMessage 来创建数据
        while (true) {
            TextMessage message = (TextMessage) messageConsumer.receive();
            if(message == null) break;
            message.acknowledge();    //手动签收
            System.out.println("消费者:" + message.getText());
        }
        
        //8. 关闭连接
        connection.close();
        
    }// main
}

 先启动MQ

消息队列--ActiveMQ _ Java侠

 

在浏览器里输入http://localhost:8161/admin/

消息队列--ActiveMQ _ Java侠

出现这个界面就ok。

消息队列--ActiveMQ _ Java侠

然后运行Sender。

消息队列--ActiveMQ _ Java侠

这里是控制台打印的,是否真的发出去了,看MQ的管理界面。

消息队列--ActiveMQ _ Java侠

ok。然后运行receiver

消息队列--ActiveMQ _ Java侠

 

好了,五个大萝卜已经吃了,看管理界面的变化。

消息队列--ActiveMQ _ Java侠

注意:如果没有【手动签收】,MQ是不会认为客户端已经消费了的

message.acknowledge();    //手动签收

 

到此,一个hello 完成

 

作者:露娜妹
来源链接:https://www.cnblogs.com/LUA123/p/7201717.html

标签: ActiveMQ

“消息队列--ActiveMQ” 的相关文章

SpringBoot 2.x (13):整合ActiveMQ

SpringBoot 2.x (13):整合ActiveMQ

ActiveMQ5.x不多做介绍了,主要是SpringBoot的整合 特点: 1)支持来自Java,C,C ++,C#,Ruby,Perl,Python,PHP的各种跨语言客户端和...

初识 ActiveMQ

其实算不上初识了,工作一年来一直都有接触 mq 相关的东西。但是,从来都是粘贴复制别人的配置代码,却从未认真系统的学习过它,现在线上用 mq 的项目出问题了,老板在后面拿枪指着呢,不得不...

ActiveMQ服务挂掉怎么办

ActiveMQ服务挂掉怎么办

折腾ActiveMQ时遇到的问题和解决方法: 1.先讲严重的:服务挂掉。 这得从ActiveMQ的储存机制说起。在通常的情况下,非持久化消息是存储在内存中的,持久化消...

ActiveMQ的入门demo

ActiveMQ的入门demo

  步骤:  1 :下载ActiveMQ  官网:http://activemq.apache.org/ 2 :解压AcitveMQ, 根据自己的...

ActiveMQ笔记(3):基于Networks of Brokers的HA方案

ActiveMQ笔记(3):基于Networks of Brokers的HA方案

上一篇介绍了基于ZK的ActiveMQ HA方案,虽然理解起来比较容易,但是有二个不足: 1)  占用的节点数过多,1个zk集群至少3个节点,1个ac...

SpringBoot整合ActiveMQ快速入门

SpringBoot整合ActiveMQ快速入门

Spring Boot 具有如下特性: 为基于 Spring 的开发提供更快的入门体验 开箱即用,没有代码生成,也无需 XML 配置。同时也可以修改默认值...

ActiveMQ简介

ActiveMQ简介

ActiveMQ简介 1.  什么是ActiveMQ ActiveMQ是一种开源的,实现了JMS1.1规范的,面向消息(MOM)的中间件,为应用程序提供高效的、可扩...

ActiveMQ消息队列的使用及应用

这里就不说怎么安装了,直接解压出来就行了。   谢绝转载,作者保留所有权力     目录:  一:J...

spring整合activemq

spring整合activemq

1、概述     首先和大家一起回顾一下Java 消息服务,在我之前的博客《Java消息队列-JMS概述》中,我为大家分析了: 消息服务:一...

这次真的忽略了一些ActiveMQ内心的娇艳

好久没总结了,内心有点空虚了,所以今天主要给园里的朋友们分享一点儿这几天使用ActiveMQ过程中踩过的小坑,虽然说这东西简单易用,代码几行配置也就几行,问题不大但是后果有点严重,所以就...