1 添加依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2 配置application.yml文件
server:port: 8021
spring:#给项目来个名字application:name: rabbitmq-test#配置rabbitMq 服务器rabbitmq:host: 127.0.0.1port: 5672username: needpassword: 123456#虚拟host 可以不设置,使用server默认hostvirtual-host: /testhost
3 编写配置队列,交换机,绑定的类,这里以fanoutExchange为列,其他的交换机类似
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 1 创建三个队列 :fanout.A fanout.B fanout.C* 2 定义交换机* 3 将三个队列都绑定在交换机 fanoutExchange 上* 因为是扇型交换机, 路由键无需配置,配置也不起作用*/
@Configuration
public class FanoutRabbitConfig {// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。//一般设置一下队列的持久化就好,其余两个就是默认false@Beanpublic Queue queueA() {return new Queue("fanout.A",true);}@Beanpublic Queue queueB() {return new Queue("fanout.B",true);}@Beanpublic Queue queueC() {return new Queue("fanout.C",true);}@Bean//DirectExchange Direct交换机//TopicExchange Topic交换机FanoutExchange fanoutExchange() {return new FanoutExchange("fanoutExchange",true,false);}@BeanBinding bindingExchangeA() {return BindingBuilder.bind(queueA()).to(fanoutExchange());//将队列和交换机绑定, 并设置用于路由键 (Direct交换机,Topic交换机使用)//return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");}@BeanBinding bindingExchangeB() {return BindingBuilder.bind(queueB()).to(fanoutExchange());}@BeanBinding bindingExchangeC() {return BindingBuilder.bind(queueC()).to(fanoutExchange());}
}
4 发送消息
import cn.huawei.rabbitmqtest1.pojo.User;
import com.alibaba.fastjson.JSON;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
class RabbitmqTest1ApplicationTests_1 {@AutowiredRabbitTemplate rabbitTemplate; //使用RabbitTemplate,这提供了接收/发送等等方法@Testvoid test_1() {for (int i = 1; i <= 3; i++) {User user = new User(i + "", "小明 " + i);rabbitTemplate.convertAndSend("fanoutExchange", "", JSON.toJSONString(user));}}@Testpublic void test_2() {for (int i = 1; i <= 3; i++) {User user = new User(i + "", "张三 " + i);rabbitTemplate.convertAndSend("TestTopicExchange", "news.chongqing", JSON.toJSONString(user));}}@Testpublic void test_3() {for (int i = 1; i <= 3; i++) {User user = new User(i + "", "陈四 " + i);rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", JSON.toJSONString(user));}}}
5 接收消息
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class FanoutReceiverA {@RabbitListener(queues = "fanout.A")public void process(Message testMessage) {System.out.println("id:"+testMessage.getMessageProperties().getMessageId());System.out.println("tag:"+testMessage.getMessageProperties().getConsumerTag());System.out.println("FanoutReceiverA消费者收到消息 : " +new String(testMessage.getBody()));}
}import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class FanoutReceiverB {@RabbitListener(queues = "fanout.B")public void process(Message testMessage) {System.out.println("id:"+testMessage.getMessageProperties().getMessageId());System.out.println("tag:"+testMessage.getMessageProperties().getConsumerTag());System.out.println("FanoutReceiverB消费者收到消息 : " +new String(testMessage.getBody()));}
}