`
yugouai
  • 浏览: 492368 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

rabbitmq入门-路由

 
阅读更多

创建过绑定(bindings),代码如下:

 

channel.queueBind(queueName, EXCHANGE_NAME, "");

绑定(binding)是指交换器(exchange)和队列(queue)的关系。可以简单理解为:这个队列(queue)对这个交换器(exchange)的消息感兴趣。

 

 

绑定的时候可以带上一个额外的routing_key参数。为了避免与basic_publish的参数混淆,我们把它叫做binding key。以下是如何创建一个带binding key的绑定。

 

channel.queueBind(queueName, EXCHANGE_NAME, "error")

 binding key的含义取决于交换器(exchange)的类型。

 

 

Direct类型的交换器(exchange)

使用的fanout类型的交换器(exchange)扩展性不够——它能做的仅仅是广播。

 

使用direct类型的交换器(exchange)来代替。路由的算法很简单——交换器将会对binding key和routing key进行精确匹配,从而确定消息该分发到哪个队列。

 

多个绑定(Multiple bindings)

多个队列使用相同的binding key是合法的,即一条消息能发送到多个queue

 

发送消息到一个direct exchange,把日志级别作为routing key

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

 发送一条消息

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

 

订阅(Subscribing)

处理接收消息的方式和之前差不多,但是我们为每一个日志级别创建了一个新的绑定

String queueName = channel.queueDeclare().getQueue();

for (String severity : argv) {
	channel.queueBind(queueName, EXCHANGE_NAME, severity);
}

 

代码整合

生产者

package com.duowan.rabbit.mq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogDirect {

	private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv)
                  throws java.io.IOException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        String severity = getSeverity(argv);
        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
        System.out.println(" [x] Sent '" + severity + "':'" + message + "'");

        channel.close();
        connection.close();
    }
    
    private static String getMessage(String[] strings){
        if (strings.length < 1)
            return "Hello World!";
        return joinStrings(strings, " ");
    }
    
    private static String joinStrings(String[] strings, String delimiter) {
        int length = strings.length;
        if (length == 0) return "";
        StringBuilder words = new StringBuilder(strings[0]);
        for (int i = 1; i < length; i++) {
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }
    
    private static String getSeverity(String[] strings){
        if (strings.length < 1)
                    return "info";
        return strings[0];
      }
}

 消费者

package com.duowan.rabbit.mq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class ReceiveLogsDirect {
	private static final String EXCHANGE_NAME = "direct_logs";

	public static void main(String[] argv) throws Exception {

		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();

		channel.exchangeDeclare(EXCHANGE_NAME, "direct");
		String queueName = channel.queueDeclare().getQueue();

		if (argv.length < 1) {
			System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
			System.exit(1);
		}

		for (String severity : argv) {
			channel.queueBind(queueName, EXCHANGE_NAME, severity);
		}

		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

		QueueingConsumer consumer = new QueueingConsumer(channel);
		channel.basicConsume(queueName, true, consumer);

		while (true) {
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
			String message = new String(delivery.getBody());
			String routingKey = delivery.getEnvelope().getRoutingKey();

			System.out.println(" [x] Received '" + routingKey + "':'" + message
					+ "'");
		}
	}

}

 

分享到:
评论

相关推荐

    5-2 RabbitMQ入门 - EMOS小程序1

    1. docker load &lt; rabbitmq.tar.gz 1. 简单模式 3. 发布/订阅模式 4. 路由模式 5. 主题模式

    RabbitMQ入门小Dome ------&amp;gt; RabbitMQDome.zip

    最近整理学习的RabbitMQ入门Dome,文件是一个普通java项目导入完成后在lib文件夹中amqp-client-5.2.0.jar,slf4j-api-1.7.25.jar添加进去即可,里面有5个dome分是 dome1 : 简单队列,dome2 :work模式,dome3 : 订阅...

    RabbitMQ快速入门及API介绍(401M)

    RabbitMQ快速入门及API介绍(401M) QQ截图 20191220230107.png?x-oss-process=style/pnp8 (42.73KB, 下载次数:227) 下载附件 2019-12-2023 :01 上传【课程介绍】:第一章 : RabbitMQ介绍:消息中间件概念、RabbitMQ...

    RabbitMQ_Project.zip

    rabbitmq的入门实例,有简单队列,轮询分发模式,公平分发模式,交换机,路由模式,topic模式

    RabbitMQSpringBootGetStart:RabbitMQ SpringBoot入门

    RabbitMQ SpringBoot入门基本网址: : 兔子MQ 安装并启动服务器 使用命令“ rabbitmq-server”启动服务器,然后使用来宾/来宾启动 或添加用户 使用Spring Boot 了解AMQP结构在/ config中queue()方法创建一个AMQP...

    RabbitMQ自学笔记

    最全面的rabbitmq课堂笔记, rabbitMQ windows 安装 入门,添加用户,host管理等。 简单队列,工作队列,路由模式,消息应答,消息持久化,topic交换器,事务机制,确认时模式,comfirm等。。

    mycloud-mq.zip

    RabbitMq入门案例,路由模式,配置好MQ运行代码即可

    rabbitmq.zip

    RabbitMQ相关的代码,包括了入门案例,发布订阅模式,路由模式还有SpringBoot整合RabbitMQ的代码

    Java基于Netty实现的高性能分布式IM即时通信系统源码+项目说明.tar

    3、用途:项目具有较高的学习借鉴价值,也适用于小白学习入门进阶。当然也可作为毕设项目、课程设计、大作业、初期项目立项演示等。 4、如果基础还行,或者热爱钻研,亦可在此项目代码基础上进行修改添加,实现其他...

    Java课设基于Spring Boot和Spring Cloud开发的加密货币爱好者流量的平台源码(含项目说明+数据库).zip

    3.项目具有丰富的拓展空间,不仅可作为入门进阶,也可直接作为毕设、课程设计、大作业、初期项目立项演示等用途。 4.当然也鼓励大家基于此进行二次开发。 5.期待你能在项目中找到乐趣和灵感,也欢迎你的分享和反馈!...

    spring-cloud-netflix-example:spring-cloud-netflix-example是微服务系统的示例

    它包含配置管理,服务发现,断路器,智能路由,分布式跟踪,应用程序监视器。 注册中心使用eureka,如果您想使用领事,则可以参考 。入门./gradlew clean build -x test./buildDockerImage.shdocker-compose up -d ...

Global site tag (gtag.js) - Google Analytics