Spring Cloud Stream 结合rocketmq

news2025/1/5 9:12:05

Spring Cloud Stream 结合rocketmq

官方网址:https://github.com/alibaba/spring-cloud-alibaba/wiki/RocketMQ

你可以在这个地址上下载到相关示例项目,配置项等相关信息

spring-cloud-stream 文档(这个地址似乎只有集合kafaka和rabbit的示例): https://docs.spring.io/spring-cloud-stream/docs/3.2.6/reference/html/

spring-cloud-stream-rocketmq文档 :https://spring-cloud-alibaba-group.github.io/github-pages/2021/en-us/index.html

无语的文档,太简陋了

介绍

stream 介绍

Spring Cloud Stream 是一个用于构建基于消息的微服务应用框架。它基于 SpringBoot 来创建具有生产级别的单机 Spring 应用,并且使用 Spring Integration 与 Broker 进行连接。

Spring Cloud Stream 提供了消息中间件配置的统一抽象,推出了 publish-subscribe、consumer groups、partition 这些统一的概念。

Spring Cloud Stream 内部有两个概念:Binder 和 Binding。

  • Binder: 跟外部消息中间件集成的组件,用来创建 Binding,各消息中间件都有自己的 Binder 实现。

比如 Kafka 的实现 KafkaMessageChannelBinderRabbitMQ 的实现 RabbitMessageChannelBinder 以及 RocketMQ 的实现 RocketMQMessageChannelBinder

  • Binding: 包括 Input Binding 和 Output Binding。

Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个桥梁,实现了开发者只需使用应用程序的 Provider 或 Consumer 生产或消费数据即可,屏蔽了开发者与底层消息中间件的接触。

下图是 Spring Cloud Stream 的架构设计。

大白话:直接理解成和日志门面一个slfj4一个概念即可,这是一个消息的门面,然我们可以替换消息的底层实现,而不用修改代码

rocket binder介绍

这是Spring Cloud Stream Rocket MQ Binder的实现架构:

Rocket MQ Binder的实现依赖于Rocket MQ-Spring框架。RocketMQ Spring框架是Rocket MQ和Spring Boot的集成。它提供了三个主要功能:

@RocketMQTemplate:发送消息,包括同步、异步和事务消息。

@RocketMQTransactionListener:侦听并检查事务消息。

@RocketMQMessageListener:使用消息。

RocketMQMessageChannelBinder是Binder的标准实现,它将在内部构建RocketMQInboundChannelAdapterr和RocketMQMessageHandler。

RocketMQMessageHandler将基于binding配置构造RocketMQTemplate。RocketMQTemplate将转换org.springframework.message(spring消息模块的消息消息类)转换为Rocket MQ消息类org.apache.rocketmq。common.message,然后发送出去。

RocketMQInboundChannelAdapter还将基于binding配置构造RocketMQListenerBindingContainer,RocketMQListenerBindingContainer将启动Rocket q Consumer以接收消息。

前置了解

官方示例包包含了的部分示例,你需要了解相关概念,部分我也是懵逼,大致了解就行,先把示例跑起来

ApplicationRunner

https://blog.csdn.net/weixin_41667076/article/details/121701303

一个接口,其实现类对应的bean被spring管理后,会在项目启动时执行

@Component  //此类一定要交给spring管理
@Order(value=2) //其次执行
public class ConsumerRunnerB implements ApplicationRunner{
	@Override
	public void run(ApplicationArgumers args) throws Exception{
		//代码
		System.out.println("需要在springBoot项目启动时执行的代码2---");
	}
}

函数式接口

https://www.cainiaojc.com/java/java8-functional-interfaces.html

一个函数式接口定义是在接口上加上注解@FunctionalInterface,java自行实现的函数式接口位于java.util.function,函数式接口可以隐式的转换成lmabda 表达式

一般会有一个唯一的未实现方法,我们一般使用lmabda来构建对应函数式接口的实现类

Predicate示例

Predicate入参是泛型指定,可以接收任意类型的参数,返回类型是boolean

package cn.sry1201.recketmq.config;

import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
 
public class Java8Tester {
   public static void main(String args[]){
      List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
        
//       Predicate<Integer> predicate = new Predicate<Integer>() {
//          @Override
//          public boolean test(Integer integer) {
//             return false;
//          }
//       };
//       Predicate<Integer> predicate = n -> true
      // n 是一个参数传递到 Predicate 接口的 test 方法
      // n 如果存在则 test 方法返回 true
        
      System.out.println("输出所有数据:");
        
      // 传递参数 n  这里其实就可以理解为定义这个函数的实现类,效果和上方注释代码等同
      eval(list, n->true);
        
      // Predicate<Integer> predicate1 = n -> n%2 == 0
      // n 是一个参数传递到 Predicate 接口的 test 方法
      // 如果 n%2 为 0 test 方法返回 true
        
      System.out.println("输出所有偶数:");
      eval(list, n-> n%2 == 0 );
        
      // Predicate<Integer> predicate2 = n -> n > 3
      // n 是一个参数传递到 Predicate 接口的 test 方法
      // 如果 n 大于 3 test 方法返回 true
        
      System.out.println("输出大于 3 的所有数字:");
      eval(list, n-> n > 3 );
   }
    
   public static void eval(List<Integer> list, Predicate<Integer> predicate) {
      for(Integer n: list) {
        // 调用函数实现的方法
         if(predicate.test(n)) {
            System.out.println(n + " ");
         }
      }
   }
}
Function示例

包含了函数式接口funtion的默认方法的使用

public class FunctionExample2 {
    public static void main(String[] args) {
        // 定义两个函数
        Function<Integer,Integer> function1 = t -> (t - 5);
        Function<Integer,Integer> function2 = t -> (t * 2);
        //Using andThen() method 组合成一个符合函数,先执行第一个函数,再执行第二个函数,得到90
        int a = function1.andThen(function2).apply(50);
        System.out.println(a);
        //Using compose function compose构建一个复合函数,先执行函数2,再执行函数一
        int c = function1.compose(function2).apply(50);
        System.out.println(c);
    }
}
stream示例使用到的

Consumer void accept(T t) 有入参,无返回值 消费型接口
Supplier T get() 无入参,有返回值 供给型接口
Function<T, R> R apply(T t) T类型入参,R类型出参,T和R可以相同 函数型接口

reactor.core.publisher.Flux

待定

反应式编程相关: https://zhuanlan.zhihu.com/p/356997738

https://zhuanlan.zhihu.com/p/95966853

spring-cloud-function

官方文档 :https://spring.io/projects/spring-cloud-function#learn

Spring Cloud Function是一个具有以下高级目标的项目:通过功能促进业务逻辑的实现。将业务逻辑的开发生命周期与任何特定的运行时目标分离,以便相同的代码可以作为web端点、流处理器或任务运行。支持跨无服务器提供商的统一编程模型,以及独立运行(本地或在PaaS中)的能力。在无服务器提供者上启用Spring Boot功能(自动配置、依赖注入、度量)。它抽象了所有的传输细节和基础设施,允许开发人员保留所有熟悉的工具和流程,并牢牢地关注业务逻辑。

需要引入依赖spring-cloud-function-context

Spring Cloud Stream - functional and reactive

基础示例

引入依赖

这里引入了springcloud alibaba 管理的版本,以下两个依赖引入一个即可

        <!-- springcloud alibaba 管理的版本-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
        </dependency>

        <!--需要单独引入-->
<!--        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rocketmq</artifactId>
        </dependency>-->

需要说明的是这个也不是最新的客户端,可以考虑排除再引入

消息发布者

消息发布者配置

spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: k8s-master:9876 # rcoketmq 命名服务地址
        # RocketMQ Consumer 相关的配置。
        bindings:
          # 通道名称,和spring.cloud.stream.bindings 下的通道名称对应
          producer-out-0:
            producer:
              enable: true # 是否启用producer
              group: output_1 # producer分组
      bindings:
        producer-out-0:
          destination: num # 对于rocketmq,此处定义的是消息的topic

消费者发布消息代码

@RestController
@Slf4j
public class MqController {

    public static final String TOPIC = "TopicTest";

    public static final String TAG = "TagA";
    public static final String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD",
            "TagE" };

    @Autowired
    private StreamBridge streamBridge;


    // http://127.0.0.1:8013/rocketmq-application/send/msg?msg=abc
    @RequestMapping("/send/msg")
    public String hello(@RequestParam(name = "msg", defaultValue = "hello world") String msg) {
        for (int i = 0; i < 100; i++) {
            String key = "KEY" + i;
            Map<String, Object> headers = new HashMap<>();
            headers.put(MessageConst.PROPERTY_KEYS, key);
            headers.put(MessageConst.PROPERTY_TAGS, tags[i % tags.length]);
            headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
            Message<SimpleMsg> message = new GenericMessage(
                    new SimpleMsg(msg + " : " + i), headers);
            streamBridge.send("producer-out-0", message);
        }

        return "消息发送成功";
    }

}

成功截图

function消息发布者示例

配置文件

通道名称和函数名称涉及到约定配置

官方文档:https://docs.spring.io/spring-cloud-stream/docs/3.2.6/reference/html/spring-cloud-stream.html#_functional_binding_names

input - <functionName> + -in- + <index>

output - <functionName> + -out- + <index>

你可以指定函数绑定的通道名称

--spring.cloud.stream.function.bindings.uppercase-in-0=input
--spring.cloud.stream.bindings.input.destination=my-topic
spring:
  cloud:
    stream:
      function:
        definition: producer
      # RocketMQ Consumer 相关的配置。
      rocketmq:
        binder:
          name-server: k8s-master:9876 # rcoketmq 命名服务地址
          group: output_common # 多个通道生产消息,生产者组在此处配置,否则多个通道生产者组不一致报错spring.cloud.stream.rocketmq.bindings.《channelname》.producer.group中配置的无效,
      bindings:
        producer-out-0:
          destination: num

代码

大致是spring cloud stream 根据配置中的名称拿到了容器中的函数实例,调用Supplier实例的get方法,会被循环调用,每调用一次,发送一次消息,消息的主题

@Configuration
@Slf4j
public class RocketMQComprehensive {


	@Bean
	public Supplier<Flux<User>> producer() {

		return () -> Flux.interval(Duration.ofSeconds(2)).map(id -> {
			User user = new User();
			user.setId(id.toString());
			user.setName("freeman");
			user.setMeta(new StringObjectMapBuilder()
					.put("hobbies", Arrays.asList("movies", "songs")).put("age", 21)
					.get());
			return user;
		}).log();
	}

}

function消息消费者示例

@StreamListener @Input 等注解按照显示已经被弃用,而且使用起来报错,所以消费者示例就这一个

配置文件

spring:
  cloud:
    stream:
      function:
        definition: consumer
      rocketmq:
        binder:
          name-server: k8s-master:9876 # rcoketmq 命名服务地址
          group: output_common # 要求相同角色的消费者拥有完全相同的订阅和消费者组,以正确地实现负载平衡。它必须是全球独一无二的。生产者组在概念上聚合完全相同角色的所有生产者实例,这在涉及事务消息时尤其重要。对于非事务性消息,只要每个进程都是惟一的就没关系。进一步讨论请参见这里。
        # RocketMQ Consumer 相关的配置。
        bindings:
          # 通道名称,和spring.cloud.stream.bindings 下的通道名称对应
          consumer-in-0:
            enable: true # 是否启用consumer,默认true
      bindings:
        consumer-in-0:
          destination: num
          group: consumer_group




代码

@Configuration
@Slf4j
public class RocketMQComprehensive {

	@Bean
	public Consumer<User> consumer() {
		return num -> {
			log.info("接收到消息:"+ num.toString());
		};
	}

}

官方普通消息示例

配置

spring:
  cloud:
    stream:
      function:
        definition: producer;consumer;processor
      rocketmq:
        binder:
          name-server: k8s-master:9876 # rcoketmq 命名服务地址
          group: output_common # 要求相同角色的消费者拥有完全相同的订阅和消费者组,以正确地实现负载平衡。它必须是全球独一无二的。生产者组在概念上聚合完全相同角色的所有生产者实例,这在涉及事务消息时尤其重要。对于非事务性消息,只要每个进程都是惟一的就没关系。进一步讨论请参见这里。
        # RocketMQ Consumer 相关的配置。
        bindings:
          # 通道名称,和spring.cloud.stream.bindings 下的通道名称对应
          producer_out_0:
            producer:
              group: output_1 # producer分组 ,建议配置spring.cloud.stream.rocketmq.binder.group(优先更高)
          processor-out-0:
            producer:
              group: output_2
      bindings:
        producer-out-0:
          destination: num
        processor-out-0:
          destination: square
        processor-in-0:
          destination: num
          group: processor_group
        consumer-in-0:
          destination: square
          group: consumer_group

代码

@Configuration
@Slf4j
public class RocketMQComprehensive {


	@Bean
	public Supplier<Flux<User>> producer() {

		return () -> Flux.interval(Duration.ofSeconds(2)).map(id -> {
			User user = new User();
			user.setId(id.toString());
			user.setName("freeman");
			user.setMeta(new StringObjectMapBuilder()
					.put("hobbies", Arrays.asList("movies", "songs")).put("age", 21)
					.get());
			return user;
		}).log();
	}

	@Bean
	public Function<Flux<User>, Flux<User>> processor() {

		return flux -> flux.map(user -> {
			log.info("用户信息:{}" ,user.toString());
			user.setId(String.valueOf(
					Long.parseLong(user.getId()) * Long.parseLong(user.getId())));
			return user;
		});
	}

	@Bean
	public Consumer<User> consumer() {
		return num -> {
			log.info("接收到消息:"+ num.toString());
		};
	}

}

流程说明

1、定义了三个函数,在配置文件中函数producer和通道producer-out-0进行绑定,发送消息到num这个主题上,

2、rocessor和processor-in-0还有processor-out-0 绑定,接收来自num的消息,又再次发布到square这个主题上

3、 consumer和 consumer-in-0绑定,接收square的消息

顺序消息和过滤消息示例

生产者

配置

spring:
  cloud:
    stream:
      function:
        definition: producer
      # RocketMQ Consumer 相关的配置。
      rocketmq:
        binder:
          name-server: k8s-master:9876 # rcoketmq 命名服务地址
          group: output_common
        bindings:
          producer-out-0:
            producer:
              group: output_1
              messageQueueSelector: orderlyMessageQueueSelector # MessageQueue选择器
      bindings:
        producer-out-0:
          destination: orderly

代码

选择器代码

@Component
@Slf4j
public class OrderlyMessageQueueSelector implements MessageQueueSelector {


	@Override
	public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
		Integer id = (Integer) ((MessageHeaders) arg)
				.get(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID);
//mqs.size()是这个topic总共的messagequeue的个数,我这里是8,
//id % OrderlyExample.tags.length 获取1 2 3 4 5
// 那么 1 6 11 16 21 。。。 应该是在同一个messageQueue上,并且tag都是tagA        
		int index = id % OrderlyExample.tags.length % mqs.size();
		return mqs.get(index);
	}

}

消息发送代码

@Configuration
public class OrderlyExample {
    public static final String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD",
            "TagE" };
    @Autowired
    private StreamBridge streamBridge;

    @Bean
    public ApplicationRunner producer() {
        return args -> {
            for (int i = 0; i < 100; i++) {
                String key = "KEY" + i;
                Map<String, Object> headers = new HashMap<>();
                headers.put(MessageConst.PROPERTY_KEYS, key);
                headers.put(MessageConst.PROPERTY_TAGS, tags[i % tags.length]);
                headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
                Message<SimpleMsg> msg = new GenericMessage(
                        new SimpleMsg("Hello RocketMQ " + i), headers);
                streamBridge.send("producer-out-0", msg);
            }
        };
    }


}

消费者

配置

spring:
  cloud:
    stream:
      function:
        definition: consumer
      rocketmq:
        binder:
          name-server: k8s-master:9876 # rcoketmq 命名服务地址
          group: output_common # 要求相同角色的消费者拥有完全相同的订阅和消费者组,以正确地实现负载平衡。它必须是全球独一无二的。生产者组在概念上聚合完全相同角色的所有生产者实例,这在涉及事务消息时尤其重要。对于非事务性消息,只要每个进程都是惟一的就没关系。进一步讨论请参见这里。
        # RocketMQ Consumer 相关的配置。
        bindings:
          consumer-in-0:
            consumer:
              # tag: {@code tag1||tag2||tag3 }; sql: {@code 'color'='blue' AND 'price'>100 } .
              subscription: 'TagA || TagC || TagD'
              push:
                orderly: true
      bindings:
        consumer-in-0:
          destination: orderly
          group: orderly-consumer

sql的过滤延时,仅做记录,非测试配置

# 对应的需要发消息时在请求头里添加相关配置
#           consumer-in-0:
#             consumer:
#             tag: {@code tag1||tag2||tag3 }; sql: {@code 'color'='blue' AND 'price'>100 } .
#               subscription: sql:(color in ('red1', 'red2', 'red4') and price>3)

代码

@Configuration
@Slf4j
public class OrderlyExample {

	// 主要是观察是否接收到过滤条件之外的数据,
    // 然后接收到的统一tags的数据是否从小到达有序排列
    @Bean
    public Consumer<Message<SimpleMsg>> consumer() {
        return msg -> {
            String tagHeaderKey = RocketMQMessageConverterSupport
                    .toRocketHeaderKey(MessageConst.PROPERTY_TAGS).toString();
            log.info(Thread.currentThread().getName() + " Receive New Messages: "
                    + msg.getPayload().getMsg() + " TAG:"
                    + msg.getHeaders().get(tagHeaderKey).toString());
            try {
                Thread.sleep(100);
            }
            catch (InterruptedException ignored) {
            }
        };
    }
}

说明

似乎没啥好说的,看到上面的配置和注释吧

事务消息

事务消息是消息发送者端配置,用于保证发送消息和本地事务的原子性

      rocketmq:
        binder:
          name-server: localhost:9876
        bindings:
          producer-out-0:
            producer:
              group: output_1
              transactionListener: myTransactionListener  # 指定监听器
              # 设置消息为事务消息,原始的api使用专门的事务消息生产者TransactionMQProducer
              producerType: Trans 
@Component("myTransactionListener")
public class TransactionListenerImpl implements TransactionListener {

	/**
	 * 执行本地事务,由当前发送消息的线程执行
	 */
	@Override
	public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
		Object num = msg.getProperty("test");

		if ("1".equals(num)) {
			System.out.println("executer: " + new String(msg.getBody()) + " unknown");
			return LocalTransactionState.UNKNOW;
		}
		else if ("2".equals(num)) {
			System.out.println("executer: " + new String(msg.getBody()) + " rollback");
			return LocalTransactionState.ROLLBACK_MESSAGE;
		}
		System.out.println("executer: " + new String(msg.getBody()) + " commit");
		return LocalTransactionState.COMMIT_MESSAGE;
	}

	/**
	 * broker确定本地事务是否成功的回调接口,这个是单独配置的线程池中的线程执行
	 */
	@Override
	public LocalTransactionState checkLocalTransaction(MessageExt msg) {
		System.out.println("check: " + new String(msg.getBody()));
		return LocalTransactionState.COMMIT_MESSAGE;
	}

}

	@Bean
	public ApplicationRunner producer() {
		return args -> {
			for (int i = 1; i <= 4; i++) {
				MessageBuilder builder = MessageBuilder
						.withPayload(new SimpleMsg("Hello Tx msg " + i));
				builder.setHeader("test", String.valueOf(i)).setHeader(
						MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON);
                // 不太确定这行代码是否有作用,以及改怎么配置
				builder.setHeader(RocketMQConst.USER_TRANSACTIONAL_ARGS, "binder");
				Message<SimpleMsg> msg = builder.build();
				streamBridge.send("producer-out-0", msg);
				System.out.println("send Msg:" + msg.toString());
			}
		};
	}

延时消息

	@Bean
	public ApplicationRunner producerDelay() {
		return args -> {
			for (int i = 0; i < 100; i++) {
				String key = "KEY" + i;
				Map<String, Object> headers = new HashMap<>();
				headers.put(MessageConst.PROPERTY_KEYS, key);
				headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
				// 主要是这一行设置延时级别
				headers.put(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 2);
				headers.put("a", "userproperties");
				Message<SimpleMsg> msg = new GenericMessage(
						new SimpleMsg("Delay RocketMQ " + i), headers);
				streamBridge.send("producer-out-0", msg);
			}
		};
	}

rocketmq5.0支持设置到具体是时刻, 其直接代码是这样的

message.setDeliverTimeMs(System.currentTimeMillis() + 10_000L);

可以尝试排除rocketmq4.9.4的依赖,然后引入5.0的依赖,大概率可能不行,有可能会是这个配置

MessageConst.PROPERTY_CONSUME_START_TIMESTAMP

广播消息

默认是集群模式,而不是广播模式,这个实在消费者端设置的

      rocketmq:
        binder:
          name-server: localhost:9876
        bindings:
          consumer-in-0:
            consumer:
              messageModel: BROADCASTING

消息重试

消费端配置

上一级配置是rocketmq

          consumer-in-0:
            consumer:
              ## According to the configured number of `max-reconsume-times`,
              ## the server will re-push the message according to whether the client's consumption is successful or not
              push:
                max-reconsume-times: 3
	@Bean
	public Consumer<Message<SimpleMsg>> consumer() {
		return msg -> {
			throw new RuntimeException("mock exception.");
		};
	}

比较全的配置文件

配置来自官网说明的配置项,地址前面已经提供,使用这个配置你可能启动不了应用,仅做参考

spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: k8s-master:9876 # rcoketmq 命名服务地址
          enable-msg-trace: true # 是否为 Producer 和 Consumer 开启消息轨迹功能 默认true,感觉还需要结合服务端配置使用
          access-key: #阿里云账号 AccessKey。
          secret-key: #阿里云账号 SecretKey。消息轨迹开启后存储的 topic 名称。
          customized-trace-topic: # 消息轨迹开启后存储的 topic 名称。 默认RMQ_SYS_TRACE_TOPIC
        # RocketMQ Consumer 相关的配置。
        bindings:
          # 通道名称,和spring.cloud.stream.bindings 下的通道名称对应
          producer-out-0:
            producer:
              enable: true # 是否启用producer
              group: output_1 # producer分组
              maxMessageSize: 8249344 #消息发送的最大字节数。默认8249344
              transactional: false # 是否发送事务消息 默认false
              sync: false # 是否使用同步方式发送消息,就是发送完需要等发送的结果
              vipChannelEnabled: true # 默认true 是否走vip通道发送消息,也就是broker fastRemotingServer端口发送消息
              compressMessageBodyThreshold: 4096 # 消息体压缩阈值,默认超过4k会压缩
              retryTimesWhenSendFailed: 2 # 在同步发送消息的模式下,消息发送失败的重试次数。默认2
              retryTimesWhenSendAsyncFailed: 2 # 在异步发送消息的模式下,消息发送失败的重试次数。默认2
              retryNextServer: false # 消息发送失败的情况下是否重试其它的 broker。
            consumer:
              enable: true # 是否启用consumer
              tags: tagA||tagB # Consumer 基于 TAGS 订阅,多个 tag 以 || 分割。
              sql: "TAGS is not null and TAGS in ('TagA', 'TagB')" # 基于sql过来消息
              broadcasting: false # Consumer 是否是广播消费模式。如果想让所有的订阅者都能接收到消息,可以使用广播模式。默认值false
              orderly: false # 是否有序消费消息,需要客户端发送消息到同一个message queue
              delayLevelWhenNextConsume: 0 #异步消费消息模式下消费失败重试策略 -1,不重复,直接放入死信队列 0,broker 控制重试策略 >0,client 控制重试策略 默认值: 0.
              suspendCurrentQueueTimeMillis: 3000 # 顺序消息消费失败后,再次消费的时间间隔
          processor-out-0:
            producer:
              group: output_2
      bindings:
        producer-out-0:
          destination: num # 对于rocketmq,此处定义的是消息的topic
        processor-out-0:
          destination: square
        processor-in-0:
          destination: num
          group: processor_group
        consumer-in-0:
          destination: square
          group: consumer_group

org.springframework.cloud.stream.config.BindingProperties

被弃用的注解

@EnableBinding
@StreamListener # 按照示例使用报错,找不到对应的bean
@Input # 同样失败

报错

1、Dispatcher has no subscribers for channel

Dispatcher has no subscribers for channel 'rocketmq-application.processor-in-0'.; 

---- Dispatcher has no subscribers,

将function的配置定义在bindings前面,或者手动指定绑定关系(不确定)

      function:
        definition: producer;consumer;processor

就最终的解决来看,你可以往前翻看一下报错,我这里是由于消息接收后处理异常,大致是id接收了一个string类型的,所以异常了,然后导致后续问题,比如可能因为异常,所以消息订阅者就直接关闭了,详细的话可能还需要了解源码或原理

2、Property ‘group’ is required - producerGroup

多个通道生产者组的名称统一配置

Exception thrown while building outbound endpoint
Property 'group' is required - producerGroup

就我当前这个版本而言,如果定义了多个生产者,那么生产者组需要统一定义,但是不影响消费者组的定义

      rocketmq:
        binder:
          name-server: k8s-master:9876 # rcoketmq 命名服务地址
          group: output_common # 要求相同角色的消费者拥有完全相同的订阅和消费者组,以正确地实现负载平衡。它必须是全球独一无二的。生产者组在概念上聚合完全相同角色的所有生产者实例,这在涉及事务消息时尤其重要。对于非事务性消息,只要每个进程都是惟一的就没关系。进一步讨论请参见这里。
        # RocketMQ Consumer 相关的配置。

n.RemotingTooMuchRequestException: sendDefaultImpl call timeout

其他

我看下载的示例项目中有这样的配置

        bindings:
          consumer-in-0:
            consumer:
              messageModel: BROADCASTING

可能是对应rokcetMq 消费者类org.apache.rocketmq.client.consumer.DefaultMQPushConsumer的一个属性

private MessageModel messageModel;

然后如果以后有找不到的配置,可以尝试这样,当然仅仅是猜测,这块我没试验

还有就是官方示例中的配置部分和官方文档上写明的配置不一样,可能够有效吧,由于部分示例我这边没有进行测试,所以如果示例中的配置不好使,可以参考本文中比较全的配置文件

还有springCloud bus 结合rocketmq的架构,这个之后再说吧,https://blog.csdn.net/weixin_43847283/article/details/122419187

关联信息

  • 关联的主题:
  • 上一篇:
  • 下一篇:
  • image: 20221111/1
  • 转载自:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/76957.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Akka 学习(六)Actor的监督机制

目录一 监督机制1.1 错误隔离性1.2 系统冗余性1.3 Actor的监督1.3.1 监督体系1.3.2 理解1,3.3 监督策越一 监督机制 1.1 错误隔离性 在学习Akka如何对失败情况进行响应之前&#xff0c;先了解一些在分布式应用程序中都应该遵循的通用策略&#xff1a;隔离错误。假设每个组件都…

【刷题-数组篇】狂刷力扣三十题,“数组”嘎嘎乱写 | 2022 12-5到12-9

前言 &#xff08;12月5日&#xff09;突然想起了很久以前别人&#xff08;具体来源已经记不清了&#xff09;传给我的一套题单。网上的题单不少&#xff0c;光收藏可不行&#xff0c;关键还得下手。 这套题单的题目数量为300出头&#xff0c;什么时候刷完我还没有明确计划&a…

web前端大作业 (仿英雄联盟网站制作HTML+CSS+JavaScript) 学生dreamweaver网页设计作业

&#x1f389;精彩专栏推荐&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb; ✍️ 作者简介: 一个热爱把逻辑思维转变为代码的技术博主 &#x1f482; 作者主页: 【主页——&#x1f680;获取更多优质源码】 &#x1f393; web前端期末大作业…

2.IOC之xml配置

1.使用IDEA创建工程 2.引入项目使用的依赖 <dependencies><dependency><groupId>org.springframework</groupId><artifactId>spring-context</artifactId><version>5.2.2.RELEASE</version></dependency> </depe…

英文外链代发怎么做有效果?英文外链购买平台

英文外链代发怎么做有效果&#xff1f; 答案是&#xff1a;选择权重较好的GPB外链 我们首先要知道一个观点&#xff0c;什么样的外链才有效果&#xff1f; 1.英文外链网站的有一定的权重&#xff0c;可高可低&#xff0c;但一定要有权重&#xff0c;数值指标可以参考MOZ的Do…

10.AOP之xml配置

1.使用IDEA创建工程 2.引入项目使用的依赖 <dependencies><dependency><groupId>org.springframework</groupId><artifactId>spring-context</artifactId><version>5.2.2.RELEASE</version></dependency><depend…

HPPH偶联无机纳米材料/白蛋白/白蛋白纳米粒/抗体/量子点/活性基团/荧光/细胞膜的研究

小编分享了HPPH偶联无机纳米材料/白蛋白/白蛋白纳米粒/抗体/量子点/活性基团/荧光/细胞膜的研究知识&#xff0c;一起来看&#xff01; HPPH偶联无机纳米材料/白蛋白纳米粒的研究&#xff1a; HPPH 具有的光动力活性的作用光谱以及靶向性&#xff0c;对组织的穿透率&#xff0…

Android基础学习(十九)—— 进程与线程

1、进程 程序和进程的区别&#xff1a;&#xff08;1&#xff09;程序是静态的&#xff0c;就是存放在磁盘里的可执行文件&#xff0c;就是一系列的指令集合&#xff1b;&#xff08;2&#xff09;进程是动态的&#xff0c;是程序的一次执行过程&#xff0c;同一程序多次执行会…

物联网开发笔记(58)- 使用Micropython开发ESP32开发板之控制2.90寸电子墨水屏模块

一、目的 这一节我们学习如何使用我们的ESP32开发板来控制2.90寸电子墨水屏模块。 二、环境 ESP32 2.90寸 电子墨水屏模块 Thonny IDE 几根杜邦线 接线方法&#xff1a; 三、墨水屏驱动 此处注意注意&#xff1a;不同的型号、不同厂家的墨水屏驱动方式有些不同&#xff0c;…

VIIF:自监督:自适应:GAN

Self-supervised feature adaption for infrared and visible image fusion &#xff08;红外和可见光图像融合的自监督特征自适应&#xff09; 总述&#xff1a;首先&#xff0c;我们采用编码器网络来提取自适应特征。然后&#xff0c;利用两个具有注意机制块的解码器以自我…

【扫描PDF】如何将颜色淡的扫描PDF颜色变深,便于阅读??PDF中文字太淡怎么加深?汇总网上已有的方法,一波小结

一、问题背景 如果你扫描得到的PDF&#xff0c;像下图一样文字颜色非常淡&#xff0c;看起来不舒服&#xff0c;需要加深处理&#xff0c;就烦请看我下面的几个解决方法&#xff0c;都是从网上汇总得到&#xff0c;加上自己的实践和体会总结。 二、Adobe Acrobat DC PDF扫描…

20221209英语学习

今日新词&#xff1a; receiver n.收受者; 收件人; 接待者; (电话)听筒, 耳机; 收音机; (电视)接收机; 接收器; 接球手 annoy n.同“annoyance” delight n.快乐&#xff0c;愉快 railroad n.铁路, 铁道, 铁路公司, 铁路系统 brilliance n.光辉, 【光】辉度, 漂亮, (名声)…

3.IOC之注解配置

1.编写Spring框架核心配置文件applicationContext.xml 在项目目录“/src/main/resources”下新建applicationContext.xml文件&#xff0c;具体代码如下。 <?xml version"1.0" encoding"UTF-8"?> <beans xmlns"http://www.springframework…

Google如何增加外链?谷歌外链自动化靠谱吗?

Google如何增加外链&#xff1f; 答案是&#xff1a;循序渐进增加免费开放性注册的外链和GPB外链 我们在发布Google外链的时候&#xff0c;总想找捷径&#xff0c;通过软件工具自动发布外链来提高网站排名和流量&#xff0c;加快SEO优化进度&#xff0c;缩短时间成本&#xf…

ChatGPT:构建与人类聊天一样自然的机器人

ChatGPT&#xff1a;构建与人类聊天一样自然的机器人 —— ChatGPT 文章目录ChatGPT&#xff1a;构建与人类聊天一样自然的机器人 —— ChatGPT1 官网2 注册OpenAI账号3 使用ChatGPT3.1 普通聊天3.2 生成代码3.3 写诗3.4 解一道算法题4 ChatGPT中文版VsCode 插件5 一些体会Hi&a…

浅析即时通讯开发之RTMP数据传输协议的实时流媒体

近年来,随着网络带宽的提升,以及多媒体压缩编码技术的发展,流媒体技术得到了非常广泛的应用。全球的流媒体市场正在以极高的速度向前发展,并逐步取代了以文本和图片为主的传统互联网。根据Cisco的VisualNetworkingIndex(VNI)统计,2005年流媒体流量仅占全球互联网总流量的5%,而到…

【玩转c++】c++模板和泛型编程

本期主题&#xff1a;c模板和泛型编程博客主页&#xff1a;小峰同学分享小编的在Linux中学习到的知识和遇到的问题小编的能力有限&#xff0c;出现错误希望大家不吝赐身为程序员&#xff0c;不会有人没女朋友吧&#xff01;&#xff01; 目录 &#x1f341;1.泛型编程 &#x…

ChatGPT 是何方神圣?为什么这么猛?

哈喽&#xff0c;大家好&#xff0c;我是木易巷&#xff01; 本篇文章给大家介绍一下这个很猛的玩意&#xff1a;ChatGPT &#xff01;&#xff01;&#xff01; 什么是ChatGPT &#xff1f; 在12月初&#xff0c;人工智能实验室OpenAI发布了一款名为ChatGPT的自然语言生成式…

【Pytorch】第 5 章 :解决多臂老虎机问题

&#x1f50e;大家好&#xff0c;我是Sonhhxg_柒&#xff0c;希望你看完之后&#xff0c;能对你有所帮助&#xff0c;不足请指正&#xff01;共同学习交流&#x1f50e; &#x1f4dd;个人主页&#xff0d;Sonhhxg_柒的博客_CSDN博客 &#x1f4c3; &#x1f381;欢迎各位→点赞…

web期末网站设计大作业 HTML+CSS+JS仿爱奇艺官网影视网站

HTML实例网页代码, 本实例适合于初学HTML的同学。该实例里面有设置了css的样式设置&#xff0c;有div的样式格局&#xff0c;这个实例比较全面&#xff0c;有助于同学的学习,本文将介绍如何通过从头开始设计个人网站并将其转换为代码的过程来实践设计。 文章目录一、网页介绍一…