springboot整合rabbitmq 演示DirectExchange,TopicExchange,FanoutExchange交换机 发送和接收消息

news/2025/5/22 16:38:56

1 添加依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2 配置application.yml文件

server:port: 8021
spring:#给项目来个名字application:name: rabbitmq-test#配置rabbitMq 服务器rabbitmq:host: 127.0.0.1port: 5672username: needpassword: 123456#虚拟host 可以不设置,使用server默认hostvirtual-host: /testhost

3 编写配置队列,交换机,绑定的类,这里以fanoutExchange为列,其他的交换机类似

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/***  1 创建三个队列 :fanout.A   fanout.B  fanout.C*  2 定义交换机*  3 将三个队列都绑定在交换机 fanoutExchange 上*  因为是扇型交换机, 路由键无需配置,配置也不起作用*/
@Configuration
public class FanoutRabbitConfig {// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。//一般设置一下队列的持久化就好,其余两个就是默认false@Beanpublic Queue queueA() {return new Queue("fanout.A",true);}@Beanpublic Queue queueB() {return new Queue("fanout.B",true);}@Beanpublic Queue queueC() {return new Queue("fanout.C",true);}@Bean//DirectExchange Direct交换机//TopicExchange Topic交换机FanoutExchange fanoutExchange() {return new FanoutExchange("fanoutExchange",true,false);}@BeanBinding bindingExchangeA() {return BindingBuilder.bind(queueA()).to(fanoutExchange());//将队列和交换机绑定, 并设置用于路由键 (Direct交换机,Topic交换机使用)//return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");}@BeanBinding bindingExchangeB() {return BindingBuilder.bind(queueB()).to(fanoutExchange());}@BeanBinding bindingExchangeC() {return BindingBuilder.bind(queueC()).to(fanoutExchange());}
}

4 发送消息

import cn.huawei.rabbitmqtest1.pojo.User;
import com.alibaba.fastjson.JSON;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
class RabbitmqTest1ApplicationTests_1 {@AutowiredRabbitTemplate rabbitTemplate;  //使用RabbitTemplate,这提供了接收/发送等等方法@Testvoid test_1() {for (int i = 1; i <= 3; i++) {User user = new User(i + "", "小明 " + i);rabbitTemplate.convertAndSend("fanoutExchange", "", JSON.toJSONString(user));}}@Testpublic void test_2() {for (int i = 1; i <= 3; i++) {User user = new User(i + "", "张三 " + i);rabbitTemplate.convertAndSend("TestTopicExchange", "news.chongqing", JSON.toJSONString(user));}}@Testpublic void test_3() {for (int i = 1; i <= 3; i++) {User user = new User(i + "", "陈四 " + i);rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", JSON.toJSONString(user));}}}

5 接收消息

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class FanoutReceiverA {@RabbitListener(queues = "fanout.A")public void process(Message testMessage) {System.out.println("id:"+testMessage.getMessageProperties().getMessageId());System.out.println("tag:"+testMessage.getMessageProperties().getConsumerTag());System.out.println("FanoutReceiverA消费者收到消息  : " +new String(testMessage.getBody()));}
}import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class FanoutReceiverB {@RabbitListener(queues = "fanout.B")public void process(Message testMessage) {System.out.println("id:"+testMessage.getMessageProperties().getMessageId());System.out.println("tag:"+testMessage.getMessageProperties().getConsumerTag());System.out.println("FanoutReceiverB消费者收到消息  : " +new String(testMessage.getBody()));}
}


 

文章来源:https://blog.csdn.net/qq_41712271/article/details/115619405
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:https://dhexx.cn/news/show-17470.html

相关文章

【struts2】自定义登录检查拦截器

在实际开发中&#xff0c;一个常见的功能要求是&#xff1a;有很多操作都需要登录后才能操作&#xff0c;如果操作的时候还没有登录&#xff0c;那么通常情况下会要求跳转回到登录页面。 1&#xff09;如何实现这样的功能呢&#xff1f; 在具体实现之前&#xff0c;先来考虑几个…

EF(EntityFramework) Migrations 迁移

1、开启程序包管理器控制台 2.安装EntityFrameworkPM> Install-Package EntityFramework3.启用迁移PM> Enable-Migrations –EnableAutomaticMigrations遇到的问题&#xff1a;4.创建迁移点PM> Add-Migration InitialCreate5.执行迁移点更新PM> Update-Database –…

springboot整合rabbitmq 消息发布的可靠性投递(生产者发送消息到mq服务器)

以下功能是在此项目上升级https://blog.csdn.net/qq_41712271/article/details/115619405?spm1001.2014.3001.5501 在使用 RabbitMQ的时候&#xff0c;作为消息发送方希望杜绝任何消息丢失或者投递失败场景。也可以用事务&#xff0c;但性能较差&#xff0c;一般不用 RabbitMQ…

qt mvc3

前面两节讲的model是一维的&#xff0c;这次开始二维的也就是我们常说的Table&#xff0c;相对与list&#xff0c;我们多了一个列的概念。下面讲解一个例子。我先说明一下我们这个例子&#xff0c;在程序目录下&#xff0c;我们有一个文本文件&#xff0c;其中存放的学生信息。…

springboot整合rabbitmq 消费者Consumer 手动进行ack确认

ack指Acknowledge&#xff0c;确认。 表示消费端收到消息后的确认方式。 有三种确认方式&#xff1a; 自动确认&#xff1a;acknowledge"none" 手动确认&#xff1a;acknowledge"manual" 根据异常情况确认&#xff1a;acknowledge"auto"&#xf…

LRU缓存实现(Java)

LRU Cache的LinkedHashMap实现LRU Cache的链表HashMap实现LinkedHashMap的FIFO实现调用示例LRU是Least Recently Used 的缩写&#xff0c;翻译过来就是“最近最少使用”&#xff0c;LRU缓存就是使用这种原理实现&#xff0c;简单的说就是缓存一定量的数据&#xff0c;当超过设定…

hdu 5206 Four Inages Strategy

题目大意&#xff1a; 判断空间上4个点是否形成一个正方形 分析&#xff1a; 标称思想 &#xff1a; 在p2,p3,p4中枚举两个点作为p1的邻点&#xff0c;不妨设为pi&#xff0c;pj&#xff0c;然后判断p1pi与p1pj是否相等、互相垂直&#xff0c;然后由向量法&#xff0c;最后一个…

springboot整合rabbitmq 消费者Consumer限流

场景&#xff1a;请求瞬间增多&#xff0c;每秒5000个请求&#xff0c;防止A系统挂掉 注意&#xff1a;一定要开启手动ack确认 1 application.yml配置文件 server:port: 8021 spring:#给项目来个名字application:name: rabbitmq-test#配置rabbitMq 服务器rabbitmq:host: 127.0…

EncoPrint

http://blog.sina.com.cn/s/blog_7985987f0101955a.html转载于:https://www.cnblogs.com/elitiwin/p/4441975.html

【博弈】【清华冬令营2018模拟】取石子

写完敢说全网没有这么详细的题解了。 注意&#xff1a;题解长是为了方便理解&#xff0c;所以读起来速度应该很快。 题目描述 有 nnn 堆石子&#xff0c;第 iii 堆有 xix_ixi​ 个。 AliceAliceAlice 和 BobBobBob 轮流去石子&#xff08;先后手未定&#xff09;&#xff0c; …