云计算、AI、云原生、大数据等一站式技术学习平台

网站首页 > 教程文章 正文

基于阿里RocketMq的SpringCloud Stream进阶实战二

jxf315 2025-03-12 16:09:11 教程文章 30 ℃

欢迎关注头条号:老顾聊技术

精品原创技术分享,知识的组装工

目录

  1. 前言
  2. 自定义Binding声明接口
  3. 发布与消费
  4. 自定义MessageConverter
  5. 异常处理
  6. 重试机制
  7. 定制消费者线程数
  8. 边接收边发送
  9. RocketMq的Tag特性
  10. RocketMq广播方式
  11. 总结

前言

之前老顾介绍了基本的实战知识点,今天老顾介绍一下深入的知识点。

自定义Binding声明接口

除了使用Spring Cloud默认提供的Sink、Source和Processor接口外,用户还可以根据需要新建自己的Binding接口。下面的代码中就自定义了一个声明Binding的接口,其中声明了两个Binding名为input1的INPUT类型的Binding名为output1的OUTPUT类型的Binding

定义@Input和@Ouput时如果没有自定Binding的名称,默认获取当前方法的名称作为Binding的名称。

应用的时候跟其它Binding接口一样通过@EnableBinding进行声明,其它的配置方式等都是一样的。

发布与消费

Spring Cloud Stream默认在接收和发送消息时对应的消息格式类型都是JSON格式,可以通过Binding的contentType属性进行指定。

spring.cloud.stream.bindings.output.content-type=application/json

当发送和接收消息时都会被
org.springframework.messaging.converter.MessageConverter进行转换

现假设我们有如下这样一个User类型,我们期望发送的消息就是发送的User类型。

那我们的发送消息代码可以类似于如下这样写,下面的代码中往名为output1这个Binding中发送的消息就是User类型的。

如果你的消费者端的代码还像以前一样写,接收的是String类型的,你会发现接收到的字符串是JSON格式的,因为发送端默认会把对象转换为JSON格式进行发送

这里也可以直接以User类型进行接收,此时Spring Cloud将自动将接收到的JSON字符串转换为消费者方法的入参对象,比如下面这样。

上面应用的名为output1和input1的Binding的配置如下。

自定义MessageConverter

Spring Cloud Stream在进行对象和JSON转换时默认使用的是
org.springframework.messaging.converter.MappingJackson2MessageConverter

有时候我们也可以实现自己的MessageConverter,在实现自定义的MessageConverter时通常不直接实现MessageConverter接口而是继承
org.springframework.messaging.converter.AbstractMessageConverter
,然后重写其supports(..)、convertFromInternal(..)和convertToInternal

比如下面的代码中实现了一个只能转换User对象的MessageConverter底层使用的是FastJson,在进行发送消息时重置了user的name属性,加上了t-前缀。

然后为了使它生效,我们需要把它定义为一个bean,并标注@StreamMessageConverter,比如下面这样。

如果在转换为JSON时不希望使用默认的Jackson实现,而希望使用Alibaba的FastJson也是可以的

FastJson已经提供了MessageConverter的实现类
com.alibaba.fastjson.support.spring.messaging.MappingFastJsonMessageConverter。

所以如果希望使用FastJson的实现,只需要进行类似如下这样的定义。

异常处理

在接收消息时,如果消息处理失败,Spring Cloud会把失败的消息转到名为..errors的Channel,并可通过@ServiceActivator方法进行接收。比如有如下Binding定义。

当消息消费失败时将转发包装了失败原因的消息到名为
test-topic1.test-group1.errors的Channel
,我们可以通过在某个bean中定义一个@ServiceActivator方法处理相应的异常。

上面介绍的方法是处理某个特定Binding的消息消费异常的,如果你的消息消费异常的处理方式都是一样的,你可能希望有一个统一的入口来处理所有的消息消费异常,而不用管当前的Binding是什么。

Spring Cloud Stream也考虑到了这一点,它提供了一个名为errorChannel的Binding,所有的消息消费异常都会转发到该Binding,所以如果我们想有一个统一的处理所有的消息消费异常的入口则可以定义一个Binding名为errorChannel的@StreamListener方法。

重试机制

Spring Cloud Stream在进行消息的接收处理时也是利用Spring Retry进行了包装的。当消息消费失败时默认会最多试3次(加上第一次),使用的是Spring Retry的RetryTemplate的默认配置。

如果默认的重试逻辑不能满足你的需求,你也可以定义自己的RetryTemplate,但是需要使用@StreamRetryTemplate进行标注(StreamRetryTemplate上标注了@Bean)。

上面的代码中就应用了自定义的RetryTemplate,指定最多尝试5次的消息消费尝试5次后仍然失败将走前面介绍的异常处理逻辑,即投递消息到相应的异常处理的Channel。

也可以通过配置的方式,最多尝试次数通过Binding的consumer.maxAttempts参数进行指定,如果需要指定名为input1的Binding在消费某条消息时最多允许尝试5次,则可以进行如下定义。如果将该属性值定义为1,则表示不允许进行重试。

定制消费者线程数

默认情况下,每个Binding对应的消费者线程数是1,可以通过
spring.cloud.stream.bindings..consumer.concurrency属性进行指定,比如下面的配置就指定了名称为input1的Binding的
消费者线程是3即Spring Cloud Stream将同时启动3个线程用于从名为input1的Binding进行消费

边接收边发送

所谓的边接收边发送是指接收到消息经过处理后可以产生新的消息,然后允许通过配置指定新的消息的发送目的地。比如下面的代码就定义了从名为input的Binding接收消息,经过处理后再返回,然后经过方法上的@SendTo指定返回的内容将发送到名为output的Binding

RocketMQ的Tag特性

RocketMQ建议我们一个应用就使用一个Topic不同的消息类型通过Tag来区分。我们发送的消息都在header里面加入了消息对应的Tag

如果我们的某个Binding只希望接收某些Tag的消息,则可以通过
spring.cloud.stream.rocketmq.bindings..consumer.tags属性
来指定期望接收的Tag,多个Tag之间通过双竖线分隔

spring.cloud.stream.rocketmq.bindings.input1.consumer.tags=tag0||tag1

上面配置就指定了名为input1的Binding期望接收的消息的Tag是tag0或tag1

指定RocketMQ特性配置的属性前缀是
spring.cloud.stream.rocketmq,如果是Binder的配置则后面可以接binder,如果是Binding的配置则后面接binding。

RocketMq广播方式

RocketMQ的消息消费有两种方式,CLUSTERING(集群)和BROADCASTING(广播)默认是CLUSTERINIG

CLUSTERING的意思是同一消费组的多个消费者共享同一消息队列,彼此分担压力

比如消息队列中有100条消息,当同时有3个相同消费者组的消费者按照CLUSTERING方式进行消息消费时,它们总的消息的消费数量是100,但是分摊到每个消费者的数量可能是40、30、30

BROADCASTING的意思是广播即可以理解为每个消费者都有唯一的消息队列与之对应。当消息队列中有100条消息时,如果有相同消费者组的3个消费者时,每个消费者都将完整的消费这100条消息

通过
spring.cloud.stream.rocketmq.bindings..consumer.broadcasting=true指定该消费者将通过广播的方式进行消费。

总结

关于RocketMq的特性还有顺序消息、死信队列以及4.x发布的事务消息;老顾之前的文章已经介绍了RocketMq事务;小伙伴们可以自行查看,有机会再给大家介绍顺序消息等。谢谢!!!

---End---

最近老顾上传了微服务网关的分享课程,请大家多多支持

推荐阅读

1基于RocketMq的SpringCloud Stream框架实战入门

2、如何搭建消息中间件应用框架之SpringCloud Stream

3面试必备:网关异常了怎么办?如何做全局异常处理?

4Gateway网关系列(二):SpringCloud Gateway入门实战,路由规则

5Gateway网关系列开篇:SpringCloud的官方网关Gateway介绍

6API网关在微服务架构中的应用,这一篇就够了

7学习Lambda表达式看这篇就够了,不会让你失望的哦(续篇)

8Lambda用在哪里?几种场景?

9、为什么会出现Lambda表达式,你知道吗?

10、不说“分布式事务”理论,直接上大厂阿里的解决方案,绝对实用

11、女程序员问到这个问题,让我思考了半天,Mysql的“三高”架构

12、大厂二面:CAP原则为什么只能满足其中两项?而不能同时满足

13、阿里P7二面:聊聊零拷贝的原理

14、秒杀系统的核心点都在这里,快来取

15、你了解如何利用token方式实现分布式Session吗?

16、Mysql索引结构演变,为什么最终会是那个结构呢?让你一看就懂

17、一场比赛涉及到的知识,用通俗易通的方式介绍并发协调

18、企业实战Redis全方面思考,你思考了吗?

19、面试题:Thread的start和run的区别

20、面试题:什么是CAS?CAS的作用以及缺点

21、如何访问redis中的海量数据?避免事故产生

22、如何解决Redis热点问题?以及如何发现热点?

23、如何设计API接口,实现统一格式返回?

24、你真的知道在生产环境下如何部署tomcat吗?

25、分享一线互联网大厂分布式唯一ID设计 之 snowflake方案

26、分享大厂分布式唯一ID设计方案,快来围观

27、你想了解一线大厂的分布式唯一ID生成方案吗?

28、你知道如何处理大数据量吗?(数据拆分篇)

29、如何永不迁移数据和避免热点? 根据服务器指标分配数据量(揭秘篇)

30、你知道怎么分库分表吗?如何做到永不迁移数据和避免热点吗?

31、你了解大型网站的页面静态化吗?

32、你知道如何更新缓存吗?如何保证缓存和数据库双写一致性?

33、你知道怎么解决DB读写分离,导致数据不一致问题吗?

34、DB读写分离情况下,如何解决缓存和数据库不一致性问题?

35、你真的知道怎么使用缓存吗?

36、如何利用锁,防止缓存击穿?重构思想的重要性

37、海量订单产生的业务高峰期,如何避免消息的重复消费?

38、你知道如何保障生产端100%消息投递成功吗?

39、微服务下的分布式session该如何管理?

40、阿里二面:filter、interceptor、aspect应如何选择?很多人中招

41、互联网架构重要组员CDN,很多高级开发都没有实操过,来看这里

42、阿里二面:CDN缓存控制原理,看看能不能难住你

Tags:

最近发表
标签列表