RabbitMQ 有 5 种交换机类型,
分别为 fanout、direct、topic、headers 这四种,再加上一个默认(Default)交换机
消息发送到交换机后,由不同类型的交换机按照各自的路由算法转发到相应的队列中
每种交换机具有不同的特性
Default 交换机
默认交换机(名称为空字符串 "")会隐式绑定到每个队列,路由键等于队列名称。无法显式绑定到默认交换机,也无法取消绑定;默认交换机也不能被删除。
消息生产者
/**
 * Message producer for the default-exchange example.
 *
 * <p>Publishes one text message to the default exchange (the exchange whose name is the empty
 * string). The default exchange is implicitly bound to every queue with a routing key equal to
 * the queue name, so the routing key used here is simply the name of the target queue.
 */
public class MyProducer {

    /** Target queue; this is the first queue declared by the consumer of this example. */
    private static final String QUEUE_NAME = "SIMPLE_QUEUE_SIMPLE_EXCHANGE_DEFAULT_1";

    /**
     * Connects to the broker, publishes a single message and closes the channel and connection.
     *
     * @param args unused
     * @throws Exception if connecting to the broker or publishing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        // Broker host
        factory.setHost("127.0.0.1");
        // Broker port
        factory.setPort(5673);
        // Virtual host
        factory.setVirtualHost("/");
        // Credentials
        factory.setUsername("guest");
        factory.setPassword("guest");

        // try-with-resources closes the channel first and then the connection,
        // even when publishing throws.
        try (Connection conn = factory.newConnection();
             Channel channel = conn.createChannel()) {
            String msg = "Hello world, Rabbit MQ";
            // "" selects the default exchange; the routing key is the queue name.
            // The third argument (props) carries optional message properties such as headers.
            // Encode explicitly as UTF-8 because the consumer decodes the body as UTF-8.
            channel.basicPublish("", QUEUE_NAME, null,
                    msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        }
    }
}
消息消费者
package com.rabbit.simple.defaults;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
 * Message consumer for the default-exchange example.
 *
 * <p>Declares two queues and consumes from both of them. No exchange is declared and no binding
 * is created: on the default exchange the routing key is implicitly the queue name. This class
 * never closes the channel or the connection.
 */
public class MyConsumer {
    private final static String QUEUE_NAME = "SIMPLE_QUEUE_SIMPLE_EXCHANGE_DEFAULT_1";
    private final static String QUEUE_NAME2 = "SIMPLE_QUEUE_SIMPLE_EXCHANGE_DEFAULT_2";

    /**
     * Connects to the broker, declares both queues and registers one consumer on each of them.
     *
     * @param args unused
     * @throws Exception if connecting, declaring a queue or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        // Broker host
        factory.setHost("127.0.0.1");
        // Broker port
        factory.setPort(5673);
        // Virtual host
        factory.setVirtualHost("/");
        // Credentials
        factory.setUsername("guest");
        factory.setPassword("guest");
        // Open the connection
        Connection conn = factory.newConnection();
        // Open a channel
        Channel channel = conn.createChannel();
        // Declare the queues: durable=false, exclusive=false, autoDelete=false, no arguments
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.queueDeclare(QUEUE_NAME2, false, false, false, null);
        System.out.println(" Waiting for message....");
        // No queueBind call is needed: the routing key implicitly equals the queue name.
        // Consumer that prints every delivery
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, StandardCharsets.UTF_8);
                System.out.println("Received message : '" + msg + "'");
                System.out.println("consumerTag : " + consumerTag );
                System.out.println("deliveryTag : " + envelope.getDeliveryTag() );
            }
        };
        // Start consuming from both queues with autoAck=true
        channel.basicConsume(QUEUE_NAME, true, consumer);
        channel.basicConsume(QUEUE_NAME2, true, consumer);
    }
}
Fanout 交换机
具有广播特性,会忽略 routing key,把消息广播发送到每一个与该交换机绑定的队列
消息生产者
/**
 * Message producer for the fanout-exchange example.
 *
 * <p>Publishes one text message to the exchange {@code SIMPLE_EXCHANGE_FANOUT}. This class does
 * not declare the exchange itself; the consumer of this example does.
 */
public class MyProducer {
    private final static String EXCHANGE_NAME = "SIMPLE_EXCHANGE_FANOUT";

    /**
     * Connects to the broker, publishes a single message and closes the channel and connection.
     *
     * @param args unused
     * @throws Exception if connecting to the broker or publishing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        // Broker host
        factory.setHost("127.0.0.1");
        // Broker port
        factory.setPort(5673);
        // Virtual host
        factory.setVirtualHost("/");
        // Credentials
        factory.setUsername("guest");
        factory.setPassword("guest");

        // try-with-resources closes the channel first and then the connection,
        // even when publishing throws.
        try (Connection conn = factory.newConnection();
             Channel channel = conn.createChannel()) {
            String msg = "Hello world, Rabbit MQ";
            // basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
            // Encode explicitly as UTF-8 because the consumer decodes the body as UTF-8.
            channel.basicPublish(EXCHANGE_NAME, "hello.best", null,
                    msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        }
    }
}
消费者
/**
 * Message consumer for the fanout-exchange example.
 *
 * <p>Declares the fanout exchange and two queues, binds both queues to the exchange and then
 * prints every delivery received on either queue. This class never closes the channel or the
 * connection.
 */
public class MyConsumer {
    private final static String EXCHANGE_NAME = "SIMPLE_EXCHANGE_FANOUT";
    private final static String QUEUE_NAME_A = "SIMPLE_QUEUE_A";
    private final static String QUEUE_NAME_B = "SIMPLE_QUEUE_B";

    /**
     * Sets up the exchange, queues and bindings, then subscribes to both queues.
     *
     * @param args unused
     * @throws Exception if any broker operation fails
     */
    public static void main(String[] args) throws Exception {
        Channel ch = openChannel();

        // exchangeDeclare(exchange, type, durable, autoDelete, arguments)
        ch.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, false, false, null);

        // queueDeclare(queue, durable, exclusive, autoDelete, arguments)
        ch.queueDeclare(QUEUE_NAME_A, false, false, false, null);
        ch.queueDeclare(QUEUE_NAME_B, false, false, false, null);

        // Bind each queue to the exchange
        ch.queueBind(QUEUE_NAME_A, EXCHANGE_NAME, "hello.best");
        ch.queueBind(QUEUE_NAME_B, EXCHANGE_NAME, "hello.best3");
        System.out.println(" Waiting for message....");

        // basicConsume(queue, autoAck, callback): one shared callback serves both queues
        Consumer printer = printingConsumer(ch);
        ch.basicConsume(QUEUE_NAME_A, true, printer);
        ch.basicConsume(QUEUE_NAME_B, true, printer);
    }

    /** Opens a connection to the broker at 127.0.0.1:5673 and returns a new channel on it. */
    private static Channel openChannel() throws Exception {
        ConnectionFactory cf = new ConnectionFactory();
        cf.setHost("127.0.0.1");
        cf.setPort(5673);
        cf.setVirtualHost("/");
        cf.setUsername("guest");
        cf.setPassword("guest");
        Connection connection = cf.newConnection();
        return connection.createChannel();
    }

    /** Builds a consumer that prints the UTF-8 body, consumer tag and delivery tag of each delivery. */
    private static Consumer printingConsumer(Channel ch) {
        return new DefaultConsumer(ch) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String text = new String(body, StandardCharsets.UTF_8);
                long deliveryTag = envelope.getDeliveryTag();
                System.out.println("Received message : '" + text + "'");
                System.out.println("consumerTag : " + consumerTag);
                System.out.println("deliveryTag : " + deliveryTag);
            }
        };
    }
}
Direct 交换机
通过 routing key 精确匹配的绑定方式,把消息发送到绑定键与消息 routing key 完全相同的队列
生产者
/**
 * Message producer for the direct-exchange example.
 *
 * <p>Publishes one text message to the exchange {@code SIMPLE_EXCHANGE_DIRECT} with the routing
 * key {@code hello.best123}. This class does not declare the exchange itself; the consumer of
 * this example does.
 */
public class MyProducer {
    private final static String EXCHANGE_NAME = "SIMPLE_EXCHANGE_DIRECT";

    /**
     * Connects to the broker, publishes a single message and closes the channel and connection.
     *
     * @param args unused
     * @throws Exception if connecting to the broker or publishing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        // Broker host
        factory.setHost("127.0.0.1");
        // Broker port
        factory.setPort(5673);
        // Virtual host
        factory.setVirtualHost("/");
        // Credentials
        factory.setUsername("guest");
        factory.setPassword("guest");

        // try-with-resources closes the channel first and then the connection,
        // even when publishing throws.
        try (Connection conn = factory.newConnection();
             Channel channel = conn.createChannel()) {
            String msg = "Hello world, Rabbit MQ";
            // basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
            // Encode explicitly as UTF-8 because the consumer decodes the body as UTF-8.
            channel.basicPublish(EXCHANGE_NAME, "hello.best123", null,
                    msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        }
    }
}
消息消费者
/**
 * Message consumer for the direct-exchange example.
 *
 * <p>Declares the direct exchange and two queues, binds each queue with its own routing key and
 * then prints every delivery received on either queue. This class never closes the channel or
 * the connection.
 */
public class MyConsumer {
    private final static String EXCHANGE_NAME = "SIMPLE_EXCHANGE_DIRECT";
    private final static String QUEUE_NAME = "SIMPLE_QUEUE";
    private final static String QUEUE_NAME2 = "SIMPLE_QUEUE2";

    /**
     * Sets up the exchange, queues and bindings, then subscribes to both queues.
     *
     * @param args unused
     * @throws Exception if any broker operation fails
     */
    public static void main(String[] args) throws Exception {
        Channel ch = openChannel();

        // exchangeDeclare(exchange, type, durable, autoDelete, arguments)
        ch.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false, false, null);

        // queueDeclare(queue, durable, exclusive, autoDelete, arguments)
        ch.queueDeclare(QUEUE_NAME, false, false, false, null);
        ch.queueDeclare(QUEUE_NAME2, false, false, false, null);
        System.out.println(" Waiting for message....");

        // Routing key "hello.best123" is routed to QUEUE_NAME,
        // routing key "hello.best" is routed to QUEUE_NAME2.
        ch.queueBind(QUEUE_NAME, EXCHANGE_NAME, "hello.best123");
        ch.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "hello.best");

        // basicConsume(queue, autoAck, callback): one shared callback serves both queues
        Consumer printer = printingConsumer(ch);
        ch.basicConsume(QUEUE_NAME, true, printer);
        ch.basicConsume(QUEUE_NAME2, true, printer);
    }

    /** Opens a connection to the broker at 127.0.0.1:5673 and returns a new channel on it. */
    private static Channel openChannel() throws Exception {
        ConnectionFactory cf = new ConnectionFactory();
        cf.setHost("127.0.0.1");
        cf.setPort(5673);
        cf.setVirtualHost("/");
        cf.setUsername("guest");
        cf.setPassword("guest");
        Connection connection = cf.newConnection();
        return connection.createChannel();
    }

    /** Builds a consumer that prints the UTF-8 body, consumer tag and delivery tag of each delivery. */
    private static Consumer printingConsumer(Channel ch) {
        return new DefaultConsumer(ch) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String text = new String(body, StandardCharsets.UTF_8);
                long deliveryTag = envelope.getDeliveryTag();
                System.out.println("Received message : '" + text + "'");
                System.out.println("consumerTag : " + consumerTag);
                System.out.println("deliveryTag : " + deliveryTag);
            }
        };
    }
}
Topic 交换机
支持按 routing key 进行模糊(通配符)匹配,把消息发送到绑定键匹配的队列;routing key 由若干单词组成,单词之间以 . 分隔
*:匹配恰好一个单词
#:匹配 0 个或多个单词
消息生产者
/**
 * Message producer for the topic-exchange example.
 *
 * <p>Publishes one text message to the exchange {@code SIMPLE_EXCHANGE_TOPIC} with a
 * dot-separated routing key. This class does not declare the exchange itself; the consumer of
 * this example does.
 */
public class MyProducer {
    private final static String EXCHANGE_NAME = "SIMPLE_EXCHANGE_TOPIC";

    /**
     * Connects to the broker, publishes a single message and closes the channel and connection.
     *
     * @param args unused
     * @throws Exception if connecting to the broker or publishing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        // Broker host
        factory.setHost("127.0.0.1");
        // Broker port
        factory.setPort(5673);
        // Virtual host
        factory.setVirtualHost("/");
        // Credentials
        factory.setUsername("guest");
        factory.setPassword("guest");

        // try-with-resources closes the channel first and then the connection,
        // even when publishing throws.
        try (Connection conn = factory.newConnection();
             Channel channel = conn.createChannel()) {
            String msg = "Hello world, Rabbit MQ";
            // basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
            // Encode explicitly as UTF-8 because the consumer decodes the body as UTF-8.
            channel.basicPublish(EXCHANGE_NAME, "ak47.abc.efg.kjh.dsadasdasda", null,
                    msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        }
    }
}
消息消费者
/**
 * Message consumer for the topic-exchange example.
 *
 * <p>Declares the topic exchange and two queues, binds each queue with a wildcard pattern and
 * then prints every delivery received on either queue. In a binding pattern {@code *} stands for
 * exactly one word and {@code #} for zero or more words; words are separated by dots. This class
 * never closes the channel or the connection.
 */
public class MyConsumer {
    private final static String EXCHANGE_NAME = "SIMPLE_EXCHANGE_TOPIC";
    private final static String QUEUE_NAME = "SIMPLE_QUEUE_TOPIC_1";
    private final static String QUEUE_NAME2 = "SIMPLE_QUEUE_TOPIC_2";

    /**
     * Sets up the exchange, queues and bindings, then subscribes to both queues.
     *
     * @param args unused
     * @throws Exception if any broker operation fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        // Broker host
        factory.setHost("127.0.0.1");
        // Broker port
        factory.setPort(5673);
        // Virtual host
        factory.setVirtualHost("/");
        // Credentials
        factory.setUsername("guest");
        factory.setPassword("guest");
        // Open the connection
        Connection conn = factory.newConnection();
        // Open a channel
        Channel channel = conn.createChannel();
        // Declare the exchange
        // String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC,false, false, null);
        // Declare the queues
        // String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.queueDeclare(QUEUE_NAME2, false, false, false, null);
        System.out.println(" Waiting for message....");
        // Bind the queues to the exchange.
        // "ak47.#" matches "ak47" followed by zero or more words, e.g. the producer's key
        // "ak47.abc.efg.kjh.dsadasdasda". The previous pattern "ak47.**" matched nothing useful:
        // only a whole word "*" or "#" is a wildcard, so "**" was compared literally.
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"ak47.#");
        // "#.zpr.*" matches any key whose second-to-last word is "zpr".
        channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"#.zpr.*");
        // Consumer that prints every delivery
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, StandardCharsets.UTF_8);
                System.out.println("Received message : '" + msg + "'");
                System.out.println("consumerTag : " + consumerTag );
                System.out.println("deliveryTag : " + envelope.getDeliveryTag() );
            }
        };
        // Start consuming
        // String queue, boolean autoAck, Consumer callback
        channel.basicConsume(QUEUE_NAME, true, consumer);
        channel.basicConsume(QUEUE_NAME2, true, consumer);
    }
}
Header 交换机
支持在绑定参数中设置若干 key、value,并根据参数 x-match=all 或 any 与消息的 headers 进行匹配
all: 表示消息 headers 中所有 key、value 都匹配时,队列才会收到消息
any: 表示消息 headers 中只要有任意一个 key、value 匹配,队列就会收到消息
消息生产者
/**
 * Message producer for the headers-exchange example.
 *
 * <p>Publishes one text message to the exchange {@code SIMPLE_EXCHANGE_HEADER} with an empty
 * routing key and three message headers ({@code api}, {@code version}, {@code dataType}). This
 * class does not declare the exchange itself; the consumer of this example does.
 */
public class MyProducer {
    private final static String EXCHANGE_NAME = "SIMPLE_EXCHANGE_HEADER";

    /**
     * Connects to the broker, publishes a single message with headers and closes the channel
     * and connection.
     *
     * @param args unused
     * @throws Exception if connecting to the broker or publishing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        // Broker host
        factory.setHost("127.0.0.1");
        // Broker port
        factory.setPort(5673);
        // Virtual host
        factory.setVirtualHost("/");
        // Credentials
        factory.setUsername("guest");
        factory.setPassword("guest");

        // Headers attached to the message
        Map<String, Object> headersMap = new HashMap<>();
        headersMap.put("api", "JDK1.9");
        headersMap.put("version", 1);
        headersMap.put("dataType", "json");
        // Build the message properties directly with the Builder; no throwaway
        // BasicProperties instance is needed just to obtain one.
        AMQP.BasicProperties properties =
                new AMQP.BasicProperties.Builder().headers(headersMap).build();

        // try-with-resources closes the channel first and then the connection,
        // even when publishing throws.
        try (Connection conn = factory.newConnection();
             Channel channel = conn.createChannel()) {
            String msg = "Hello world, Rabbit MQ";
            // basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
            // Encode explicitly as UTF-8 because the consumer decodes the body as UTF-8.
            channel.basicPublish(EXCHANGE_NAME, "", properties,
                    msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        }
    }
}
消息消费者
/**
 * Message consumer for the headers-exchange example.
 *
 * <p>Declares the headers exchange and two queues, binds both queues with the same header
 * arguments ({@code x-match=all}) and then prints every delivery received on either queue. This
 * class never closes the channel or the connection.
 */
public class MyConsumer {
    private final static String EXCHANGE_NAME = "SIMPLE_EXCHANGE_HEADER";
    private final static String QUEUE_NAME = "SIMPLE_QUEUE_HEADER_3";
    private final static String QUEUE_NAME2 = "SIMPLE_QUEUE_HEADER_4";

    /**
     * Sets up the exchange, queues and header bindings, then subscribes to both queues.
     *
     * @param args unused
     * @throws Exception if any broker operation fails
     */
    public static void main(String[] args) throws Exception {
        Channel ch = openChannel();

        // exchangeDeclare(exchange, type, durable, autoDelete, arguments)
        ch.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS, false, false, null);

        // queueDeclare(queue, durable, exclusive, autoDelete, arguments)
        ch.queueDeclare(QUEUE_NAME, false, false, false, null);
        ch.queueDeclare(QUEUE_NAME2, false, false, false, null);

        Map<String, Object> bindingArgs = headerBindingArguments();
        System.out.println(" Waiting for message....");

        // Bind both queues with an empty routing key and the header arguments
        ch.queueBind(QUEUE_NAME, EXCHANGE_NAME, "", bindingArgs);
        ch.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "", bindingArgs);

        // basicConsume(queue, autoAck, callback): one shared callback serves both queues
        Consumer printer = printingConsumer(ch);
        ch.basicConsume(QUEUE_NAME, true, printer);
        ch.basicConsume(QUEUE_NAME2, true, printer);
    }

    /** Opens a connection to the broker at 127.0.0.1:5673 and returns a new channel on it. */
    private static Channel openChannel() throws Exception {
        ConnectionFactory cf = new ConnectionFactory();
        cf.setHost("127.0.0.1");
        cf.setPort(5673);
        cf.setVirtualHost("/");
        cf.setUsername("guest");
        cf.setPassword("guest");
        Connection connection = cf.newConnection();
        return connection.createChannel();
    }

    /**
     * Builds the binding arguments. With "x-match" set to "all" every listed key must match for
     * a queue to receive the message; "any" would require only one matching key.
     */
    private static Map<String, Object> headerBindingArguments() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-match", "all");
        args.put("api", "JDK1.9");
        args.put("version", 1);
        args.put("dataType", "json");
        return args;
    }

    /** Builds a consumer that prints the UTF-8 body, consumer tag and delivery tag of each delivery. */
    private static Consumer printingConsumer(Channel ch) {
        return new DefaultConsumer(ch) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String text = new String(body, StandardCharsets.UTF_8);
                long deliveryTag = envelope.getDeliveryTag();
                System.out.println("Received message : '" + text + "'");
                System.out.println("consumerTag : " + consumerTag);
                System.out.println("deliveryTag : " + deliveryTag);
            }
        };
    }
}
评论区