RabbitMQ Stream插件使用详解

news2025/1/23 13:13:10

2.4版为RabbitMQ流插件引入了对RabbitMQStream插件Java客户端的初始支持。

  • RabbitStreamTemplate
  • StreamListener容器

将spring rabbit流依赖项添加到项目中:

<dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit-stream</artifactId>
  <version>3.1.4</version>
</dependency>

您可以使用RabbitAdmin bean,使用QueueBuilder.stream()方法指定队列类型,正常地配置队列。例如:

@Bean
Queue stream() {
    return QueueBuilder.durable("stream.queue1")
            .stream()
            .build();
}

然而,这仅在您还使用non-stream 组件(如SimpleMessageListenerContainer或DirectMessageListeneerContainer)时才有效,因为在打开AMQP连接时会触发管理员来声明定义的bean。如果您的应用程序仅使用流组件,或者您希望使用高级流配置功能,则应改为配置StreamAdmin:

@Bean
StreamAdmin streamAdmin(Environment env) {
    return new StreamAdmin(env, sc -> {
        sc.stream("stream.queue1").maxAge(Duration.ofHours(2)).create();
        sc.stream("stream.queue2").create();
    });
}

一、Sending Messages

RabbitStreamTemplate提供RabbitTemplate(AMQP)功能的子集。

public interface RabbitStreamOperations extends AutoCloseable {

	CompletableFuture<Boolean> send(Message message);

	CompletableFuture<Boolean> convertAndSend(Object message);

	CompletableFuture<Boolean> convertAndSend(Object message, @Nullable MessagePostProcessor mpp);

	CompletableFuture<Boolean> send(com.rabbitmq.stream.Message message);

	MessageBuilder messageBuilder();

	MessageConverter messageConverter();

	StreamMessageConverter streamMessageConverter();

	@Override
	void close() throws AmqpException;

}

RabbitStreamTemplate实现具有以下构造函数和属性:

public RabbitStreamTemplate(Environment environment, String streamName) {
}

public void setMessageConverter(MessageConverter messageConverter) {
}

public void setStreamConverter(StreamMessageConverter streamConverter) {
}

public synchronized void setProducerCustomizer(ProducerCustomizer producerCustomizer) {
}

MessageConverter在convertAndSend方法中用于将对象转换为Spring AMQP消息。

StreamMessageConverter用于将Spring AMQP消息转换为本机流消息。

您也可以直接发送本机流消息;使用messageBuilder()方法提供对生产者的消息生成器的访问。

ProducerCustomizer提供了一种机制,用于在生成生产者之前对其进行自定义。

 二、Receiving Messages

异步消息接收由StreamListenerContainer(以及使用@RabbitListener时的StreamRabbitListerContainerFactory)提供。

侦听器容器需要一个Environment以及一个流名称。

您可以使用经典的MessageListener接收Spring AMQP消息,也可以使用新接口接收本地流消息:

public interface StreamMessageListener extends MessageListener {

	void onStreamMessage(Message message, Context context);

}

有关支持的属性的信息,请参阅消息侦听器容器配置。

与模板类似,容器具有ConsumerCustomizer属性。

有关自定义环境和使用者的信息,请参阅Java客户端文档。

使用@RabbitListener时,配置StreamRabbitListerContainerFactory;此时,大多数@RabbitListener属性(并发等)将被忽略。仅支持id、队列、autoStartup和containerFactory。此外,队列只能包含一个流名称。

三、Examples

@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
    RabbitStreamTemplate template = new RabbitStreamTemplate(env, "test.stream.queue1");
    template.setProducerCustomizer((name, builder) -> builder.name("test"));
    return template;
}

@Bean
RabbitListenerContainerFactory<StreamListenerContainer> rabbitListenerContainerFactory(Environment env) {
    return new StreamRabbitListenerContainerFactory(env);
}

@RabbitListener(queues = "test.stream.queue1")
void listen(String in) {
    ...
}

@Bean
RabbitListenerContainerFactory<StreamListenerContainer> nativeFactory(Environment env) {
    StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory(env);
    factory.setNativeListener(true);
    factory.setConsumerCustomizer((id, builder) -> {
        builder.name("myConsumer")
                .offset(OffsetSpecification.first())
                .manualTrackingStrategy();
    });
    return factory;
}

@RabbitListener(id = "test", queues = "test.stream.queue2", containerFactory = "nativeFactory")
void nativeMsg(Message in, Context context) {
    ...
    context.storeOffset();
}

@Bean
Queue stream() {
    return QueueBuilder.durable("test.stream.queue1")
            .stream()
            .build();
}

@Bean
Queue stream() {
    return QueueBuilder.durable("test.stream.queue2")
            .stream()
            .build();
}

2.4.5版将adviceChain属性添加到StreamListenerContainer(及其工厂)。还提供了一个新的工厂bean来创建一个无状态重试拦截器,该拦截器带有一个可选的StreamMessageRecoverer,用于在使用原始流消息时使用。

@Bean
public StreamRetryOperationsInterceptorFactoryBean sfb(RetryTemplate retryTemplate) {
    StreamRetryOperationsInterceptorFactoryBean rfb =
            new StreamRetryOperationsInterceptorFactoryBean();
    rfb.setRetryOperations(retryTemplate);
    rfb.setStreamMessageRecoverer((msg, context, throwable) -> {
        ...
    });
    return rfb;
}

四、Super Streams

超级流是分区流的抽象概念,通过将多个流队列绑定到具有参数x-Super-Stream:true的交换来实现。

1、调配

为了方便起见,可以通过定义类型为SuperStream的单个bean来提供超级流。

@Bean
SuperStream superStream() {
    return new SuperStream("my.super.stream", 3);
}

RabbitAdmin检测到这个bean,并将声明交换(my.super.stream)和3个队列(分区)-my.super-stream-n,其中n是0,1,2,绑定的路由密钥等于n。

如果您还希望通过AMQP向exchange 发布,您可以提供自定义路由密钥:

@Bean
SuperStream superStream() {
    return new SuperStream("my.super.stream", 3, (q, i) -> IntStream.range(0, i)
					.mapToObj(j -> "rk-" + j)
					.collect(Collectors.toList()));
}

key 的数量必须等于分区的数量。

2、向超级流生产消息

你必须向 RabbitStreamTemplate 添加一个 superStreamRoutingFunction

@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
    RabbitStreamTemplate template = new RabbitStreamTemplate(env, "stream.queue1");
    template.setSuperStreamRouting(message -> {
        // some logic to return a String for the client's hashing algorithm
    });
    return template;
}

你也可以通过AMQP发布,使用 RabbitTemplate

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1600962.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

WebKit内核游览器

WebKit内核游览器 基础概念游览器引擎Chromium 浏览器架构Webkit 资源加载这里就不得不提到http超文本传输协议这个概念了&#xff1a; 游览器多线程HTML 解析总结 基础概念 百度百科介绍 WebKit 是一个开源的浏览器引擎&#xff0c;与之相对应的引擎有Gecko&#xff08;Mozil…

C# 字面量null对于引用类型变量和值类型变量

编译器让相同的字符串字面量共享堆中的同一内存位置以节约内存。 在C#中&#xff0c;字面量&#xff08;literal&#xff09;是指直接表示固定值的符号&#xff0c;比如数字、字符串或者布尔值。而关键字&#xff08;keyword&#xff09;则是由编程语言定义的具有特殊含义的标…

mysql 转pg 两者不同的地方

因项目数据库&#xff08;原来是MySQL&#xff09;要改成PostgreSQL。 项目里面的sql要做一些调整。 1&#xff0c;写法上的区别&#xff1a; 1&#xff0c;数据准备&#xff1a; 新建表格&#xff1a; CREATE TABLE property_config ( CODE VARCHAR(50) NULL…

PHP一句话木马

一句话木马 PHP 的一句话木马是一种用于 Web 应用程序漏洞利用的代码片段。它通常是一小段 PHP 代码&#xff0c;能够在目标服务器上执行任意命令。一句话木马的工作原理是利用 Web 应用程序中的安全漏洞&#xff0c;将恶意代码注入到服务器端的 PHP 脚本中。一旦执行&#xf…

免费ssl通配符证书申请教程

在互联网安全日益受到重视的今天&#xff0c;启用HTTPS已经成为网站运营的基本要求。它不仅保障用户数据传输的安全&#xff0c;提升搜索引擎排名&#xff0c;还能增强用户对网站的信任。通配符证书是一种SSL/TLS证书&#xff0c;用于同时保护一个域名及其所有下一级子域名的安…

【Qt】:界面优化(一:基本语法)

界面优化 一.基本语法1.设置指定控件样式2.设置全局控件样式3.从文件加载样式表4.使⽤Qt Designer编辑样式&#xff08;最常用&#xff09; 二.选择器1.概述2.子控件选择器3.伪类型选择器 三.盒模型 在网页前端开发领域中,CSS是一个至关重要的部分.描述了一个网页的"样式&…

基于JavaWeb开发的springboot网约车智能接单规划小程序[附源码]

基于JavaWeb开发的springboot网约车智能接单规划小程序[附源码] &#x1f345; 作者主页 央顺技术团队 &#x1f345; 欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd; &#x1f345; 文末获取源码联系方式 &#x1f4dd; &#x1f345; 查看下方微信号获取联系方式 承接各种…

Weakly Supervised Audio-Visual Violence Detection 论文阅读

Weakly Supervised Audio-Visual Violence Detection 论文阅读 摘要III. METHODOLOGYA. Multimodal FusionB. Relation Modeling ModuleC. Training and Inference IV. EXPERIMENTSV. CONCLUSION阅读总结 文章信息&#xff1a; 发表于&#xff1a;IEEE TRANSACTIONS ON MULTIME…

上海亚商投顾:沪指低开低走跌 两市逾千股跌超10%

上海亚商投顾前言&#xff1a;无惧大盘涨跌&#xff0c;解密龙虎榜资金&#xff0c;跟踪一线游资和机构资金动向&#xff0c;识别短期热点和强势个股。 一.市场情绪 三大指数昨日低开低走&#xff0c;沪指跌超1.6%&#xff0c;深成指、创业板指尾盘跌逾2%&#xff0c;微盘股指…

【c 语言】结构体指针

&#x1f388;个人主页&#xff1a;豌豆射手^ &#x1f389;欢迎 &#x1f44d;点赞✍评论⭐收藏 &#x1f917;收录专栏&#xff1a;C语言 &#x1f91d;希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出指正&#xff0c;让我们共同学习、交流进步&…

2024年4月最新版GPT

2024年4月最新版ChatGPT/GPT4, 附上最新的使用教程。 随着人工智能技术的不断发展&#xff0c;ChatGPT和GPT4已经成为了人们日常生活中不可或缺的助手。2024年4月,OpenAI公司推出了最新版本的GPT4,带来了更加强大的功能和更加友好的用户体验。本文将为大家带来最新版GPT4的实用…

外网如何访问内网数据库?

企业和个人常常需要在外部网络环境中访问内部的数据库资源。这也是为了实现更大范围的资源共享和便捷的工作模式。由于网络安全和防火墙的限制&#xff0c;外网访问内网数据库并不是一件容易的事情。 在解决这个问题的过程中&#xff0c;天联组网应运而生。天联组网是一款异地组…

AIGC实战——VQ-GAN(Vector Quantized Generative Adversarial Network)

AIGC实战——VQ-GAN 0. 前言1. VQ-GAN2. ViT VQ-GAN小结系列链接 0. 前言 本节中&#xff0c;我们将介绍 VQ-GAN (Vector Quantized Generative Adversarial Network) 和 ViT VQ-GAN&#xff0c;它们融合了变分自编码器 (Variational Autoencoder, VAE)、Transformer 和生成对…

科技驱动未来,提升AI算力,GPU扩展正当时

要说这两年最火的科技是什么&#xff1f;我想“AI人工智能”肯定是最有资格上榜的&#xff0c;尤其ChatGPT推出后迅速在社交媒体上走红&#xff0c;短短5天&#xff0c;注册用户数就超过100万&#xff0c;2023年一月末&#xff0c;ChatGPT的月活用户更是突破1亿&#xff0c;成为…

内网kift私有网盘如何实现在外网公网访问?快解析映射方案

KIFT是一款面向个人、团队、小型组织的网盘服务器系统&#xff0c;安装运行比较简单&#xff0c;开箱即用&#xff0c;下载解压&#xff0c;双击jar文件即可启动。因为是开源的&#xff0c;不少人选择使用KIFT做开源私有网盘&#xff0c;有能力的大佬还可以对它进行定制开发。 …

python 列表对象函数

对象函数必须通过一个对象调用。 列表名.函数名() append() 将某一个元素对象添加在列表的表尾 如果添加的是其他的序列&#xff0c;该序列也会被看成是一个数据对象 count() 统计列表当中 某一个元素出现的次数 extend() 在当前列表中 将传入的其他序列的元素添加在表尾…

养猫必看!毛发护理秘籍,猫粮选择大揭秘!

亲爱的猫友们&#xff0c;我们都知道&#xff0c;猫咪的毛发是它们健康与美丽的象征。选择一款合适的猫粮&#xff0c;对于猫咪的毛发健康至关重要。那么&#xff0c;如何根据猫咪的毛发情况来选择合适的猫粮呢&#xff1f;接下来&#xff0c;就让我来为你详细解答吧&#xff0…

5.前后端分离

目录 一、前后端分离上传文件 1.在yml中设置port和localhost 2.如何使用postman测试上传文件的接口 二、如何导出excel文件 ​编辑1.在pom.xml中导包 2.在实体类中给每个字段添加注解&#xff0c;导出表格时&#xff0c;列名将会改为对应的中文 3.controller中方法的具体…

Swoole 实践篇之结合 WebRTC 实现音视频实时通信方案

原文首发链接&#xff1a;Swoole 实践篇之结合 WebRTC 实现音视频实时通信方案 大家好&#xff0c;我是码农先森。 引言 这次实现音视频实时通信的方案是基于 WebRTC 技术的&#xff0c;它是一种点对点的通信技术&#xff0c;通过浏览器之间建立对等连接&#xff0c;实现音频…

4.Labview簇、变体与类(上)

在Labview中&#xff0c;何为簇与变体&#xff0c;何为类&#xff1f;应该如何理解&#xff1f;具体有什么应用场景&#xff1f; 本文基于Labview软件&#xff0c;独到的讲解了簇与变体与类函数的使用方法和场景&#xff0c;从理论上讲解其数据流的底层概念&#xff0c;从实践上…