背景略。关键字:binder、stream,用于解决多中间件之间的通信及切换问题。
直接主菜:
spring cloud stream 架构
中间件 --- binder --- channel --- sink --- (处理)---source ---channel ---binder ---中间件
Spring Cloud Stream 官方已经集成了 Kafka、RabbitMQ 的 binder,其他厂商也提供了一些实现,官网有说明:https://docs.spring.io/spring-cloud-stream/docs/3.2.7/reference/html/index.html
但是有时候还需要自己实现 binder,官方也给出了相应的步骤:
https://docs.spring.io/spring-cloud-stream/docs/3.2.7/reference/html/spring-cloud-stream.html#spring-cloud-stream-overview-custom-binder-impl
自定义实现
定义xxBinder
这里拷贝了一位网友的项目,并把构建方式换成了 Maven:
https://github.com/yangyongdehao30/spring-cloud-stream-binder-mqtt/tree/yangyongdehao30-maven
具体实现如下:
设置config类
import com.sheunglaili.binder.mqtt.MqttMessageChannelBinder;
import com.sheunglaili.binder.mqtt.MqttProvisioningProvider;
import com.sheunglaili.binder.mqtt.properties.MqttBinderConfigurationProperties;
import com.sheunglaili.binder.mqtt.properties.MqttBindingProperties;
import com.sheunglaili.binder.mqtt.properties.MqttExtendedBindingProperties;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.config.BindingHandlerAdvise;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.util.ObjectUtils;
/**
* Mqtt binder configuration class
* @author Alex , Li Sheung Lai
*/
@Configuration
@EnableConfigurationProperties({
MqttExtendedBindingProperties.class})
public class MqttBinderConfiguration {
@Autowired
private MqttExtendedBindingProperties mqttExtendedBindingProperties;
@Bean
public MqttBinderConfigurationProperties configurationProperties(){
return new MqttBinderConfigurationProperties();
}
@Bean
public MqttProvisioningProvider provisioningProvider(MqttBinderConfigurationProperties configurationProperties){
return new MqttProvisioningProvider();
}
@Bean
public MqttPahoClientFactory mqttPahoClientFactory(MqttBinderConfigurationProperties configurationProperties) {
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(configurationProperties.getUrl());
options.setUserName(configurationProperties.getUsername());
options.setPassword(configurationProperties.getPassword().toCharArray());
options.setCleanSession(configurationProperties.isCleanSession());
options.setConnectionTimeout(configurationProperties.getConnectionTimeout());
options.setKeepAliveInterval(configurationProperties.getKeepAliveInterval());
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(options);
if (ObjectUtils.nullSafeEquals(configurationProperties.getPersistence(), "file")) {
factory.setPersistence(new MqttDefaultFilePersistence(configurationProperties.getPersistenceDirectory()));
}
else if (ObjectUtils.nullSafeEquals(configurationProperties.getPersistence(), "memory")) {
factory.setPersistence(new MemoryPersistence());
}
return factory;
}
@Bean
public MqttMessageChannelBinder mqttMessageChannelBinder(MqttPahoClientFactory mqttPahoClientFactory,
MqttProvisioningProvider provisioningProvider){
MqttMessageChannelBinder mqttMessageChannelBinder = new MqttMessageChannelBinder(mqttPahoClientFactory,provisioningProvider);
return mqttMessageChannelBinder;
}
配置properties
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
import org.springframework.cloud.stream.config.BinderProperties;
import org.springframework.context.annotation.PropertySource;
import org.springframework.util.Assert;
import org.springframework.validation.annotation.Validated;
import javax.validation.constraints.Size;
/**
* Configuration properties for the Mqtt binder . The properties in the class
* are prefixed with <b>spring.cloud.stream.mqtt.binder</b>
* @author Alex , Li Sheung Lai
*/
@Data
@Validated
@ConfigurationProperties(prefix = "spring.cloud.stream.mqtt")
public class MqttBinderConfigurationProperties {
/**
* location of the mqtt broker(s) (comma-delimited list)
*/
@Size(min = 1)
private String[] url = new String[] { "tcp://localhost:1883" };
/**
* the username to use when connecting to the broker
*/
private String username = "guest";
/**
* the password to use when connecting to the broker
*/
private String password = "guest";
/**
* whether the client and server should remember state across restarts and reconnects
*/
private boolean cleanSession = true;
/**
* the connection timeout in seconds
*/
private int connectionTimeout = 30;
/**
* the ping interval in seconds
*/
private int keepAliveInterval = 60;
/**
* 'memory' or 'file'
*/
private String persistence = "memory";
/**
* Persistence directory
*/
private String persistenceDirectory = "/tmp/paho";
public MqttBinderConfigurationProperties() {
}
public String[] getUrl() {
return url;
}
public void setUrl(String[] url) {
this.url = url;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public boolean isCleanSession() {
return cleanSession;
}
public void setCleanSession(boolean cleanSession) {
this.cleanSession = cleanSession;
}
public int getConnectionTimeout() {
return connectionTimeout;
}
public void setConnectionTimeout(int connectionTimeout) {
this.connectionTimeout = connectionTimeout;
}
public int getKeepAliveInterval() {
return keepAliveInterval;
}
public void setKeepAliveInterval(int keepAliveInterval) {
this.keepAliveInterval = keepAliveInterval;
}
public String getPersistence() {
return persistence;
}
public void setPersistence(String persistence) {
this.persistence = persistence;
}
public String getPersistenceDirectory() {
return persistenceDirectory;
}
public void setPersistenceDirectory(String persistenceDirectory) {
this.persistenceDirectory = persistenceDirectory;
}
// 注:与本 properties 类同目录的还有另外几个类,具体见上文的 git 仓库,可直接下载拷贝
实现一个channel binder
import com.sheunglaili.binder.mqtt.properties.MqttExtendedBindingProperties;
import com.sheunglaili.binder.mqtt.properties.MqttSinkProperties;
import com.sheunglaili.binder.mqtt.properties.MqttSourceProperties;
import org.springframework.cloud.stream.binder.*;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
/**
 * Message-channel binder for MQTT. Producer bindings are served by a
 * {@link MqttPahoMessageHandler}, consumer bindings by a
 * {@link MqttPahoMessageDrivenChannelAdapter}; both are created from the
 * configured {@link MqttPahoClientFactory}.
 */
public class MqttMessageChannelBinder
        extends AbstractMessageChannelBinder<ExtendedConsumerProperties<MqttSourceProperties>, ExtendedProducerProperties<MqttSinkProperties>, MqttProvisioningProvider>
        implements ExtendedPropertiesBinder<MessageChannel, MqttSourceProperties, MqttSinkProperties> {

    /** Factory handed to every handler/adapter this binder creates. */
    private MqttPahoClientFactory mqttPahoClientFactory;

    /** Source of the extended properties; replaceable through its setter. */
    private MqttExtendedBindingProperties extendedBindingProperties = new MqttExtendedBindingProperties();

    /**
     * @param factory              client factory for the Paho handler and adapter
     * @param provisioningProvider provider passed on to the superclass
     */
    public MqttMessageChannelBinder(MqttPahoClientFactory factory, MqttProvisioningProvider provisioningProvider) {
        super(BinderHeaders.STANDARD_HEADERS, provisioningProvider);
        this.mqttPahoClientFactory = factory;
    }

    /** @param mqttPahoClientFactory replacement client factory */
    public void setMqttPahoClientFactory(MqttPahoClientFactory mqttPahoClientFactory) {
        this.mqttPahoClientFactory = mqttPahoClientFactory;
    }

    /** @param extendedBindingProperties replacement extended binding properties */
    public void setExtendedBindingProperties(MqttExtendedBindingProperties extendedBindingProperties) {
        this.extendedBindingProperties = extendedBindingProperties;
    }

    /**
     * Builds the outbound handler for a producer binding. Client id, topic,
     * async flag and converter settings all come from the sink properties;
     * {@code destination} and {@code errorChannel} are not consulted.
     */
    @Override
    protected MessageHandler createProducerMessageHandler(
            ProducerDestination destination,
            ExtendedProducerProperties<MqttSinkProperties> producerProperties,
            MessageChannel errorChannel) throws Exception {
        final MqttSinkProperties sink = producerProperties.getExtension();

        final DefaultPahoMessageConverter outboundConverter =
                new DefaultPahoMessageConverter(sink.getQos(), sink.isRetained(), sink.getCharset());

        final MqttPahoMessageHandler outbound =
                new MqttPahoMessageHandler(sink.getClientId(), this.mqttPahoClientFactory);
        outbound.setConverter(outboundConverter);
        outbound.setDefaultTopic(sink.getTopic());
        outbound.setAsync(sink.isAsync());
        return outbound;
    }

    /**
     * Builds the inbound adapter for a consumer binding. Client id, topics, QoS,
     * charset and the binary flag come from the source properties; {@code group}
     * is not consulted.
     */
    @Override
    protected MessageProducer createConsumerEndpoint(
            ConsumerDestination destination,
            String group,
            ExtendedConsumerProperties<MqttSourceProperties> properties) throws Exception {
        final MqttSourceProperties source = properties.getExtension();

        final DefaultPahoMessageConverter inboundConverter = new DefaultPahoMessageConverter(source.getCharset());
        inboundConverter.setPayloadAsBytes(source.isBinary());

        final MqttPahoMessageDrivenChannelAdapter inbound = new MqttPahoMessageDrivenChannelAdapter(
                source.getClientId(), this.mqttPahoClientFactory, source.getTopics());
        inbound.setBeanFactory(getBeanFactory());
        inbound.setConverter(inboundConverter);
        inbound.setQos(source.getQos());
        // NOTE(review): the output channel name is set to the destination name;
        // confirm this is the channel the framework expects the adapter to feed.
        inbound.setOutputChannelName(destination.getName());
        return inbound;
    }

    /** Delegates to the extended binding properties. */
    @Override
    public MqttSourceProperties getExtendedConsumerProperties(String channelName) {
        return extendedBindingProperties.getExtendedConsumerProperties(channelName);
    }

    /** Delegates to the extended binding properties. */
    @Override
    public MqttSinkProperties getExtendedProducerProperties(String channelName) {
        return extendedBindingProperties.getExtendedProducerProperties(channelName);
    }

    /** Delegates to the extended binding properties. */
    @Override
    public String getDefaultsPrefix() {
        return extendedBindingProperties.getDefaultsPrefix();
    }

    /** Delegates to the extended binding properties. */
    @Override
    public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
        return extendedBindingProperties.getExtendedPropertiesEntryClass();
    }
}
实现一个Provider
import com.sheunglaili.binder.mqtt.properties.MqttSinkProperties;
import com.sheunglaili.binder.mqtt.properties.MqttSourceProperties;
import lombok.RequiredArgsConstructor;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
public class MqttProvisioningProvider implements
ProvisioningProvider<ExtendedConsumerProperties<MqttSourceProperties>, ExtendedProducerProperties<MqttSinkProperties>> {
@Override
public ProducerDestination provisionProducerDestination(
String name,
ExtendedProducerProperties<MqttSinkProperties> properties) throws ProvisioningException {
return new MqttTopicDestination(name);
}
@Override
public ConsumerDestination provisionConsumerDestination(String name, String group, ExtendedConsumerProperties<MqttSourceProperties> properties) throws ProvisioningException {
return new MqttTopicDestination(name);
}
@RequiredArgsConstructor
private class MqttTopicDestination implements ProducerDestination , ConsumerDestination{
private final String destination;
@Override
public String getName() {
return this.destination.trim();
}
@Override
public String getNameForPartition(int partition) {
throw new UnsupportedOperationException("Partitioning is not implemented for mqtt");
}
}
}
配置 META-INF/spring.binders 文件(放在 resources 目录下),内容如下:
mqtt:\
com.sheunglaili.binder.mqtt.config.MqttBinderConfiguration
配置如下:
spring.cloud.stream.binders.mqtt1.type=mqtt
spring.cloud.stream.binders.mqtt1.environment.spring.cloud.stream.mqtt.url=tcp://localhost:1883
spring.cloud.stream.binders.mqtt1.environment.spring.cloud.stream.mqtt.username=admin
spring.cloud.stream.binders.mqtt1.environment.spring.cloud.stream.mqtt.password=admin
切记:不要让组件扫描扫到 BinderConfiguration。xxBinderConfiguration 是由 binderService 动态配置的,Binder 的具体构建也在那里完成;如果 BinderConfiguration 类被扫描到,那里的 binders.size 就不是 0 了。