Mq交换机
原创
2025-03-13 06:30
6
RabbitMQ 支持四种主要类型的交换机:**Direct(直连)**、**Fanout(广播)**、Topic(主题)**和****Headers(头部)**。每种交换机的工作方式不同,适用于不同的场景。我们将通过具体的代码示例演示每种交换机的使用方法。 ### 1. Direct Exchange(直连交换机) **Direct Exchange** 通过**路由键**精确匹配队列,将消息路由到指定的队列。 场景: 生产者发送带有特定路由键的消息,交换机根据路由键将消息推送到与之匹配的队列。 示例代码: **生产者**: ```java import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class DirectProducer { private final static String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 声明一个 Direct 类型的交换机 channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String severity = "info"; // 路由键,表示消息的级别 String message = "This is an info message!"; // 发送带有路由键的消息 channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + severity + "':'" + message + "'"); } } } ``` **消费者**: ```java import com.rabbitmq.client.*; public class DirectConsumer { private final static String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明交换机 channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 创建一个临时队列,并绑定到交换机,指定路由键为 info String queueName = channel.queueDeclare().getQueue(); String routingKey = "info"; channel.queueBind(queueName, EXCHANGE_NAME, routingKey); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } } ``` 结果: - 生产者发送路由键为 `info` 的消息,只有绑定了 `info` 路由键的消费者能接收到。 ------ ### 2. Fanout Exchange(广播交换机) **Fanout Exchange** 会将消息广播到所有绑定到该交换机的队列,**不考虑路由键**。 场景: 生产者发送消息,所有与交换机绑定的队列都会接收到消息。 示例代码: **生产者**: ```java import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class FanoutProducer { private final static String EXCHANGE_NAME = "fanout_logs"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 声明一个 Fanout 类型的交换机 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String message = "Broadcast message to all queues!"; // 发送消息,忽略路由键 channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); } } } ``` **消费者**: ```java import com.rabbitmq.client.*; public class FanoutConsumer { private final static String EXCHANGE_NAME = "fanout_logs"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明交换机 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 创建一个临时队列,并绑定到交换机 String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } } ``` 结果: - 每个消费者会接收到相同的广播消息。 ------ ### 3. Topic Exchange(主题交换机) **Topic Exchange** 允许使用**模式匹配**路由键,路由键可以包含通配符: - `*` 匹配一个单词。 - `#` 匹配零个或多个单词。 场景: 生产者发送带有特定模式的路由键,交换机会根据模式将消息发送到匹配的队列。 示例代码: **生产者**: ```java import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class TopicProducer { private final static String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 声明一个 Topic 类型的交换机 channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String routingKey = "kern.critical"; String message = "Kernel critical error!"; // 发送带有路由键的消息 channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'"); } } } ``` **消费者**: ```java import com.rabbitmq.client.*; public class TopicConsumer { private final static String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明交换机 channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 创建一个临时队列,并绑定到交换机,路由键模式为 "kern.*" String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, "kern.*"); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } } ``` 结果: - 如果生产者发送路由键 `kern.critical`,绑定 `kern.*` 的消费者会接收到消息。 ------ ### 4. Headers Exchange(头部交换机) **Headers Exchange** 不使用路由键,而是根据消息头部的属性匹配,将消息路由到队列。 示例代码: **生产者**: ```java import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.util.HashMap; import java.util.Map; public class HeadersProducer { private final static String EXCHANGE_NAME = "headers_logs"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 声明一个 Headers 类型的交换机 channel.exchangeDeclare(EXCHANGE_NAME, "headers"); String message = "Message with headers!"; Map<string,> headers = new HashMap<>(); headers.put("format", "pdf"); headers.put("type", "report"); // 设置消息属性中的头部 AMQP.BasicProperties props = new AMQP.BasicProperties.Builder() .headers(headers) .build(); // 发送消息 channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes("UTF-8")); System.out.println(" [x] Sent message with headers"); } } } ``` **消费者**: ```java import com.rabbitmq.client.*; import java.util.HashMap; import java.util.Map; public class HeadersConsumer { private final static String EXCHANGE_NAME = "headers_logs"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明交换机 channel.exchangeDeclare(EXCHANGE_NAME, "headers"); // 创建一个临时队列 String queueName = channel.queueDeclare().getQueue(); // 设置绑定条件,必须匹配到 headers 中的 format 为 pdf 和 type 为 report Map<string,> headers = new HashMap<>(); headers.put("format", "pdf"); headers.put("type", "report"); // 将队列与交换机绑定,并使用 headers 进行匹配 channel.queueBind(queueName, EXCHANGE_NAME, "", headers); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "' with headers: " + delivery.getProperties().getHeaders()); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } } ``` ### 结果说明: 在 **Headers Exchange** 中,消费者会根据消息的头部信息进行匹配。如果生产者发送的消息头部包含 `format` 为 `pdf` 且 `type` 为 `report`,那么该消息会被路由到匹配了这些头部条件的队列。 ------ ### 总结: 1. **Direct Exchange**:根据精确的路由键匹配队列。 2. **Fanout Exchange**:广播消息到所有绑定队列,不考虑路由键。 3. **Topic Exchange**:通过模式匹配的方式根据路由键将消息发送到匹配的队列。 4. **Headers Exchange**:根据消息的头部属性匹配队列,灵活性更高。 不同的交换机适用于不同的消息分发场景,根据需求选择合适的交换机可以有效优化系统的消息推送机制。
评 论
目录