Spring Cloud Stream消费失败后的处理策略(二):自定义错误处理逻辑

news/2025/4/22 1:19:39

应用场景

上一篇《Spring Cloud Stream消费失败后的处理策略(一):自动重试》介绍了默认就会生效的消息重试功能。对于一些因环境原因、网络抖动等不稳定因素引发的问题可以起到比较好的作用。但是对于诸如代码本身存在的逻辑错误等,无论重试多少次都不可能成功的问题,是无法修复的。对于这样的情况,前文中说了可以利用日志记录消息内容,配合告警来做补救,但是很显然,这样做非常原始,并且太过笨拙,处理复杂度过高。所以,我们需要需求更好的办法,本文将介绍针对该类问题的一种处理方法:自定义错误处理逻辑。

动手试试

准备一个会消费失败的例子,可以直接沿用前文的工程,也可以新建一个,然后创建如下代码的逻辑:

@EnableBinding(TestApplication.TestTopic.class)
@SpringBootApplication
public class TestApplication {public static void main(String[] args) {SpringApplication.run(TestApplication.class, args);}@RestControllerstatic class TestController {@Autowiredprivate TestTopic testTopic;/*** 消息生产接口** @param message* @return*/@GetMapping("/sendMessage")public String messageWithMQ(@RequestParam String message) {testTopic.output().send(MessageBuilder.withPayload(message).build());return "ok";}}/*** 消息消费逻辑*/@Slf4j@Componentstatic class TestListener {@StreamListener(TestTopic.INPUT)public void receive(String payload) {log.info("Received payload : " + payload);throw new RuntimeException("Message consumer failed!");}}interface TestTopic {String OUTPUT = "example-topic-output";String INPUT = "example-topic-input";@Output(OUTPUT)MessageChannel output();@Input(INPUT)SubscribableChannel input();}}

内容很简单,既包含了消息的生产,也包含了消息消费。消息消费的时候主动抛出了一个异常来模拟消息的消费失败。

在启动应用之前,还要记得配置一下输入输出通道对应的物理目标(exchange或topic名)、并设置一下分组,比如:

spring.cloud.stream.bindings.example-topic-input.destination=test-topic
spring.cloud.stream.bindings.example-topic-input.group=stream-exception-handler
spring.cloud.stream.bindings.example-topic-input.consumer.max-attempts=1spring.cloud.stream.bindings.example-topic-output.destination=test-topic

完成了上面配置之后,启动应用并访问localhost:8080/sendMessage?message=hello接口来发送一个消息到MQ中了,此时可以看到消费失败后抛出了异常,跟上一篇文章的结果一样,消息消费失败,记录了日志,消息信息丢弃。

下面,针对消息消费失败,在TestListener中针对消息消费逻辑创建一段错误处理逻辑,比如:

@Slf4j
@Component
static class TestListener {@StreamListener(TestTopic.INPUT)public void receive(String payload) {log.info("Received payload : " + payload);throw new RuntimeException("Message consumer failed!");}/*** 消息消费失败的降级处理逻辑** @param message*/@ServiceActivator(inputChannel = "test-topic.stream-exception-handler.errors")public void error(Message<?> message) {log.info("Message consumer failed, call fallback!");}}

通过使用@ServiceActivator(inputChannel = "test-topic.stream-exception-handler.errors")指定了某个通道的错误处理映射。其中,inputChannel的配置中对应关系如下:

  • test-topic:消息通道对应的目标(destination,即:spring.cloud.stream.bindings.example-topic-input.destination的配置)
  • stream-exception-handler:消息通道对应的消费组(group,即:spring.cloud.stream.bindings.example-topic-input.group的配置)

再启动应用并访问localhost:8080/sendMessage?message=hello接口来发送一个消息到MQ中,此时可以看到日志如下:

2018-12-11 12:00:35.500  INFO 75269 --- [ctor-http-nio-3] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2018-12-11 12:00:35.512  INFO 75269 --- [ctor-http-nio-3] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory.publisher#311db1cb:0/SimpleConnection@40370d8c [delegate=amqp://guest@127.0.0.1:5672/, localPort= 54391]
2018-12-11 12:00:35.527  INFO 75269 --- [ption-handler-1] c.d.stream.TestApplication$TestListener  : Received: hello,
2018-12-11 12:00:38.541  INFO 75269 --- [ption-handler-1] c.d.stream.TestApplication$TestListener  : Message consumer failed, call fallback!

虽然消费逻辑中输出了消息内容之后抛出了异常,但是会进入到error函数中,执行错误处理逻辑(这里只是答应了一句话),用户可以根据需要读取消息内容以及异常详情做更进一步的细化处理。

深入思考

由于error逻辑是通过编码方式来实现的,所以这段逻辑相对来说比较死。通常,只有业务上有明确的错误处理逻辑的时候,这种方法才可以比较好的被应用到。不然能做的可能也只是将消息记录下来,然后具体的分析原因后再去做补救措施。所以这种方法也不是万能的,主要适用于有明确错误处理方案的方式来使用(这种场景并不多),另外。。。

注意:有坑! 这个方案在目前版本(2.0.x)其实还有一个坑,这种方式并不能很好的处理异常消息,会有部分消息得不到正确的处理,由于应用场景也不多,所以目前不推荐使用这种方法来做(完全可以用原始的异常捕获机制来处理,只是没有这种方式那么优雅)。目前看官方issue是在Spring Cloud Stream的2.1.0版本中会修复,后续发布之后可以使用该功能,具体点击查看:Issue #1357。

而对于没有特定的错误处理方案的,也只能通过记录和后续处理来解决,可能这样的方式也只是比从日志中抓去简单那么一些,并没有得到很大的提升。但是,不要紧,因为下一篇我们将继续介绍其他更好的处理方案。

代码示例

本文示例读者可以通过查看下面仓库的中的stream-exception-handler-2项目:

  • Github
  • Gitee

如果您对这些感兴趣,欢迎star、follow、收藏、转发给予支持!

以下专题教程也许您会有兴趣

  • Spring Boot基础教程
  • Spring Cloud基础教程

本文首发:http://blog.didispace.com/spr...


https://dhexx.cn/news/show-536768.html

相关文章

北航14年上机题

4年上机题 第一题&#xff0c;阶乘数。输入一个正整数&#xff0c;输出时&#xff0c;先输出这个数本身&#xff0c;跟着一个逗号&#xff0c;再输出这个数的各位数字的阶乘和&#xff0c;等号&#xff0c;阶乘和的计算结果&#xff0c;并判断阶乘和是否等于原数&#xff0c;如…

QT初级学习12--设置程序发布图标、打开指定的浏览器页面、播放动画

一、设置程序图标 超级简单。第一步找一个自己想设置为图标的.ico后缀的文件&#xff0c;并将其放在工程目录&#xff1b;第二步&#xff0c;在xx.pro文件末尾加入RC_ICONS yourImageName.ico;重新编译后即可。 二、打开指定浏览器页面 要求点击某action后&#xff0c;程序…

Kubernetes节点资源耗尽状态的处理

今天上午一到工位&#xff0c;就收到来自同事的“投诉”&#xff1a;私有云上的Kubernetes cluster中的一个node似乎不工作了&#xff0c;因为专门部署于那个节点上的应用挂掉了&#xff0c;并且长时间没有恢复。这个公司私有云上Kubernetes集群是v1.7.5版本&#xff0c;部署于…

QT :-1: error: fatal error: no input files

网上有很多其他的原因&#xff0c;我这里提供我的出错的原因。 build directory 这里我出现了中文路径&#xff0c;并且这个定位是我上个项目的位置&#xff0c;我修改为目前这个项目的位置后&#xff0c;且不存在中文路径了&#xff0c;就好了。 QT里面不能出现任何中文路径…

POJ 1650

1 #include <iostream>2 #include <cmath> //wo de 编译器里的这个abs的功能不能用啊&#xff01;3 using namespace std;4 int main()5 {6 double a;7 double per;//误差计数器&#xff1b;8 int son_num;//代表分子的枚举&#xff1b;9 …

后缀表达式求值

思路&#xff1a;判断是不是数字字符&#xff0c;如果是数字&#xff0c;入栈&#xff0c;遇到字符就将栈顶元素和次栈顶元素出栈&#xff0c;进行计算操作&#xff0c;然后将计算结果入栈&#xff0c;循环最后&#xff0c;最后剩下一个栈顶元素就是表达式的数值。 代码如下&a…

go语言编码规范

开发工具&#xff1a;GoLand官方代码规范&#xff1a;https://golang.org/doc/effective_go.html1.工具提交代码前需使用fmt工具格式化提交代码前需使用vet工具进行静态检查2.目录规范|– bin|– build|–build_dev/test/release.sh|– gen-go|– config|– dev/test/release.y…

QTreeWidget 用法整理 信号与槽设置

1. 添加根节点以及子节点 继承关系为&#xff1a;1#工厂—1#车间—1#产线—1#工位、2#工位除了一级item在定义之时&#xff0c;就确定了是在文件treeWidget里&#xff1b;其他的二级&#xff0c;三级ite等&#xff0c;通过一级item的->addchild()操作来进行继承。 void fr…

挨踢人生路

点击打开链接 推荐一篇IT人工作经历的文章&#xff0c;希望对从事IT行业的人有指导作用。 转载于:https://www.cnblogs.com/Tovi/p/6194885.html

九度oj:题目1049:字符串去特定字符

题目描述&#xff1a; 输入字符串s和字符c&#xff0c;要求去掉s中所有的c字符&#xff0c;并输出结果。 输入&#xff1a; 测试数据有多组&#xff0c;每组输入字符串s和字符c。 输出&#xff1a; 对于每组输入,输出去除c字符后的结果。 样例输入&#xff1a; heallo a 样例输…