Spring Cloud Stream与Kafka(一)
在实际开发过程中,消息中间件用于解决应用解耦,异步消息,流量削峰等问题,实现高可用、高性能、可伸缩和最终一致性架构。不同的消息中间件实现方式不同,内部结构是不一样的。比如常见的RabbitMQ和Kafka,RabbitMQ有exchange,kafka有topic、partition,这些中间件的差异性导致我们在实际项目开发过程中造成了一定的干扰。如果采用了其中的一种,后面的业务需求,我想往另一种消息队列迁移,有一堆东西需要重做。Spring Cloud Stream是一种解耦的方式。
文章目录
- Spring Cloud Stream与Kafka(一)
- 简单介绍
- Kafka实例
- 生产者
- 消费者
简单介绍
- Spring Cloud Stream是由一个中间件中立的核心组成,应用通过Spring Cloud Stream插入的input(相当于消费者)和output(相当于生产者)通道与外界交流。通道通过指定中间件的Binder与外部代理连接,业务开发者不需要关注具体的消息中间件,只需要关注Binder对应程序提供的抽象概念来使用中间件实现业务就可以了。Spring Cloud Stream许多抽象和原语,简化了消息驱动微服务应用程序的开发。
- 最底层是消息服务,中间层是绑定层,绑定层和底层的消息服务进行绑定,顶层是消息生产者和消费者,顶层可以向绑定层生产消费、获取消息。
-
Binder绑定器作为中间层,实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的Channel通道,使得应用程序不需要考虑具体的中间件实现。当需要升级或更换中间件产品时,我们要做的就是更换对应的Binder绑定器而不需要修改任何应用逻辑。
-
在Spring Cloud Stream中的消息通信方式遵循发布订阅模式,当一条消息被投递到消息中间件后,它会通过共享的主题进行广播,消费者在订阅的主题收到消息后触发自身的业务逻辑处理。这里的主题是抽象概念,代表发布共享消息给消费者的地方。在不同的消息中间中,主题可能对应着不同的概念。
-
Destination Binders是负责提供与外部消息系统集成的组件。Destination Bindings是外部消息系统和最终用户提供的应用程序代码(生产者/消费者)之间的桥梁。Message是生产者和消费者用于与目标绑定器通信的规范化数据结构。
Kafka实例
- 分别创建生产者kafka-producer和消费者kafka-consumer,引入依赖。
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
</dependencies>
生产者
- 添加配置文件application.yml
spring:
cloud:
stream:
kafka:
binder: # 绑定器
brokers: 192.168.182.171:9092 # broker的IP和端口
application:
name: kafka-provider
server:
port: 8301
- 添加启动类
package org.lxx.stream.kafka.producer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class KafkaProducerApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaProducerApplication.class, args);
}
}
- 添加配置文件
package org.lxx.stream.kafka.producer.config;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;
@Component
public interface MyProcessor {
String MESSAGE_OUTPUT = "log_output";
//在Kafka中创建主题log_output
@Output(MESSAGE_OUTPUT)
SubscribableChannel logOutput();
}
- 创建实体类
package org.lxx.stream.kafka.producer.entity;
import lombok.Data;
import java.util.Date;
@Data
public class LogInfo {
private String clientVersion;
private String userId;
private String clientIP;
private Date time;
}
- 创建控制器
package org.lxx.stream.kafka.producer.controller;
import lombok.extern.slf4j.Slf4j;
import org.lxx.stream.kafka.producer.config.MyProcessor;
import org.lxx.stream.kafka.producer.entity.LogInfo;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.sql.Date;
import java.time.Instant;
@Slf4j
@RestController
@EnableBinding(value = {MyProcessor.class})
public class MessageController {
@Resource
private MyProcessor myProcessor;
@GetMapping("sendLogMessage")
public void sendLogMessage(String message) {
Message<String> strMessage =
MessageBuilder.withPayload(message).build();
myProcessor.logOutput().send(strMessage);
}
@GetMapping("sendObjLogMessage")
public void sendObjLogMessage() {
LogInfo logInfo = new LogInfo();
logInfo.setClientIP("192.168.1.111");
logInfo.setClientVersion("1.0");
logInfo.setUserId("198663383837434");
logInfo.setTime(Date.from(Instant.now()));
Message<LogInfo> strMessage =
MessageBuilder.withPayload(logInfo).build();
myProcessor.logOutput().send(strMessage);
}
}
消费者
- 添加配置文件application.yml
spring:
cloud:
stream:
kafka:
binder: # 绑定器
brokers: 192.168.182.171:9092 # broker的IP和端口
application:
name: kafka-consumer
server:
port: 8302
- 添加启动类
package org.lxx.stream.kafka.consumer;
import org.lxx.stream.kafka.consumer.config.MyProcessor;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
@EnableBinding(value = MyProcessor.class)
@SpringBootApplication
public class KafkaConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaConsumerApplication.class, args);
}
}
- 添加配置类
package org.lxx.stream.kafka.consumer.config;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;
@Component
public interface MyProcessor {
String MESSAGE_INPUT = "log_input";
String MESSAGE_OUTPUT = "log_output";
String LOG_FORMAT_INPUT = "log_format_input";
String LOG_FORMAT_OUTPUT = "log_format_output";
@Input(MESSAGE_INPUT)
SubscribableChannel logInput();
@Output(MESSAGE_OUTPUT)
SubscribableChannel logOutput();
@Input(LOG_FORMAT_INPUT)
SubscribableChannel logFormatInput();
@Output(LOG_FORMAT_OUTPUT)
SubscribableChannel logFormatOutput();
}
- 添加监听器
package org.lxx.stream.kafka.consumer.service;
import lombok.extern.slf4j.Slf4j;
import org.lxx.stream.kafka.consumer.config.MyProcessor;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class MessageListener {
@StreamListener(MyProcessor.MESSAGE_INPUT)
@SendTo(MyProcessor.LOG_FORMAT_OUTPUT)
public String processLogMessage(String message) {
//通过MyProcessor.MESSAGE_INPUT接收消息
//然后通过SendTo把处理后的消息发送到MyProcessor.LOG_FORMAT_OUTPUT
log.info("GET Message:" + message);
return message;
}
@StreamListener(MyProcessor.LOG_FORMAT_INPUT)
public void processFormatLogMessage(String message) {
//接收来自MyProcessor.LOG_FORMAT_INPUT 的消息
//也就是SendTo发送的加工后的消息
log.info("接收到格式化后的消息:" + message);
}
}