目 录CONTENT

文章目录

RabbitMQ 5种交换机使用案例

小张的探险日记
2022-12-01 / 0 评论 / 0 点赞 / 432 阅读 / 13,597 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2022-12-01,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

RabbitMQ 有5种交换机类型

分别为 fanout、direct、topic、headers 这四种,再加上一个默认(Default)交换机(默认交换机本质上是一个名称为空字符串的 direct 交换机)

消息发送到交换机后,由不同类型的交换机按照各自的路由算法,把消息投递到不同的队列中

image.png

每种交换机具有不同的特性

image.png

Default 交换机

默认交换机会隐式绑定到每个队列,路由键等于队列名称。队列无法显式绑定到默认交换机,也无法与之解除绑定;默认交换机本身也不能被删除。

消息生产者

/**
 * Message producer for the default-exchange example.
 *
 * <p>Opens a connection to the broker at 127.0.0.1:5673 (virtual host "/", user
 * guest/guest), publishes one text message through the default exchange, then closes
 * the channel and the connection.
 */
public class MyProducer {
    /**
     * Target queue. With the default exchange the routing key must equal the queue
     * name, so this is one of the queues declared by the default-exchange consumer
     * example.
     */
    private static final String QUEUE_NAME = "SIMPLE_QUEUE_SIMPLE_EXCHANGE_DEFAULT_1";

    /**
     * Publishes a single message and exits.
     *
     * @param args unused
     * @throws Exception if connecting, publishing, or closing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        // Broker host
        factory.setHost("127.0.0.1");
        // Broker port used by this example (RabbitMQ's stock AMQP port is 5672)
        factory.setPort(5673);
        // Virtual host
        factory.setVirtualHost("/");
        // Credentials
        factory.setUsername("guest");
        factory.setPassword("guest");

        // try-with-resources closes the channel first and then the connection,
        // even when publishing throws.
        try (Connection conn = factory.newConnection();
             Channel channel = conn.createChannel()) {
            String msg = "Hello world, Rabbit MQ";

            // "" selects the default exchange, which is implicitly bound to every queue
            // with a routing key equal to the queue name.
            // The third argument (props) carries optional message properties; it is
            // what a headers exchange example would use to send headers.
            // Encode explicitly as UTF-8 because the consumer decodes with UTF-8.
            channel.basicPublish("", QUEUE_NAME, null,
                    msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        }
    }
}

消息消费者

package com.rabbit.simple.defaults;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Message consumer for the default-exchange example.
 *
 * <p>Declares two queues and subscribes one printing callback to both of them with
 * auto-acknowledgement. No binding call is made: per the default-exchange rules the
 * routing key implicitly equals the queue name.
 */
public class MyConsumer {
    // Not referenced by this class; kept as in the original example.
    private final static String EXCHANGE_NAME = "SIMPLE_EXCHANGE_DEFAULT";
    private final static String QUEUE_NAME = "SIMPLE_QUEUE_SIMPLE_EXCHANGE_DEFAULT_1";
    private final static String QUEUE_NAME2 = "SIMPLE_QUEUE_SIMPLE_EXCHANGE_DEFAULT_2";

    /**
     * Declares the queues and starts consuming; the connection is left open so that
     * deliveries keep arriving.
     *
     * @param args unused
     * @throws Exception if connecting, declaring, or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        Channel amqpChannel = openChannel();
        String[] queues = {QUEUE_NAME, QUEUE_NAME2};

        // Queues are non-durable, non-exclusive, not auto-deleted, with no arguments.
        for (String queue : queues) {
            amqpChannel.queueDeclare(queue, false, false, false, null);
        }

        System.out.println(" Waiting for message....");

        // One callback instance serves both subscriptions (autoAck = true).
        Consumer printer = printingConsumer(amqpChannel);
        for (String queue : queues) {
            amqpChannel.basicConsume(queue, true, printer);
        }
    }

    /** Connects to 127.0.0.1:5673, virtual host "/", as guest/guest and opens a channel. */
    private static Channel openChannel() throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5673);
        return connectionFactory.newConnection().createChannel();
    }

    /** Builds a consumer that prints the UTF-8 body, the consumer tag and the delivery tag. */
    private static Consumer printingConsumer(Channel amqpChannel) {
        return new DefaultConsumer(amqpChannel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String text = new String(body, StandardCharsets.UTF_8);
                System.out.println("Received message : '" + text + "'");
                System.out.println("consumerTag : " + consumerTag);
                System.out.println("deliveryTag : " + envelope.getDeliveryTag());
            }
        };
    }
}



Fanout 交换机

具有广播特性:会把消息广播到每一个与之绑定的队列,路由时忽略 routing key

消息生产者

/**
 * Message producer for the fanout-exchange example.
 *
 * <p>Connects to the broker at 127.0.0.1:5673 (virtual host "/", user guest/guest) and
 * publishes one text message to {@code SIMPLE_EXCHANGE_FANOUT}. This class does not
 * declare the exchange itself; the fanout consumer example does.
 */
public class MyProducer {
    private static final String EXCHANGE_NAME = "SIMPLE_EXCHANGE_FANOUT";

    /**
     * Publishes a single message and exits.
     *
     * @param args unused
     * @throws Exception if connecting, publishing, or closing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        // Broker host
        factory.setHost("127.0.0.1");
        // Broker port used by this example (RabbitMQ's stock AMQP port is 5672)
        factory.setPort(5673);
        // Virtual host
        factory.setVirtualHost("/");
        // Credentials
        factory.setUsername("guest");
        factory.setPassword("guest");

        // try-with-resources closes the channel first and then the connection,
        // even when publishing throws.
        try (Connection conn = factory.newConnection();
             Channel channel = conn.createChannel()) {
            String msg = "Hello world, Rabbit MQ";

            // Arguments: exchange, routingKey, props, body.
            // A fanout exchange broadcasts to every bound queue, so the routing key
            // given here does not affect which queues receive the message.
            // Encode explicitly as UTF-8 because the consumer decodes with UTF-8.
            channel.basicPublish(EXCHANGE_NAME, "hello.best", null,
                    msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        }
    }
}


消费者

/**
 * Message consumer for the fanout-exchange example.
 *
 * <p>Declares the fanout exchange and two queues, binds both queues to the exchange,
 * and subscribes one printing callback to both queues with auto-acknowledgement.
 */
public class MyConsumer {
    private final static String EXCHANGE_NAME = "SIMPLE_EXCHANGE_FANOUT";
    private final static String QUEUE_NAME_A = "SIMPLE_QUEUE_A";
    private final static String QUEUE_NAME_B = "SIMPLE_QUEUE_B";

    /**
     * Sets up the topology and starts consuming; the connection is left open so that
     * deliveries keep arriving.
     *
     * @param args unused
     * @throws Exception if connecting, declaring, binding, or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        Channel amqpChannel = openChannel();

        // exchangeDeclare(exchange, type, durable, autoDelete, arguments)
        amqpChannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, false, false, null);

        // queueDeclare(queue, durable, exclusive, autoDelete, arguments)
        for (String queue : new String[] {QUEUE_NAME_A, QUEUE_NAME_B}) {
            amqpChannel.queueDeclare(queue, false, false, false, null);
        }

        // The two queues are bound with different keys; a fanout exchange broadcasts
        // to every bound queue regardless of the key.
        amqpChannel.queueBind(QUEUE_NAME_A, EXCHANGE_NAME, "hello.best");
        amqpChannel.queueBind(QUEUE_NAME_B, EXCHANGE_NAME, "hello.best3");

        System.out.println(" Waiting for message....");

        // basicConsume(queue, autoAck, callback) - one callback serves both queues.
        Consumer printer = printingConsumer(amqpChannel);
        amqpChannel.basicConsume(QUEUE_NAME_A, true, printer);
        amqpChannel.basicConsume(QUEUE_NAME_B, true, printer);
    }

    /** Connects to 127.0.0.1:5673, virtual host "/", as guest/guest and opens a channel. */
    private static Channel openChannel() throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5673);
        return connectionFactory.newConnection().createChannel();
    }

    /** Builds a consumer that prints the UTF-8 body, the consumer tag and the delivery tag. */
    private static Consumer printingConsumer(Channel amqpChannel) {
        return new DefaultConsumer(amqpChannel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String text = new String(body, StandardCharsets.UTF_8);
                System.out.println("Received message : '" + text + "'");
                System.out.println("consumerTag : " + consumerTag);
                System.out.println("deliveryTag : " + envelope.getDeliveryTag());
            }
        };
    }
}

Direct 交换机

根据 routing key 与队列绑定键(binding key)完全匹配的方式,把消息发送到指定的队列

生产者

/**
 * Message producer for the direct-exchange example.
 *
 * <p>Connects to the broker at 127.0.0.1:5673 (virtual host "/", user guest/guest) and
 * publishes one text message to {@code SIMPLE_EXCHANGE_DIRECT} with routing key
 * {@code hello.best123}. This class does not declare the exchange itself; the direct
 * consumer example does.
 */
public class MyProducer {
    private static final String EXCHANGE_NAME = "SIMPLE_EXCHANGE_DIRECT";

    /**
     * Publishes a single message and exits.
     *
     * @param args unused
     * @throws Exception if connecting, publishing, or closing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        // Broker host
        factory.setHost("127.0.0.1");
        // Broker port used by this example (RabbitMQ's stock AMQP port is 5672)
        factory.setPort(5673);
        // Virtual host
        factory.setVirtualHost("/");
        // Credentials
        factory.setUsername("guest");
        factory.setPassword("guest");

        // try-with-resources closes the channel first and then the connection,
        // even when publishing throws.
        try (Connection conn = factory.newConnection();
             Channel channel = conn.createChannel()) {
            String msg = "Hello world, Rabbit MQ";

            // Arguments: exchange, routingKey, props, body.
            // A direct exchange delivers to queues whose binding key equals this
            // routing key exactly.
            // Encode explicitly as UTF-8 because the consumer decodes with UTF-8.
            channel.basicPublish(EXCHANGE_NAME, "hello.best123", null,
                    msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        }
    }
}


消息消费者

/**
 * Message consumer for the direct-exchange example.
 *
 * <p>Declares the direct exchange and two queues, binds each queue with its own
 * routing key, and subscribes one printing callback to both queues with
 * auto-acknowledgement.
 */
public class MyConsumer {
    private final static String EXCHANGE_NAME = "SIMPLE_EXCHANGE_DIRECT";
    private final static String QUEUE_NAME = "SIMPLE_QUEUE";
    private final static String QUEUE_NAME2 = "SIMPLE_QUEUE2";

    /**
     * Sets up the topology and starts consuming; the connection is left open so that
     * deliveries keep arriving.
     *
     * @param args unused
     * @throws Exception if connecting, declaring, binding, or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        Channel amqpChannel = openChannel();

        // exchangeDeclare(exchange, type, durable, autoDelete, arguments)
        amqpChannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false, false, null);

        // queueDeclare(queue, durable, exclusive, autoDelete, arguments)
        for (String queue : new String[] {QUEUE_NAME, QUEUE_NAME2}) {
            amqpChannel.queueDeclare(queue, false, false, false, null);
        }

        System.out.println(" Waiting for message....");

        // Routing key "hello.best123" routes to QUEUE_NAME,
        // routing key "hello.best" routes to QUEUE_NAME2.
        amqpChannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "hello.best123");
        amqpChannel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "hello.best");

        // basicConsume(queue, autoAck, callback) - one callback serves both queues.
        Consumer printer = printingConsumer(amqpChannel);
        amqpChannel.basicConsume(QUEUE_NAME, true, printer);
        amqpChannel.basicConsume(QUEUE_NAME2, true, printer);
    }

    /** Connects to 127.0.0.1:5673, virtual host "/", as guest/guest and opens a channel. */
    private static Channel openChannel() throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5673);
        return connectionFactory.newConnection().createChannel();
    }

    /** Builds a consumer that prints the UTF-8 body, the consumer tag and the delivery tag. */
    private static Consumer printingConsumer(Channel amqpChannel) {
        return new DefaultConsumer(amqpChannel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String text = new String(body, StandardCharsets.UTF_8);
                System.out.println("Received message : '" + text + "'");
                System.out.println("consumerTag : " + consumerTag);
                System.out.println("deliveryTag : " + envelope.getDeliveryTag());
            }
        };
    }
}



Topic 交换机

支持按 routing key 进行模糊匹配,把消息发送到匹配的队列。routing key 由若干个以 . 分隔的单词组成:
*:匹配恰好一个单词
#:匹配 0 个或多个单词

消息生产者

/**
 * Message producer for the topic-exchange example.
 *
 * <p>Connects to the broker at 127.0.0.1:5673 (virtual host "/", user guest/guest) and
 * publishes one text message to {@code SIMPLE_EXCHANGE_TOPIC} with a multi-word
 * routing key. This class does not declare the exchange itself; the topic consumer
 * example does.
 */
public class MyProducer {
    private static final String EXCHANGE_NAME = "SIMPLE_EXCHANGE_TOPIC";

    /**
     * Publishes a single message and exits.
     *
     * @param args unused
     * @throws Exception if connecting, publishing, or closing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        // Broker host
        factory.setHost("127.0.0.1");
        // Broker port used by this example (RabbitMQ's stock AMQP port is 5672)
        factory.setPort(5673);
        // Virtual host
        factory.setVirtualHost("/");
        // Credentials
        factory.setUsername("guest");
        factory.setPassword("guest");

        // try-with-resources closes the channel first and then the connection,
        // even when publishing throws.
        try (Connection conn = factory.newConnection();
             Channel channel = conn.createChannel()) {
            String msg = "Hello world, Rabbit MQ";

            // Arguments: exchange, routingKey, props, body.
            // The routing key is a list of dot-separated words that the topic
            // exchange matches against the binding patterns of the queues.
            // Encode explicitly as UTF-8 because the consumer decodes with UTF-8.
            channel.basicPublish(EXCHANGE_NAME, "ak47.abc.efg.kjh.dsadasdasda", null,
                    msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        }
    }
}

消息消费者

/**
 * Message consumer for the topic-exchange example.
 *
 * <p>Declares the topic exchange and two queues, binds each queue with a wildcard
 * pattern, and subscribes one printing callback to both queues with
 * auto-acknowledgement.
 *
 * <p>Pattern rules: words are separated by {@code .}; {@code *} matches exactly one
 * word and {@code #} matches zero or more words.
 */
public class MyConsumer {
    private final static String EXCHANGE_NAME = "SIMPLE_EXCHANGE_TOPIC";
    private final static String QUEUE_NAME = "SIMPLE_QUEUE_TOPIC_1";
    private final static String QUEUE_NAME2 = "SIMPLE_QUEUE_TOPIC_2";

    /**
     * Sets up the topology and starts consuming; the connection is left open so that
     * deliveries keep arriving.
     *
     * @param args unused
     * @throws Exception if connecting, declaring, binding, or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        // Broker host
        factory.setHost("127.0.0.1");
        // Broker port used by this example (RabbitMQ's stock AMQP port is 5672)
        factory.setPort(5673);
        // Virtual host
        factory.setVirtualHost("/");

        // Credentials
        factory.setUsername("guest");
        factory.setPassword("guest");
        // Open the connection
        Connection conn = factory.newConnection();
        // Open a channel
        Channel channel = conn.createChannel();

        // Declare the exchange
        // String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC,false, false, null);

        // Declare the queues
        // String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.queueDeclare(QUEUE_NAME2, false, false, false, null);

        System.out.println(" Waiting for message....");

        // Bind the queues to the exchange.
        // "ak47.#" matches every routing key whose first word is "ak47", followed by
        // zero or more words. The previous pattern "ak47.**" was not a wildcard:
        // "**" is neither "*" nor "#", so it only matched the literal word "**".
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"ak47.#");

        // "#.zpr.*" matches routing keys whose second-to-last word is "zpr".
        channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"#.zpr.*");
        // Create the consumer callback
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String msg = new String(body, StandardCharsets.UTF_8);
                System.out.println("Received message : '" + msg + "'");
                System.out.println("consumerTag : " + consumerTag );
                System.out.println("deliveryTag : " + envelope.getDeliveryTag() );
            }
        };

        // Start consuming
        // String queue, boolean autoAck, Consumer callback
        channel.basicConsume(QUEUE_NAME, true, consumer);
        channel.basicConsume(QUEUE_NAME2, true, consumer);
    }
}



Header 交换机

支持在绑定参数中设置若干 key、value,并通过参数 x-match(取值为 all 或 any)指定匹配方式:
all: 表示消息头中所有 key、value 都匹配时,队列才会收到消息
any: 表示消息头中只要有任意一个 key、value 匹配,队列就会收到消息

消息生产者

/**
 * Message producer for the headers-exchange example.
 *
 * <p>Connects to the broker at 127.0.0.1:5673 (virtual host "/", user guest/guest) and
 * publishes one text message to {@code SIMPLE_EXCHANGE_HEADER} carrying the headers
 * {@code api}, {@code version} and {@code dataType}. This class does not declare the
 * exchange itself; the headers consumer example does.
 */
public class MyProducer {
    private static final String EXCHANGE_NAME = "SIMPLE_EXCHANGE_HEADER";

    /**
     * Publishes a single message and exits.
     *
     * @param args unused
     * @throws Exception if connecting, publishing, or closing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        // Broker host
        factory.setHost("127.0.0.1");
        // Broker port used by this example (RabbitMQ's stock AMQP port is 5672)
        factory.setPort(5673);
        // Virtual host
        factory.setVirtualHost("/");
        // Credentials
        factory.setUsername("guest");
        factory.setPassword("guest");

        // try-with-resources closes the channel first and then the connection,
        // even when publishing throws.
        try (Connection conn = factory.newConnection();
             Channel channel = conn.createChannel()) {
            String msg = "Hello world, Rabbit MQ";

            // Headers the exchange compares against each binding's arguments.
            Map<String, Object> headersMap = new HashMap<>();
            headersMap.put("api", "JDK1.9");
            headersMap.put("version", 1);
            headersMap.put("dataType", "json");

            // Message properties that carry the headers.
            AMQP.BasicProperties properties =
                    new AMQP.BasicProperties.Builder().headers(headersMap).build();

            // Arguments: exchange, routingKey, props, body.
            // The routing key is left empty; routing is decided by the headers.
            // Encode explicitly as UTF-8 because the consumer decodes with UTF-8.
            channel.basicPublish(EXCHANGE_NAME, "", properties,
                    msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        }
    }
}



消息消费者

/**
 * Message consumer for the headers-exchange example.
 *
 * <p>Declares the headers exchange and two queues, binds both queues with the same
 * header-matching arguments, and subscribes one printing callback to both queues with
 * auto-acknowledgement.
 */
public class MyConsumer {
    private final static String EXCHANGE_NAME = "SIMPLE_EXCHANGE_HEADER";
    private final static String QUEUE_NAME = "SIMPLE_QUEUE_HEADER_3";
    private final static String QUEUE_NAME2 = "SIMPLE_QUEUE_HEADER_4";

    /**
     * Sets up the topology and starts consuming; the connection is left open so that
     * deliveries keep arriving.
     *
     * @param args unused
     * @throws Exception if connecting, declaring, binding, or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        Channel amqpChannel = openChannel();

        // exchangeDeclare(exchange, type, durable, autoDelete, arguments)
        amqpChannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS, false, false, null);

        // queueDeclare(queue, durable, exclusive, autoDelete, arguments)
        String[] queues = {QUEUE_NAME, QUEUE_NAME2};
        for (String queue : queues) {
            amqpChannel.queueDeclare(queue, false, false, false, null);
        }

        final Map<String, Object> matchArguments = bindingArguments();

        System.out.println(" Waiting for message....");

        // Both queues use an empty routing key and the same matching arguments.
        for (String queue : queues) {
            amqpChannel.queueBind(queue, EXCHANGE_NAME, "", matchArguments);
        }

        // basicConsume(queue, autoAck, callback) - one callback serves both queues.
        Consumer printer = printingConsumer(amqpChannel);
        for (String queue : queues) {
            amqpChannel.basicConsume(queue, true, printer);
        }
    }

    /**
     * Builds the binding arguments. "x-match" = "all" means every listed key/value must
     * match for the queue to receive a message; "any" would accept a single match.
     */
    private static Map<String, Object> bindingArguments() {
        Map<String, Object> matchArguments = new HashMap<>();
        matchArguments.put("x-match", "all");
        matchArguments.put("api", "JDK1.9");
        matchArguments.put("version", 1);
        matchArguments.put("dataType", "json");
        return matchArguments;
    }

    /** Connects to 127.0.0.1:5673, virtual host "/", as guest/guest and opens a channel. */
    private static Channel openChannel() throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5673);
        return connectionFactory.newConnection().createChannel();
    }

    /** Builds a consumer that prints the UTF-8 body, the consumer tag and the delivery tag. */
    private static Consumer printingConsumer(Channel amqpChannel) {
        return new DefaultConsumer(amqpChannel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String text = new String(body, StandardCharsets.UTF_8);
                System.out.println("Received message : '" + text + "'");
                System.out.println("consumerTag : " + consumerTag);
                System.out.println("deliveryTag : " + envelope.getDeliveryTag());
            }
        };
    }
}

0

评论区