spring cloud stream 自定义binder

news2025/1/16 4:58:54

背景xxx,关键字 binder stream ,解决多中间件通信及切换问题

直接主菜:

  1. spring cloud stream 架构

中间件 --- binder --- channel --- sink --- (处理)---source ---channel ---binder ---中间件

springcloudstream已自己集成了kafka、rabbitmq ,其他厂商也集成了一些。在官网有说明 https://docs.spring.io/spring-cloud-stream/docs/3.2.7/reference/html/index.html

但是有时候还需自己实现,官方也给出了响应步骤

https://docs.spring.io/spring-cloud-stream/docs/3.2.7/reference/html/spring-cloud-stream.html#spring-cloud-stream-overview-custom-binder-impl

  1. 自定义实现

定义xxBinder

cp了一网友的项目,我换成了maven,

https://github.com/yangyongdehao30/spring-cloud-stream-binder-mqtt/tree/yangyongdehao30-maven

具体实现如下:

设置config类


import com.sheunglaili.binder.mqtt.MqttMessageChannelBinder;
import com.sheunglaili.binder.mqtt.MqttProvisioningProvider;
import com.sheunglaili.binder.mqtt.properties.MqttBinderConfigurationProperties;
import com.sheunglaili.binder.mqtt.properties.MqttBindingProperties;
import com.sheunglaili.binder.mqtt.properties.MqttExtendedBindingProperties;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.config.BindingHandlerAdvise;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.util.ObjectUtils;

/**
 * Mqtt binder configuration class
 * @author Alex , Li Sheung Lai
 */
@Configuration
@EnableConfigurationProperties({
        MqttExtendedBindingProperties.class})
public class MqttBinderConfiguration {

    @Autowired
    private MqttExtendedBindingProperties mqttExtendedBindingProperties;




    @Bean
    public MqttBinderConfigurationProperties configurationProperties(){
        return new MqttBinderConfigurationProperties();
    }

    @Bean
    public MqttProvisioningProvider provisioningProvider(MqttBinderConfigurationProperties configurationProperties){
        return new MqttProvisioningProvider();
    }

    @Bean
    public MqttPahoClientFactory mqttPahoClientFactory(MqttBinderConfigurationProperties configurationProperties) {

        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(configurationProperties.getUrl());
        options.setUserName(configurationProperties.getUsername());
        options.setPassword(configurationProperties.getPassword().toCharArray());
        options.setCleanSession(configurationProperties.isCleanSession());
        options.setConnectionTimeout(configurationProperties.getConnectionTimeout());
        options.setKeepAliveInterval(configurationProperties.getKeepAliveInterval());
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(options);
        if (ObjectUtils.nullSafeEquals(configurationProperties.getPersistence(), "file")) {
            factory.setPersistence(new MqttDefaultFilePersistence(configurationProperties.getPersistenceDirectory()));
        }
        else if (ObjectUtils.nullSafeEquals(configurationProperties.getPersistence(), "memory")) {
            factory.setPersistence(new MemoryPersistence());
        }
        return factory;
    }

    @Bean
    public MqttMessageChannelBinder mqttMessageChannelBinder(MqttPahoClientFactory mqttPahoClientFactory,
                                                             MqttProvisioningProvider provisioningProvider){

       MqttMessageChannelBinder mqttMessageChannelBinder = new MqttMessageChannelBinder(mqttPahoClientFactory,provisioningProvider);
        return mqttMessageChannelBinder;
    }

配置properties


import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
import org.springframework.cloud.stream.config.BinderProperties;
import org.springframework.context.annotation.PropertySource;
import org.springframework.util.Assert;
import org.springframework.validation.annotation.Validated;

import javax.validation.constraints.Size;

/**
 * Configuration properties for the Mqtt binder . The properties in the class
 * are prefixed with <b>spring.cloud.stream.mqtt.binder</b>
 * @author Alex , Li Sheung Lai
 */
@Data
@Validated
@ConfigurationProperties(prefix = "spring.cloud.stream.mqtt")
public class MqttBinderConfigurationProperties {


    /**
     * location of the mqtt broker(s) (comma-delimited list)
     */
    @Size(min = 1)
    private String[] url = new String[] { "tcp://localhost:1883" };

    /**
     * the username to use when connecting to the broker
     */
    private String username = "guest";

    /**
     * the password to use when connecting to the broker
     */
    private String password = "guest";

    /**
     * whether the client and server should remember state across restarts and reconnects
     */
    private boolean cleanSession = true;

    /**
     * the connection timeout in seconds
     */
    private int connectionTimeout = 30;

    /**
     * the ping interval in seconds
     */
    private int keepAliveInterval = 60;

    /**
     * 'memory' or 'file'
     */
    private String persistence = "memory";

    /**
     * Persistence directory
     */
    private String persistenceDirectory = "/tmp/paho";

    public MqttBinderConfigurationProperties() {
    }


    public String[] getUrl() {
        return url;
    }

    public void setUrl(String[] url) {
        this.url = url;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public boolean isCleanSession() {
        return cleanSession;
    }

    public void setCleanSession(boolean cleanSession) {
        this.cleanSession = cleanSession;
    }

    public int getConnectionTimeout() {
        return connectionTimeout;
    }

    public void setConnectionTimeout(int connectionTimeout) {
        this.connectionTimeout = connectionTimeout;
    }

    public int getKeepAliveInterval() {
        return keepAliveInterval;
    }

    public void setKeepAliveInterval(int keepAliveInterval) {
        this.keepAliveInterval = keepAliveInterval;
    }

    public String getPersistence() {
        return persistence;
    }

    public void setPersistence(String persistence) {
        this.persistence = persistence;
    }

    public String getPersistenceDirectory() {
        return persistenceDirectory;
    }

    public void setPersistenceDirectory(String persistenceDirectory) {
        this.persistenceDirectory = persistenceDirectory;
    }
//注,和本properties同文件夹的还有几个类,具体在 git中 ,可下载拷贝

实现一个channel binder


import com.sheunglaili.binder.mqtt.properties.MqttExtendedBindingProperties;
import com.sheunglaili.binder.mqtt.properties.MqttSinkProperties;
import com.sheunglaili.binder.mqtt.properties.MqttSourceProperties;
import org.springframework.cloud.stream.binder.*;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

public class MqttMessageChannelBinder
        extends AbstractMessageChannelBinder<ExtendedConsumerProperties<MqttSourceProperties>, ExtendedProducerProperties<MqttSinkProperties>, MqttProvisioningProvider>
        implements ExtendedPropertiesBinder<MessageChannel, MqttSourceProperties, MqttSinkProperties> {


    private MqttExtendedBindingProperties extendedBindingProperties = new MqttExtendedBindingProperties();

    private MqttPahoClientFactory mqttPahoClientFactory;

    public void setMqttPahoClientFactory(MqttPahoClientFactory mqttPahoClientFactory) {
        this.mqttPahoClientFactory = mqttPahoClientFactory;
    }

    public MqttMessageChannelBinder(
            MqttPahoClientFactory factory,
            MqttProvisioningProvider provisioningProvider) {
        super(BinderHeaders.STANDARD_HEADERS, provisioningProvider);
        this.mqttPahoClientFactory = factory;
    }

    @Override
    protected MessageHandler createProducerMessageHandler(
            ProducerDestination destination,
            ExtendedProducerProperties<MqttSinkProperties> producerProperties,
            MessageChannel errorChannel) throws Exception {

        MqttSinkProperties sinkProperties = producerProperties.getExtension();

        DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(
                sinkProperties.getQos(),
                sinkProperties.isRetained(),
                sinkProperties.getCharset()
        );

        MqttPahoMessageHandler handler = new MqttPahoMessageHandler(
                sinkProperties.getClientId(),
                this.mqttPahoClientFactory
        );
        handler.setAsync(sinkProperties.isAsync());
        handler.setDefaultTopic(sinkProperties.getTopic());
        handler.setConverter(converter);
        return handler;
    }

    @Override
    protected MessageProducer createConsumerEndpoint(
            ConsumerDestination destination,
            String group,
            ExtendedConsumerProperties<MqttSourceProperties> properties) throws Exception {

        MqttSourceProperties sourceProperties = properties.getExtension();

        DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(
                sourceProperties.getCharset()
        );
        converter.setPayloadAsBytes(sourceProperties.isBinary());

        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
                sourceProperties.getClientId(),
                this.mqttPahoClientFactory,
                sourceProperties.getTopics()
        );

        adapter.setBeanFactory(this.getBeanFactory());
        adapter.setQos(sourceProperties.getQos());
        adapter.setConverter(converter);
        adapter.setOutputChannelName(destination.getName());
        return adapter;
    }

    public void setExtendedBindingProperties(MqttExtendedBindingProperties extendedBindingProperties) {
        this.extendedBindingProperties = extendedBindingProperties;
    }

    @Override
    public MqttSourceProperties getExtendedConsumerProperties(String channelName) {
        return this.extendedBindingProperties.getExtendedConsumerProperties(channelName);
    }

    @Override
    public MqttSinkProperties getExtendedProducerProperties(String channelName) {
        return this.extendedBindingProperties.getExtendedProducerProperties(channelName);
    }

    @Override
    public String getDefaultsPrefix() {
        return this.extendedBindingProperties.getDefaultsPrefix();
    }

    @Override
    public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
        return this.extendedBindingProperties.getExtendedPropertiesEntryClass();
    }

}

实现一个Provider


import com.sheunglaili.binder.mqtt.properties.MqttSinkProperties;
import com.sheunglaili.binder.mqtt.properties.MqttSourceProperties;
import lombok.RequiredArgsConstructor;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;

public class MqttProvisioningProvider implements
        ProvisioningProvider<ExtendedConsumerProperties<MqttSourceProperties>, ExtendedProducerProperties<MqttSinkProperties>> {

    @Override
    public ProducerDestination provisionProducerDestination(
            String name,
            ExtendedProducerProperties<MqttSinkProperties> properties) throws ProvisioningException {
        return new MqttTopicDestination(name);
    }

    @Override
    public ConsumerDestination provisionConsumerDestination(String name, String group, ExtendedConsumerProperties<MqttSourceProperties> properties) throws ProvisioningException {
        return new MqttTopicDestination(name);
    }

    @RequiredArgsConstructor
    private class MqttTopicDestination implements ProducerDestination , ConsumerDestination{

        private final String destination;

        @Override
        public String getName() {
            return this.destination.trim();
        }

        @Override
        public String getNameForPartition(int partition) {
            throw  new UnsupportedOperationException("Partitioning is not implemented for mqtt");
        }
    }
}

配置 spring.binders

mqtt:\
com.sheunglaili.binder.mqtt.config.MqttBinderConfiguration

配置如下:

spring.cloud.stream.binders.mqtt1.type=mqtt
spring.cloud.stream.binders.mqtt1.environment.spring.cloud.stream.mqtt.url=tcp://localhost:1883
spring.cloud.stream.binders.mqtt1.environment.spring.cloud.stream.mqtt.username=admin
spring.cloud.stream.binders.mqtt1.environment.spring.cloud.stream.mqtt.password=admin

记得,不要扫描到BinderConfiguration,xxBinderConfiguration 是在binderService动态配置的,具体构建Binder在这,如果扫描到BinderConfiguration类,此处binders.size就不是0了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/399860.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

12升400V 升压DC-DC高压脱毛仪解决方案SC3671

ipl(intense pulsed light&#xff0c;强脉冲光)脱毛&#xff0c;也叫光子脱毛&#xff0c;是市场上的一种新型脱毛技术和美容方法&#xff0c;其利用强脉冲光特殊的波长和光热效应实现破坏毛囊并达到永久脱毛的效果&#xff0c;具有速度快&#xff0c;效果好&#xff0c;安全性…

【前端学习】D5:CSS进阶

文章目录前言系列文章目录1 精灵图Sprites1.1 为什么需要精灵图&#xff1f;1.2 精灵图的使用2 字体图标iconfont2.1 字体图标的产生2.2 字体图标的优点2.3 字体文件格式2.4 字体图标的使用2.5 字体图标的引入2.6 字体图标的追加3 CSS三角3.1 普通三角3.2 案例4 CSS用户界面样式…

【学习笔记】Docker(二)

Docker镜像加载原理 UnionFS(联合文件系统) 一种分层、轻量级并且高性能的文件系统&#xff0c;它支持对文件系统的修改作为一次提交来一层层的叠加&#xff0c;同时可以将不同目录挂载到同一个虚拟文件系统下。Union文件系统是Docker镜像的基础。镜像可以通过分层来进行继承&a…

音视频开发之IOMX调用端—OMXCodec源码分析

概述 OMX Codec是stagefrightplayer中负责解码的模块。由于遵循openmax接口规范&#xff0c;因此结构稍微有点负责&#xff0c;这里就依照awesomeplayer中的调用顺序来介绍。 主要分如下几步&#xff1a; 1 mClient->connect2 InitAudioDecoder & InitVideoDecoder3 …

C++回顾(十三)—— 运算符重载提高

13.1 为什么不要重载 && 和 || 运算符 1&#xff09;&&和||是C中非常特殊的操作符2&#xff09;&&和||内置实现了短路规则3&#xff09;操作符重载是靠函数重载来完成的4&#xff09;操作数作为函数参数传递5&#xff09;C的函数参数都会被求值&#…

xxl-job分布式任务调度平台

分布式任务调度平台XXL-JOB (xuxueli.com) 1 xxl-job概述 XXL-JOB是一个分布式任务调度平台&#xff0c;其核心设计目标是开发迅速、学习简单、轻量级、易扩展。 1.1 xxl-job架构 我把上面的图精简了一下&#xff0c;xxl-job 的调度器和业务执行是独立的。调度器决定任务的调…

【数据分析师求职面试指南】必备基础知识整理

数据分析师基础知识统计 数据分析知识基础概念随机变量常用特征正态分布与大数定律、中心极限定律假设检验模型、数据挖掘知识常用概念数据集划分欠拟合过拟合模型分类方法常见模型介绍线性回归模型&#xff1a;逻辑回归模型决策树模型随机森林模型Boosting模型XGBoost模型模型…

前后端分离项目学习-vue+springboot 博客

前后端分离项目 文章总体分为2大部分&#xff0c;Java后端接口和vue前端页面 项目演示&#xff1a;www.markerhub.com:8084/blogs Java后端接口开发 1、前言 从零开始搭建一个项目骨架&#xff0c;最好选择合适&#xff0c;熟悉的技术&#xff0c;并且在未来易拓展&#xf…

精简:设计模式

1.设计模式概述 1.什么是设计模式 设计模式(Design Pattern)是前辈们对代码开发经验的总结&#xff0c;是解决特定问题的一系列套路。 它不是语法规定&#xff0c;而是一套用来提高代码可复用性、可维护性、可读性、稳健性以及安全性的解决方案。 1995年&#xff0c;GoF (Ga…

flutter工程创建过程中遇到一些问题。

安装环境版本&#xff1a;JDK7.-JDK 8 Andriod SDK 10 flutter 版本 3.0 1.当创建完后flutter工程后会遇到 run gradle task assemlble Debug 的问题&#xff0c;需要设置远程仓库&#xff0c;共需要修改三个地方build.gradle两处以及flutter 下面的D:\FVM\versions\3.0.0\pac…

Excel常用可视化图表

目录柱状图与条形图折线图饼图漏斗图雷达图瀑布图及甘特图旭日图组合图excel图表&#xff1a;柱状数据条、excel热力图、mini图可视化工具的表现形式&#xff1a;看板、可视化大屏、驾驶舱 柱状图与条形图 条形图是柱状图的转置 类别&#xff1a; 单一柱状图&#xff1a;反映…

Linux内核移植

内核移植半导体厂商会从linux内核官网下载某个版本&#xff0c;将其移植到自己的CPU上&#xff0c;测试成功后就会将其开放给该半导体的厂商的CPU开发者&#xff0c;开发者下载其提供的linux内核&#xff0c;然后将其移植到自己的 产品上。1、NXP官方开发板Linux内核编译测试编…

VR会议不断升级,为商务会谈打造云端洽谈服务!

VR会议不断升级&#xff0c;为商务会谈打造云端洽谈服务。在商务合作中&#xff0c;对客户需求的理解以及与客户讲解方案都需要建立在一个有效的沟通上&#xff0c;因此VR会议的用武之地就有了&#xff0c;以VR全景技术为核心&#xff0c;通过同屏互动和全景通信技术&#xff0…

wiki(维基)是什么?企业为什么需要搭建wiki?

维基百科是wiki的一个著名例子。维基百科上的内容可以由任何人创建和编辑&#xff0c;只要他们能够访问网络浏览器&#xff0c;并且可以使用简化的加价语言进行写作。对于 wiki&#xff0c;没有集中的作者或团队负责内容生成。从某种意义上说&#xff0c;维基是非常民主的。维基…

【SCL】移位和循环指令的应用(音乐喷泉改进)

移位指令&#xff1a;右移&#xff08;SHR&#xff09;左移&#xff08;SHL&#xff09;和循环左移/右移&#xff08;ROR/ROL&#xff09;指令的应用 文章目录 目录 一、移位和循环移位指令 1.左移右移 2.使用左移和脉冲实现音乐喷泉 3.循环移位指令 二、优化的其它方法 1.使用…

计算机SCI期刊的分值是什么意思? - 易智编译EaseEditing

影响因子&#xff08;Impact Factor,IF)是美国ISI&#xff08;科学信息研究所)的JCR(期刊引证报告)中的一项数据。 即某期刊前两年发表的论文在统计当年的被引用总次数除以该期刊在前两年内发表的论文总数。这是一个国际上通行的期刊评价指标。 例如&#xff0c;某期刊2005年影…

2023年主流的固定资产管理方式

2023年主流的固定资产管理方式可能有以下三种&#xff1a; 基于PaaS平台的固定资产管理系统&#xff0c;支持低代码平台&#xff0c;可以通过拖拉拽的方式进行表单搭建、流程搭建、自定义仪表盘等&#xff0c;满足不同行业和企业的个性化需求。基于RFID和二维码相结合的固定资…

卷麻了,00后Jmeter用的比我还熟练,简直没脸见人......

经常看到无论是刚入职场的新人&#xff0c;还是工作了一段时间的老人&#xff0c;都会对测试工具的使用感到困扰&#xff1f;前言性能测试是一个全栈工程师/架构师必会的技能之一&#xff0c;只有学会性能测试&#xff0c;才能根据得到的测试报告进行分析&#xff0c;找到系统性…

Allegro如何快速查看差分对是否等长的方法

在用Allegro进行PCB设计时&#xff0c;用快速查看差分对是否等长的方法&#xff0c;可以提高效率。那如何操作呢&#xff1f;具体操作方法如下&#xff1a;&#xff08;1&#xff09;选择菜单栏Route选择Timing Vision&#xff08;时序视图&#xff09; 然后在Options选项卡Tim…

陀螺和加计有关参数部分说明

部分参数计算一、零偏二、随机游走三、Allan方差分析使用要点一、零偏 如果只用一个指标来代表一款IMU的精度的话&#xff0c;那毫无疑问是陀螺零偏。这是因为&#xff1a;1) 惯导系统的精度主要取决于IMU中的陀螺器件精度&#xff0c;而不是加速度计精度&#xff1b;2) 陀螺的…