消息中间件MQ全称为Message Queue,消息队列是应用程序和应用程序之间的通信方法。
在项目中,可将一些无需即时返回且耗时的操作提取出来,进行异步处理,而通过这种异步处理的方式可大量节省服务器请求响应时间,从而提高系统的吞吐量。
具体应用场景如下:
①任务异步处理
将不需要同步处理的并且耗时较长的操作由消息队列通知消息接收方进行异步处理。提高了应用响应时间。
②应用程序解耦合
MQ相当于一个中介,Producer通过MQ与Consumer交互,与应用程序进行解耦合。
③削峰填谷
如应用在高峰期时,并发量会突然激增,消息被MQ进行中间步骤处理,按服务器性能进行消费处理,降低在高峰期时与数据库写入操作瞬时频率,消费消息维持在一段时间内直至消费完剩余消息。
实现MQ的大致有两种主流方式:AMQP、JMS:
AMQP:AMQP是一种协议,更准确的说是一种binary wire-level protocol(链接协议)。这是其和JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。
JMS:JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。
RabbitMQ官方地址:rabbitmq.com
RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。
RabbitMQ提供了7种模式:简单模式,work模式,Publish/Subscribe发布与订阅模式,Routing路由模式,Topics主题模式,RPC远程调用模式以及Publisher Confirms 发布者确认模式;
官网对应模式介绍:rabbitmq.com/getstarted.html
RabbitMQ 基础架构
RabbitMQ是AMQP协议的Erlang的实现。
| 概念 | 说明 |
| -------------- | ------------------------------------------------------------ |
| 连接Connection | 一个网络连接,比如TCP/IP套接字连接。 |
| 会话Session | 端点之间的命名对话。在一个会话上下文中,保证“恰好传递一次”。 |
| 信道Channel | 多路复用连接中的一条独立的双向数据流通道。为会话提供物理传输介质。 |
| 客户端Client | AMQP连接或者会话的发起者。AMQP是非对称的,客户端生产和消费消息,服务器存储和路由这些消息。 |
| 服务节点Broker | 消息中间件的服务节点;一般情况下可以将一个RabbitMQ Broker看作一台RabbitMQ 服务器。 |
| 端点 | AMQP对话的任意一方。一个AMQP连接包括两个端点(一个是客户端,一个是服务器)。 |
| 消费者Consumer | 一个从消息队列里请求消息的客户端程序。 |
| 生产者Producer | 一个向交换机发布消息的客户端应用程序。 |
[collapse title="生产者流转过程说明"]
转载自https://blog.csdn.net/weixin_44009447/article/details/110148697
1. 客户端与代理服务器Broker建立连接。会调用newConnection() 方法,这个方法会进一步封装Protocol Header 0-9-1 的报文头发送给Broker ,以此通知Broker 本次交互采用的是AMQPO-9-1 协议,紧接着Broker 返回Connection.Start 来建立连接,在连接的过程中涉及Connection.Start/.Start-OK 、Connection.Tune/.Tune-Ok ,Connection.Open/ .Open-Ok 这6 个命令的交互。
2. 客户端调用connection.createChannel方法。此方法开启信道,其包装的channel.open命令发送给Broker,等待channel.basicPublish方法,对应的AMQP命令为Basic.Publish,这个命令包含了content Header 和content Body()。content Header 包含了消息体的属性,例如:投递模式,优先级等,content Body 包含了消息体本身。
3. 客户端发送完消息需要关闭资源时,涉及到Channel.Close和Channl.Close-Ok 与Connetion.Close和Connection.Close-Ok的命令交互。
[/collapse]
[collapse title="消费者流转过程说明"]
转载自https://blog.csdn.net/weixin_44009447/article/details/110148697
1. 消费者客户端与代理服务器Broker建立连接。会调用newConnection() 方法,这个方法会进一步封装Protocol Header 0-9-1 的报文头发送给Broker ,以此通知Broker 本次交互采用的是AMQPO-9-1 协议,紧接着Broker 返回Connection.Start 来建立连接,在连接的过程中涉及Connection.Start/.Start-OK 、Connection.Tune/.Tune-Ok ,Connection.Open/ .Open-Ok 这6 个命令的交互。
2. 消费者客户端调用connection.createChannel方法。和生产者客户端一样,协议涉及Channel . Open/Open-Ok命令。
3. 在真正消费之前,消费者客户端需要向Broker 发送Basic.Consume 命令(即调用channel.basicConsume 方法〉将Channel 置为接收模式,之后Broker 回执Basic . Consume - Ok 以告诉消费者客户端准备好消费消息。
4. Broker 向消费者客户端推送(Push) 消息,即Basic.Deliver 命令,这个命令和Basic.Publish 命令一样会携带Content Header 和Content Body。
5. 消费者接收到消息并正确消费之后,向Broker 发送确认,即Basic.Ack 命令。
6. 客户端发送完消息需要关闭资源时,涉及到Channel.Close和Channl.Close-Ok 与Connetion.Close和Connection.Close-Ok的命令交互。
[/collapse]
[collapse title="Hello World!"]
package com.example.demo03;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @version v 1.0
* @Author kamisamak
* @Date 2020/12/6
* Simplest
*
*/
public class producer {
private final static String QUEUE_NAME = "test_queue_02";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//参数配置
connectionFactory.setHost("test_node_rmq_01");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin123");
connectionFactory.setVirtualHost("/test_virtual_hosts_01");
//获取链接
Connection connection = connectionFactory.newConnection();
//创建channel
Channel channel = connection.createChannel();
//创建队列Queue
/**
* @param queue 队列名称
* @param durable 是否持久化
* @param exclusive 是否独占 只能有一个consumer进行监听 当connection关闭时是否删除队列
* @param autoDelete 是否自动删除 当无consumer时 自动删除掉
* @param arguments rabbitmq参数信息
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "test_hello_world";
/**
* @param exchange 交换机名称
* @param routingKey 路由名称
* @param props 配置消息
* @param body the message body
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
channel.close();
connection.close();
}
}
package com.example.demo03;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @version v 1.0
* @Author kamisamak
* @Date 2020/12/6
*/
public class consumer {
private final static String QUEUE_NAME = "test_queue_02";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//参数配置
connectionFactory.setHost("test_node_rmq_01");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin123");
connectionFactory.setVirtualHost("/test_virtual_hosts_01");
//获取链接
Connection connection = connectionFactory.newConnection();
//创建channel
Channel channel = connection.createChannel();
//创建队列Queue
/**
* @param queue 队列名称
* @param durable 是否持久化
* @param exclusive 是否独占 只能有一个consumer进行监听 当connection关闭时是否删除队列
* @param autoDelete 是否自动删除 当无consumer时 自动删除掉
* @param arguments rabbitmq参数信息
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/**
* @param queue the name of the queue
* @param autoAck true if the server should consider messages
* acknowledged once delivered; false if the server should expect
* explicit acknowledgements
* @param callback 回调对象
*/
Consumer consumer = new DefaultConsumer(channel){
/**
* 回调方法 收到消息,自动执行该方法
* @param consumerTag 标识
* @param envelope 获取交换机信息 路由key等
* @param properties 配置信息
* @param body 数据信息
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
long deliveryTag = envelope.getDeliveryTag();
System.out.println("body:"+new String(body, "utf-8"));
}
};
channel.basicConsume(QUEUE_NAME,true, consumer);
}
}
[/collapse]
[collapse title="WorkQueues"]
package com.example.demo04;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @version v 1.0
* @Author kamisamak
* @Date 2020/12/6
* WorkQueues
* -> Consumer1
* Producer -> Queue ->
* -> Consumer2
*/
public class producer {
private final static String QUEUE_NAME = "test_queue_03_workqueues";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//参数配置
connectionFactory.setHost("test_node_rmq_01");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin123");
connectionFactory.setVirtualHost("/test_virtual_hosts_01");
//获取链接
Connection connection = connectionFactory.newConnection();
//创建channel
Channel channel = connection.createChannel();
//创建队列Queue
/**
* @param queue 队列名称
* @param durable 是否持久化
* @param exclusive 是否独占 只能有一个consumer进行监听 当connection关闭时是否删除队列
* @param autoDelete 是否自动删除 当无consumer时 自动删除掉
* @param arguments rabbitmq参数信息
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/**
* @param exchange 交换机名称
* @param routingKey 路由名称
* @param props 配置消息
* @param body the message body
*/
for (int i = 1; i <= 10; i++) {
String message = "test_hello_world_"+i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
} channel.close();
connection.close();
}
}
package com.example.demo04;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @version v 1.0
* @Author kamisamak
* @Date 2020/12/6
*/
public class consumer01 {
private final static String QUEUE_NAME = "test_queue_03_workqueues";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//参数配置
connectionFactory.setHost("test_node_rmq_01");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin123");
connectionFactory.setVirtualHost("/test_virtual_hosts_01");
//获取链接
Connection connection = connectionFactory.newConnection();
//创建channel
Channel channel = connection.createChannel();
//创建队列Queue
/**
* @param queue 队列名称
* @param durable 是否持久化
* @param exclusive 是否独占 只能有一个consumer进行监听 当connection关闭时是否删除队列
* @param autoDelete 是否自动删除 当无consumer时 自动删除掉
* @param arguments rabbitmq参数信息
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/**
* @param queue the name of the queue
* @param autoAck true if the server should consider messages
* acknowledged once delivered; false if the server should expect
* explicit acknowledgements
* @param callback 回调对象
*/
Consumer consumer = new DefaultConsumer(channel){
/**
* 回调方法 收到消息,自动执行该方法
* @param consumerTag 标识
* @param envelope 获取交换机信息 路由key等
* @param properties 配置信息
* @param body 数据信息
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// System.out.println("consumerTag:"+consumerTag);
// System.out.println("Exchange:"+envelope.getExchange());
// System.out.println("RoutingKey:"+envelope.getRoutingKey());
//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
long deliveryTag = envelope.getDeliveryTag();
System.out.println("body:"+new String(body, "utf-8"));
}
};
channel.basicConsume(QUEUE_NAME,true, consumer);
}
}
package com.example.demo04;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @version v 1.0
* @Author kamisamak
* @Date 2020/12/6
*/
public class consumer02 {
private final static String QUEUE_NAME = "test_queue_03_workqueues";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//参数配置
connectionFactory.setHost("test_node_rmq_01");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin123");
connectionFactory.setVirtualHost("/test_virtual_hosts_01");
//获取链接
Connection connection = connectionFactory.newConnection();
//创建channel
Channel channel = connection.createChannel();
//创建队列Queue
/**
* @param queue 队列名称
* @param durable 是否持久化
* @param exclusive 是否独占 只能有一个consumer进行监听 当connection关闭时是否删除队列
* @param autoDelete 是否自动删除 当无consumer时 自动删除掉
* @param arguments rabbitmq参数信息
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/**
* @param queue the name of the queue
* @param autoAck true if the server should consider messages
* acknowledged once delivered; false if the server should expect
* explicit acknowledgements
* @param callback 回调对象
*/
Consumer consumer = new DefaultConsumer(channel){
/**
* 回调方法 收到消息,自动执行该方法
* @param consumerTag 标识
* @param envelope 获取交换机信息 路由key等
* @param properties 配置信息
* @param body 数据信息
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// System.out.println("consumerTag:"+consumerTag);
// System.out.println("Exchange:"+envelope.getExchange());
// System.out.println("RoutingKey:"+envelope.getRoutingKey());
//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
long deliveryTag = envelope.getDeliveryTag();
System.out.println("body:"+new String(body, "utf-8"));
}
};
channel.basicConsume(QUEUE_NAME,true, consumer);
}
}
[/collapse]
[collapse title="Publish/Subscribe"]
package com.example.demo05;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @version v 1.0
* @Author kamisamak
* @Date 2020/12/6
* PubSub
* -> Consumer1
* Producer -> ExChange -> Queue ->
* -> Consumer2
*/
public class producer {
private final static String QUEUE_NAME1 = "test_queue_04_fanout";
private final static String QUEUE_NAME2 = "test_queue_05_fanout";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//参数配置
connectionFactory.setHost("test_node_rmq_01");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin123");
connectionFactory.setVirtualHost("/test_virtual_hosts_01");
//获取链接
Connection connection = connectionFactory.newConnection();
//创建channel
Channel channel = connection.createChannel();
//创建交换机&队列Queue
/**
* @param exchange the name of the exchange
* @param type the exchange type 交换机类型 direct(定向)| fanout(扇形 发送消息到每一个绑定的队列)| topic 通配符形式 | headers 参数匹配
* @param durable 是否持久化 (the exchange will survive a server restart)
* @param autoDelete 是否自动删除
* @param internal 内部使用 一般false
* @param arguments other properties (construction arguments) for the exchange
*/
String exchangeName = "test_exchange_01";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
channel.queueDeclare(QUEUE_NAME1, false, false, false, null);
channel.queueDeclare(QUEUE_NAME2, false, false, false, null);
//绑定队列和交换机
/**
* @param queue the name of the queue
* @param exchange the name of the exchange
* @param routingKey 路由键 绑定规则 | 如规则为fanout 则routingKey为”“
* @param arguments other properties (binding parameters)
*/
channel.queueBind(QUEUE_NAME1,exchangeName,"");
channel.queueBind(QUEUE_NAME2,exchangeName,"");
//发送消息 释放资源
channel.basicPublish(exchangeName, "", null,"test_messaget".getBytes());
channel.close();
connection.close();
}
}
package com.example.demo05;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @version v 1.0
* @Author kamisamak
* @Date 2020/12/6
*/
public class consumer01 {
private final static String QUEUE_NAME1 = "test_queue_04_fanout";
private final static String QUEUE_NAME2 = "test_queue_05_fanout";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//参数配置
connectionFactory.setHost("test_node_rmq_01");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin123");
connectionFactory.setVirtualHost("/test_virtual_hosts_01");
//获取链接
Connection connection = connectionFactory.newConnection();
//创建channel
Channel channel = connection.createChannel();
//创建队列Queue
/**
* @param queue 队列名称
* @param durable 是否持久化
* @param exclusive 是否独占 只能有一个consumer进行监听 当connection关闭时是否删除队列
* @param autoDelete 是否自动删除 当无consumer时 自动删除掉
* @param arguments rabbitmq参数信息
*/
channel.queueDeclare(QUEUE_NAME1, false, false, false, null);
/**
* @param queue the name of the queue
* @param autoAck true if the server should consider messages
* acknowledged once delivered; false if the server should expect
* explicit acknowledgements
* @param callback 回调对象
*/
Consumer consumer = new DefaultConsumer(channel){
/**
* 回调方法 收到消息,自动执行该方法
* @param consumerTag 标识
* @param envelope 获取交换机信息 路由key等
* @param properties 配置信息
* @param body 数据信息
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// System.out.println("consumerTag:"+consumerTag);
// System.out.println("Exchange:"+envelope.getExchange());
// System.out.println("RoutingKey:"+envelope.getRoutingKey());
//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
long deliveryTag = envelope.getDeliveryTag();
System.out.println("[x] received :"+new String(body, "utf-8"));
}
};
channel.basicConsume(QUEUE_NAME1,true, consumer);
}
}
package com.example.demo05;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @version v 1.0
* @Author kamisamak
* @Date 2020/12/6
*/
public class consumer02 {
private final static String QUEUE_NAME1 = "test_queue_04_fanout";
private final static String QUEUE_NAME2 = "test_queue_05_fanout";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//参数配置
connectionFactory.setHost("test_node_rmq_01");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin123");
connectionFactory.setVirtualHost("/test_virtual_hosts_01");
//获取链接
Connection connection = connectionFactory.newConnection();
//创建channel
Channel channel = connection.createChannel();
//创建队列Queue
/**
* @param queue 队列名称
* @param durable 是否持久化
* @param exclusive 是否独占 只能有一个consumer进行监听 当connection关闭时是否删除队列
* @param autoDelete 是否自动删除 当无consumer时 自动删除掉
* @param arguments rabbitmq参数信息
*/
channel.queueDeclare(QUEUE_NAME2, false, false, false, null);
/**
* @param queue the name of the queue
* @param autoAck true if the server should consider messages
* acknowledged once delivered; false if the server should expect
* explicit acknowledgements
* @param callback 回调对象
*/
Consumer consumer = new DefaultConsumer(channel){
/**
* 回调方法 收到消息,自动执行该方法
* @param consumerTag 标识
* @param envelope 获取交换机信息 路由key等
* @param properties 配置信息
* @param body 数据信息
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// System.out.println("consumerTag:"+consumerTag);
// System.out.println("Exchange:"+envelope.getExchange());
// System.out.println("RoutingKey:"+envelope.getRoutingKey());
//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
long deliveryTag = envelope.getDeliveryTag();
System.out.println("[x] received :"+new String(body, "utf-8"));
}
};
channel.basicConsume(QUEUE_NAME2,true, consumer);
}
}
[/collapse]
[collapse title="Routing"]
package com.example.demo06;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @version v 1.0
* @Author kamisamak
* @Date 2020/12/6
* direct
* -> 对应routingKey Consumer1
* Producer -> 指定routingKey -> ExChange -> Queue ->
* -> 对应routingKey Consumer2
*
* -> 对应routingKey Consumer2
*/
public class producer {
private final static String QUEUE_NAME1 = "test_queue_06_direct";
private final static String QUEUE_NAME2 = "test_queue_07_direct";
private final static String QUEUE_NAME3 = "test_queue_08_direct";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//参数配置
connectionFactory.setHost("test_node_rmq_01");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin123");
connectionFactory.setVirtualHost("/test_virtual_hosts_01");
//获取链接
Connection connection = connectionFactory.newConnection();
//创建channel
Channel channel = connection.createChannel();
//创建交换机&队列Queue
/**
* @param exchange the name of the exchange
* @param type the exchange type 交换机类型 direct(定向)| fanout(扇形 发送消息到每一个绑定的队列)| topic 通配符形式 | headers 参数匹配
* @param durable 是否持久化 (the exchange will survive a server restart)
* @param autoDelete 是否自动删除
* @param internal 内部使用 一般false
* @param arguments other properties (construction arguments) for the exchange
*/
String exchangeName = "test_exchange_02";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
channel.queueDeclare(QUEUE_NAME1, false, false, false, null);
channel.queueDeclare(QUEUE_NAME2, false, false, false, null);
channel.queueDeclare(QUEUE_NAME3, false, false, false, null);
//绑定队列和交换机
/**
* @param queue the name of the queue
* @param exchange the name of the exchange
* @param routingKey 路由键 绑定规则 | 如规则为fanout 则routingKey为”“
* @param arguments other properties (binding parameters)
*/
channel.queueBind(QUEUE_NAME1,exchangeName,"info");
channel.queueBind(QUEUE_NAME2,exchangeName,"warning");
channel.queueBind(QUEUE_NAME3,exchangeName,"error");
//发送消息 释放资源
channel.basicPublish(exchangeName, "info", null,"test_messaget_info".getBytes());
channel.basicPublish(exchangeName, "warning", null,"test_messaget_warning".getBytes());
channel.basicPublish(exchangeName, "error", null,"test_messaget_error".getBytes());
channel.close();
connection.close();
}
}
package com.example.demo06;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @version v 1.0
* @Author kamisamak
* @Date 2020/12/6
*/
public class consumer01 {
private final static String QUEUE_NAME1 = "test_queue_06_direct";
private final static String QUEUE_NAME2 = "test_queue_07_direct";
private final static String QUEUE_NAME3 = "test_queue_08_direct";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//参数配置
connectionFactory.setHost("test_node_rmq_01");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin123");
connectionFactory.setVirtualHost("/test_virtual_hosts_01");
//获取链接
Connection connection = connectionFactory.newConnection();
//创建channel
Channel channel = connection.createChannel();
//创建队列Queue
/**
* @param queue 队列名称
* @param durable 是否持久化
* @param exclusive 是否独占 只能有一个consumer进行监听 当connection关闭时是否删除队列
* @param autoDelete 是否自动删除 当无consumer时 自动删除掉
* @param arguments rabbitmq参数信息
*/
channel.queueDeclare(QUEUE_NAME1, false, false, false, null);
/**
* @param queue the name of the queue
* @param autoAck true if the server should consider messages
* acknowledged once delivered; false if the server should expect
* explicit acknowledgements
* @param callback 回调对象
*/
Consumer consumer = new DefaultConsumer(channel){
/**
* 回调方法 收到消息,自动执行该方法
* @param consumerTag 标识
* @param envelope 获取交换机信息 路由key等
* @param properties 配置信息
* @param body 数据信息
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// System.out.println("consumerTag:"+consumerTag);
// System.out.println("Exchange:"+envelope.getExchange());
// System.out.println("RoutingKey:"+envelope.getRoutingKey());
//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
long deliveryTag = envelope.getDeliveryTag();
System.out.println("[x] received :"+new String(body, "utf-8"));
}
};
channel.basicConsume(QUEUE_NAME1,true, consumer);
}
}
package com.example.demo06;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @version v 1.0
* @Author kamisamak
* @Date 2020/12/6
*/
public class consumer02 {
private final static String QUEUE_NAME1 = "test_queue_06_direct";
private final static String QUEUE_NAME2 = "test_queue_07_direct";
private final static String QUEUE_NAME3 = "test_queue_08_direct";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//参数配置
connectionFactory.setHost("test_node_rmq_01");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin123");
connectionFactory.setVirtualHost("/test_virtual_hosts_01");
//获取链接
Connection connection = connectionFactory.newConnection();
//创建channel
Channel channel = connection.createChannel();
//创建队列Queue
/**
* @param queue 队列名称
* @param durable 是否持久化
* @param exclusive 是否独占 只能有一个consumer进行监听 当connection关闭时是否删除队列
* @param autoDelete 是否自动删除 当无consumer时 自动删除掉
* @param arguments rabbitmq参数信息
*/
channel.queueDeclare(QUEUE_NAME2, false, false, false, null);
/**
* @param queue the name of the queue
* @param autoAck true if the server should consider messages
* acknowledged once delivered; false if the server should expect
* explicit acknowledgements
* @param callback 回调对象
*/
Consumer consumer = new DefaultConsumer(channel){
/**
* 回调方法 收到消息,自动执行该方法
* @param consumerTag 标识
* @param envelope 获取交换机信息 路由key等
* @param properties 配置信息
* @param body 数据信息
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// System.out.println("consumerTag:"+consumerTag);
// System.out.println("Exchange:"+envelope.getExchange());
// System.out.println("RoutingKey:"+envelope.getRoutingKey());
//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
long deliveryTag = envelope.getDeliveryTag();
System.out.println("[x] received :"+new String(body, "utf-8"));
}
};
channel.basicConsume(QUEUE_NAME2,true, consumer);
}
}
package com.example.demo06;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @version v 1.0
* @Author kamisamak
* @Date 2020/12/6
*/
public class consumer03 {
private final static String QUEUE_NAME1 = "test_queue_06_direct";
private final static String QUEUE_NAME2 = "test_queue_07_direct";
private final static String QUEUE_NAME3 = "test_queue_08_direct";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//参数配置
connectionFactory.setHost("test_node_rmq_01");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin123");
connectionFactory.setVirtualHost("/test_virtual_hosts_01");
//获取链接
Connection connection = connectionFactory.newConnection();
//创建channel
Channel channel = connection.createChannel();
//创建队列Queue
/**
* @param queue 队列名称
* @param durable 是否持久化
* @param exclusive 是否独占 只能有一个consumer进行监听 当connection关闭时是否删除队列
* @param autoDelete 是否自动删除 当无consumer时 自动删除掉
* @param arguments rabbitmq参数信息
*/
channel.queueDeclare(QUEUE_NAME3, false, false, false, null);
/**
* @param queue the name of the queue
* @param autoAck true if the server should consider messages
* acknowledged once delivered; false if the server should expect
* explicit acknowledgements
* @param callback 回调对象
*/
Consumer consumer = new DefaultConsumer(channel){
/**
* 回调方法 收到消息,自动执行该方法
* @param consumerTag 标识
* @param envelope 获取交换机信息 路由key等
* @param properties 配置信息
* @param body 数据信息
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// System.out.println("consumerTag:"+consumerTag);
// System.out.println("Exchange:"+envelope.getExchange());
// System.out.println("RoutingKey:"+envelope.getRoutingKey());
//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
long deliveryTag = envelope.getDeliveryTag();
System.out.println("[x] received :"+new String(body, "utf-8"));
}
};
channel.basicConsume(QUEUE_NAME3,true, consumer);
}
}
[/collapse]
[collapse title="Topics"]
package com.example.demo07;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @version v 1.0
* @Author kamisamak
* @Date 2020/12/6
* Topics
* -> 对应通配符Consumer1
* Producer -> 指定通配符 -> ExChange -> Queue ->
* -> 对应通配符Consumer2
*
*/
public class producer {
private final static String QUEUE_NAME1 = "test_queue_09_topic";
private final static String QUEUE_NAME2 = "test_queue_10_topic";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//参数配置
connectionFactory.setHost("test_node_rmq_01");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin123");
connectionFactory.setVirtualHost("/test_virtual_hosts_01");
//获取链接
Connection connection = connectionFactory.newConnection();
//创建channel
Channel channel = connection.createChannel();
//创建交换机&队列Queue
/**
* @param exchange the name of the exchange
* @param type the exchange type 交换机类型 direct(定向)| fanout(扇形 发送消息到每一个绑定的队列)| topic 通配符形式 | headers 参数匹配
* @param durable 是否持久化 (the exchange will survive a server restart)
* @param autoDelete 是否自动删除
* @param internal 内部使用 一般false
* @param arguments other properties (construction arguments) for the exchange
*/
String exchangeName = "test_exchange_03";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
channel.queueDeclare(QUEUE_NAME1, false, false, false, null);
channel.queueDeclare(QUEUE_NAME2, false, false, false, null);
//绑定队列和交换机
/**
* @param queue the name of the queue
* @param exchange the name of the exchange
* @param routingKey 路由键 绑定规则 | 如规则为fanout 则routingKey为”“
* @param arguments other properties (binding parameters)
*/
channel.queueBind(QUEUE_NAME1,exchangeName,"#.#");
channel.queueBind(QUEUE_NAME2,exchangeName,"info.*");
//发送消息 释放资源
channel.basicPublish(exchangeName, "test.rua", null,"test_message_01".getBytes());
channel.basicPublish(exchangeName, "info.01", null,"test_message_02".getBytes());
channel.close();
connection.close();
}
}
package com.example.demo07;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @version v 1.0
* @Author kamisamak
* @Date 2020/12/6
*/
public class consumer01 {
private final static String QUEUE_NAME1 = "test_queue_09_topic";
private final static String QUEUE_NAME2 = "test_queue_10_topic";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//参数配置
connectionFactory.setHost("test_node_rmq_01");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin123");
connectionFactory.setVirtualHost("/test_virtual_hosts_01");
//获取链接
Connection connection = connectionFactory.newConnection();
//创建channel
Channel channel = connection.createChannel();
//创建队列Queue
/**
* @param queue 队列名称
* @param durable 是否持久化
* @param exclusive 是否独占 只能有一个consumer进行监听 当connection关闭时是否删除队列
* @param autoDelete 是否自动删除 当无consumer时 自动删除掉
* @param arguments rabbitmq参数信息
*/
channel.queueDeclare(QUEUE_NAME1, false, false, false, null);
/**
* @param queue the name of the queue
* @param autoAck true if the server should consider messages
* acknowledged once delivered; false if the server should expect
* explicit acknowledgements
* @param callback 回调对象
*/
Consumer consumer = new DefaultConsumer(channel){
/**
* 回调方法 收到消息,自动执行该方法
* @param consumerTag 标识
* @param envelope 获取交换机信息 路由key等
* @param properties 配置信息
* @param body 数据信息
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("[x] received :"+new String(body, "utf-8"));
}
};
channel.basicConsume(QUEUE_NAME1,true, consumer);
}
}
package com.example.demo07;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @version v 1.0
* @Author kamisamak
* @Date 2020/12/6
*/
public class consumer02 {
private final static String QUEUE_NAME1 = "test_queue_09_topic";
private final static String QUEUE_NAME2 = "test_queue_10_topic";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//参数配置
connectionFactory.setHost("test_node_rmq_01");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin123");
connectionFactory.setVirtualHost("/test_virtual_hosts_01");
//获取链接
Connection connection = connectionFactory.newConnection();
//创建channel
Channel channel = connection.createChannel();
//创建队列Queue
/**
* @param queue 队列名称
* @param durable 是否持久化
* @param exclusive 是否独占 只能有一个consumer进行监听 当connection关闭时是否删除队列
* @param autoDelete 是否自动删除 当无consumer时 自动删除掉
* @param arguments rabbitmq参数信息
*/
channel.queueDeclare(QUEUE_NAME2, false, false, false, null);
/**
* @param queue the name of the queue
* @param autoAck true if the server should consider messages
* acknowledged once delivered; false if the server should expect
* explicit acknowledgements
* @param callback 回调对象
*/
Consumer consumer = new DefaultConsumer(channel){
/**
* 回调方法 收到消息,自动执行该方法
* @param consumerTag 标识
* @param envelope 获取交换机信息 路由key等
* @param properties 配置信息
* @param body 数据信息
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("[x] received :"+new String(body, "utf-8"));
}
};
channel.basicConsume(QUEUE_NAME2,true, consumer);
}
}
[/collapse]
更新中
文章评论