Kafka在Vue和Spring Boot中的使用实例
一、项目概述
本项目演示了如何在Vue前端和Spring Boot后端中集成Kafka,实现实时消息的发送和接收,以及数据的实时展示。
后端实现:springboot配置、kafka配置、消息模型和仓库、消息服务和消费者、websocket配置、REST api控制器
前端实现:vue项目创建、websocket客户端配置、api服务、消息聊天组件、统计图表组件、主页面和路由配置
1.1 功能特点
- 前端实时发送消息到Kafka
- 后端接收Kafka消息并处理
- 后端发送消息到Kafka
- 前端实时接收并展示Kafka消息
- 消息历史记录展示
- 消息统计图表展示
1.2 技术栈
- 前端:Vue 3 + Element Plus + ECharts
- 后端:Spring Boot 2.7.x + Spring Kafka
- 消息中间件:Apache Kafka
- 数据库:MySQL (存储消息历史)
二、环境准备
2.1 安装Kafka
# 下载Kafka
wget https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz
# 解压
tar -xzf kafka_2.13-3.5.1.tgz
# 启动Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
# 启动Kafka
bin/kafka-server-start.sh config/server.properties
2.2 创建Topic
# 创建消息主题
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 \
--replication-factor 1 --partitions 3 --topic chat-messages
# 创建通知主题
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 \
--replication-factor 1 --partitions 3 --topic notifications
三、后端实现
3.1 添加依赖
<!-- pom.xml -->
<dependencies>
<!-- Spring Boot Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- MySQL -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
3.2 配置Kafka
# application.yml
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
consumer:
group-id: chat-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring.json.trusted.packages: "com.example.kafkademo.model"
datasource:
url: jdbc:mysql://localhost:3306/kafka_demo?useSSL=false&serverTimezone=UTC
username: root
password: password
driver-class-name: com.mysql.cj.jdbc.Driver
jpa:
hibernate:
ddl-auto: update
show-sql: true
3.3 创建消息模型
// Message.java
package com.example.kafkademo.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.persistence.*;
import java.time.LocalDateTime;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Entity
@Table(name = "messages")
public class Message {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String sender;
private String content;
private LocalDateTime timestamp;
@PrePersist
public void prePersist() {
if (timestamp == null) {
timestamp = LocalDateTime.now();
}
}
}
3.4 创建消息仓库
// MessageRepository.java
package com.example.kafkademo.repository;
import com.example.kafkademo.model.Message;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import java.util.List;
public interface MessageRepository extends JpaRepository<Message, Long> {
List<Message> findTop50ByOrderByTimestampDesc();
@Query("SELECT m.sender, COUNT(m) FROM Message m GROUP BY m.sender")
List<Object[]> countMessagesBySender();
}
3.5 创建Kafka配置类
// KafkaConfig.java
package com.example.kafkademo.config;
import com.example.kafkademo.model.Message;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
return new KafkaAdmin(configs);
}
@Bean
public NewTopic chatTopic() {
return new NewTopic("chat-messages", 3, (short) 1);
}
@Bean
public NewTopic notificationTopic() {
return new NewTopic("notifications", 3, (short) 1);
}
@Bean
public ProducerFactory<String, Message> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class);
configProps.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Message> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ConsumerFactory<String, Message> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG, "chat-group");
props.put(org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class);
props.put(org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.kafkademo.model");
return new DefaultKafkaConsumerFactory<>(props,
new org.apache.kafka.common.serialization.StringDeserializer(),
new JsonDeserializer<>(Message.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Message> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Message> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
3.6 创建消息服务
// MessageService.java
package com.example.kafkademo.service;
import com.example.kafkademo.model.Message;
import com.example.kafkademo.repository.MessageRepository;
import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import java.util.List;
@Service
@RequiredArgsConstructor
public class MessageService {
private final KafkaTemplate<String, Message> kafkaTemplate;
private final MessageRepository messageRepository;
public void sendMessage(Message message) {
// 发送消息到Kafka
kafkaTemplate.send("chat-messages", message);
// 保存消息到数据库
messageRepository.save(message);
}
public void sendNotification(String content) {
Message notification = new Message();
notification.setSender("系统");
notification.setContent(content);
kafkaTemplate.send("notifications", notification);
}
public List<Message> getRecentMessages() {
return messageRepository.findTop50ByOrderByTimestampDesc();
}
public List<Object[]> getMessageStats() {
return messageRepository.countMessagesBySender();
}
}
3.7 创建Kafka消费者
// KafkaConsumer.java
package com.example.kafkademo.kafka;
import com.example.kafkademo.model.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class KafkaConsumer {
private final SimpMessagingTemplate messagingTemplate;
public KafkaConsumer(SimpMessagingTemplate messagingTemplate) {
this.messagingTemplate = messagingTemplate;
}
@KafkaListener(topics = "chat-messages", groupId = "chat-group")
public void listenChatMessages(Message message) {
log.info("收到聊天消息: {}", message);
// 通过WebSocket发送消息到前端
messagingTemplate.convertAndSend("/topic/messages", message);
}
@KafkaListener(topics = "notifications", groupId = "chat-group")
public void listenNotifications(Message message) {
log.info("收到通知: {}", message);
// 通过WebSocket发送通知到前端
messagingTemplate.convertAndSend("/topic/notifications", message);
}
}
3.8 创建WebSocket配置
// WebSocketConfig.java
package com.example.kafkademo.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableSimpleBroker("/topic");
config.setApplicationDestinationPrefixes("/app");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws")
.setAllowedOrigins("http://localhost:8080")
.withSockJS();
}
}
3.9 创建控制器
// MessageController.java
package com.example.kafkademo.controller;
import com.example.kafkademo.model.Message;
import com.example.kafkademo.service.MessageService;
import lombok.RequiredArgsConstructor;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.util.List;
@RestController
@RequestMapping("/api/messages")
@RequiredArgsConstructor
@CrossOrigin(origins = "http://localhost:8080")
public class MessageController {
private final MessageService messageService;
@PostMapping
public ResponseEntity<Message> sendMessage(@RequestBody Message message) {
messageService.sendMessage(message);
return ResponseEntity.ok(message);
}
@GetMapping
public ResponseEntity<List<Message>> getRecentMessages() {
return ResponseEntity.ok(messageService.getRecentMessages());
}
@GetMapping("/stats")
public ResponseEntity<List<Object[]>> getMessageStats() {
return ResponseEntity.ok(messageService.getMessageStats());
}
}
四、前端实现
4.1 创建Vue项目
# 创建Vue项目
npm create vue@latest kafka-vue-demo
# 进入项目目录
cd kafka-vue-demo
# 安装依赖
npm install
# 安装额外依赖
npm install element-plus axios sockjs-client @stomp/stompjs echarts
4.2 配置WebSocket客户端
// src/utils/websocket.js
import SockJS from 'sockjs-client';
import { Stomp } from '@stomp/stompjs';
class WebSocketClient {
constructor() {
this.stompClient = null;
this.connected = false;
}
connect() {
const socket = new SockJS('http://localhost:8081/ws');
this.stompClient = Stomp.over(socket);
this.stompClient.connect({}, this.onConnected, this.onError);
}
onConnected = () => {
this.connected = true;
console.log('WebSocket连接成功');
// 订阅消息主题
this.stompClient.subscribe('/topic/messages', this.onMessageReceived);
this.stompClient.subscribe('/topic/notifications', this.onNotificationReceived);
}
onError = (error) => {
console.error('WebSocket连接错误:', error);
this.connected = false;
}
onMessageReceived = (payload) => {
const message = JSON.parse(payload.body);
console.log('收到消息:', message);
// 触发消息接收事件
window.dispatchEvent(new CustomEvent('messageReceived', { detail: message }));
}
onNotificationReceived = (payload) => {
const notification = JSON.parse(payload.body);
console.log('收到通知:', notification);
// 触发通知接收事件
window.dispatchEvent(new CustomEvent('notificationReceived', { detail: notification }));
}
disconnect() {
if (this.stompClient) {
this.stompClient.disconnect();
this.connected = false;
}
}
}
export default new WebSocketClient();
4.3 创建API服务
// src/api/messageApi.js
import axios from 'axios';
const API_URL = 'http://localhost:8081/api';
export default {
sendMessage(message) {
return axios.post(`${API_URL}/messages`, message);
},
getRecentMessages() {
return axios.get(`${API_URL}/messages`);
},
getMessageStats() {
return axios.get(`${API_URL}/messages/stats`);
}
};
4.4 创建消息组件
<!-- src/components/MessageChat.vue -->
<template>
<div class="message-chat">
<el-card class="chat-container">
<template #header>
<div class="card-header">
<h2>实时聊天</h2>
</div>
</template>
<div class="message-list" ref="messageList">
<div v-for="message in messages" :key="message.id" class="message-item" :class="{ 'message-self': message.sender === username }">
<div class="message-sender">{{ message.sender }}</div>
<div class="message-content">{{ message.content }}</div>
<div class="message-time">{{ formatTime(message.timestamp) }}</div>
</div>
</div>
<div class="message-input">
<el-input
v-model="newMessage"
placeholder="输入消息..."
@keyup.enter="sendMessage"
>
<template #append>
<el-button type="primary" @click="sendMessage">发送</el-button>
</template>
</el-input>
</div>
</el-card>
</div>
</template>
<script>
import { ref, onMounted, onUnmounted, nextTick } from 'vue';
import { ElMessage } from 'element-plus';
import messageApi from '../api/messageApi';
import websocket from '../utils/websocket';
export default {
name: 'MessageChat',
props: {
username: {
type: String,
required: true
}
},
setup(props) {
const messages = ref([]);
const newMessage = ref('');
const messageList = ref(null);
// 加载历史消息
const loadMessages = async () => {
try {
const response = await messageApi.getRecentMessages();
messages.value = response.data;
scrollToBottom();
} catch (error) {
console.error('加载消息失败:', error);
ElMessage.error('加载消息失败');
}
};
// 发送消息
const sendMessage = async () => {
if (!newMessage.value.trim()) return;
const message = {
sender: props.username,
content: newMessage.value,
timestamp: new Date()
};
try {
await messageApi.sendMessage(message);
newMessage.value = '';
} catch (error) {
console.error('发送消息失败:', error);
ElMessage.error('发送消息失败');
}
};
// 处理接收到的消息
const handleMessageReceived = (event) => {
const message = event.detail;
messages.value.push(message);
scrollToBottom();
};
// 处理接收到的通知
const handleNotificationReceived = (event) => {
const notification = event.detail;
ElMessage.info(notification.content);
};
// 滚动到底部
const scrollToBottom = async () => {
await nextTick();
if (messageList.value) {
messageList.value.scrollTop = messageList.value.scrollHeight;
}
};
// 格式化时间
const formatTime = (timestamp) => {
if (!timestamp) return '';
const date = new Date(timestamp);
return date.toLocaleTimeString();
};
onMounted(() => {
loadMessages();
websocket.connect();
window.addEventListener('messageReceived', handleMessageReceived);
window.addEventListener('notificationReceived', handleNotificationReceived);
});
onUnmounted(() => {
websocket.disconnect();
window.removeEventListener('messageReceived', handleMessageReceived);
window.removeEventListener('notificationReceived', handleNotificationReceived);
});
return {
messages,
newMessage,
messageList,
sendMessage,
formatTime
};
}
};
</script>
<style scoped>
.message-chat {
height: 100%;
display: flex;
flex-direction: column;
}
.chat-container {
height: 100%;
display: flex;
flex-direction: column;
}
.card-header {
display: flex;
justify-content: space-between;
align-items: center;
}
.message-list {
flex: 1;
overflow-y: auto;
padding: 10px;
margin-bottom: 10px;
height: 400px;
}
.message-item {
margin-bottom: 10px;
padding: 10px;
border-radius: 5px;
background-color: #f5f7fa;
max-width: 80%;
}
.message-self {
margin-left: auto;
background-color: #ecf5ff;
}
.message-sender {
font-weight: bold;
margin-bottom: 5px;
}
.message-time {
font-size: 12px;
color: #909399;
text-align: right;
margin-top: 5px;
}
.message-input {
margin-top: 10px;
}
</style>
4.5 创建统计图表组件
<!-- src/components/MessageStats.vue -->
<template>
<div class="message-stats">
<el-card>
<template #header>
<div class="card-header">
<h2>消息统计</h2>
</div>
</template>
<div class="chart-container" ref="chartContainer"></div>
</el-card>
</div>
</template>
<script>
import { ref, onMounted, onUnmounted } from 'vue';
import { ElMessage } from 'element-plus';
import * as echarts from 'echarts';
import messageApi from '../api/messageApi';
export default {
name: 'MessageStats',
setup() {
const chartContainer = ref(null);
let chart = null;
// 加载统计数据
const loadStats = async () => {
try {
const response = await messageApi.getMessageStats();
const stats = response.data;
const senders = stats.map(item => item[0]);
const counts = stats.map(item => item[1]);
updateChart(senders, counts);
} catch (error) {
console.error('加载统计数据失败:', error);
ElMessage.error('加载统计数据失败');
}
};
// 更新图表
const updateChart = (senders, counts) => {
if (!chart) return;
const option = {
title: {
text: '用户消息数量统计'
},
tooltip: {
trigger: 'axis',
axisPointer: {
type: 'shadow'
}
},
xAxis: {
type: 'category',
data: senders
},
yAxis: {
type: 'value'
},
series: [
{
name: '消息数量',
type: 'bar',
data: counts
}
]
};
chart.setOption(option);
};
// 初始化图表
const initChart = () => {
if (chartContainer.value) {
chart = echarts.init(chartContainer.value);
loadStats();
}
};
// 处理窗口大小变化
const handleResize = () => {
if (chart) {
chart.resize();
}
};
onMounted(() => {
initChart();
window.addEventListener('resize', handleResize);
});
onUnmounted(() => {
if (chart) {
chart.dispose();
}
window.removeEventListener('resize', handleResize);
});
return {
chartContainer
};
}
};
</script>
<style scoped>
.message-stats {
height: 100%;
}
.chart-container {
height: 400px;
}
</style>
4.6 创建主页面
<!-- src/App.vue -->
<template>
<div class="app-container">
<el-container>
<el-header>
<h1>Kafka实时聊天演示</h1>
<div class="user-info" v-if="!username">
<el-input v-model="inputUsername" placeholder="请输入用户名" />
<el-button type="primary" @click="login">登录</el-button>
</div>
<div class="user-info" v-else>
<span>欢迎, {{ username }}</span>
<el-button type="danger" @click="logout">退出</el-button>
</div>
</el-header>
<el-main v-if="username">
<el-row :gutter="20">
<el-col :span="16">
<MessageChat :username="username" />
</el-col>
<el-col :span="8">
<MessageStats />
</el-col>
</el-row>
</el-main>
<el-main v-else class="login-container">
<el-card class="login-card">
<h2>欢迎使用Kafka实时聊天</h2>
<p>请输入用户名开始聊天</p>
</el-card>
</el-main>
</el-container>
</div>
</template>
<script>
import { ref } from 'vue';
import MessageChat from './components/MessageChat.vue';
import MessageStats from './components/MessageStats.vue';
export default {
name: 'App',
components: {
MessageChat,
MessageStats
},
setup() {
const username = ref('');
const inputUsername = ref('');
const login = () => {
if (inputUsername.value.trim()) {
username.value = inputUsername.value;
inputUsername.value = '';
}
};
const logout = () => {
username.value = '';
};
return {
username,
inputUsername,
login,
logout
};
}
};
</script>
<style>
.app-container {
height: 100vh;
}
.el-header {
background-color: #409eff;
color: white;
display: flex;
justify-content: space-between;
align-items: center;
padding: 0 20px;
}
.user-info {
display: flex;
align-items: center;
gap: 10px;
}
.login-container {
display: flex;
justify-content: center;
align-items: center;
}
.login-card {
width: 400px;
text-align: center;
padding: 20px;
}
.el-main {
padding: 20px;
}
</style>
4.7 配置路由
// src/router/index.js
import { createRouter, createWebHistory } from 'vue-router';
import App from '../App.vue';
const routes = [
{
path: '/',
name: 'Home',
component: App
}
];
const router = createRouter({
history: createWebHistory(import.meta.env.BASE_URL),
routes
});
export default router;
4.8 配置主入口
// src/main.js
import { createApp } from 'vue';
import ElementPlus from 'element-plus';
import 'element-plus/dist/index.css';
import App from './App.vue';
import router from './router';
const app = createApp(App);
app.use(ElementPlus);
app.use(router);
app.mount('#app');
五、运行项目
5.1 启动后端
# 进入后端项目目录
cd kafka-spring-demo
# 启动Spring Boot应用
./mvnw spring-boot:run
5.2 启动前端
# 进入前端项目目录
cd kafka-vue-demo
# 启动开发服务器
npm run dev
5.3 访问应用
打开浏览器,访问 http://localhost:8080 即可使用应用。
六、功能演示
6.1 发送和接收消息
- 打开多个浏览器窗口,分别以不同用户名登录
- 在任意窗口发送消息,所有窗口都能实时接收到消息
- 消息会显示发送者、内容和时间
6.2 消息统计
- 发送多条消息后,统计图表会显示每个用户发送的消息数量
- 图表会自动更新,反映最新的统计数据
6.3 系统通知
- 后端可以发送系统通知,所有用户都能收到
- 通知会以Element Plus的消息提示形式显示
七、扩展功能
7.1 添加消息类型
// 在Message类中添加消息类型字段
private String type; // 消息类型:text, image, file等
7.2 添加消息已读状态
// 在Message类中添加已读状态字段
private boolean read;
7.3 添加私聊功能
// 在Message类中添加接收者字段
private String receiver; // 私聊接收者,为空表示群聊
7.4 添加消息搜索功能
// 在MessageRepository中添加搜索方法
List<Message> findByContentContainingOrSenderContaining(String content, String sender);
八、总结
本项目演示了如何在Vue前端和Spring Boot后端中集成Kafka,实现实时消息的发送和接收。通过WebSocket和Kafka的结合,我们实现了一个功能完善的实时聊天应用。
8.1 技术要点
- Kafka消息的发送和接收
- WebSocket实时通信
- Vue组件化开发
- Spring Boot后端服务
- 数据可视化
8.2 项目亮点
- 实时消息推送
- 消息历史记录
- 消息统计图表
- 系统通知功能
- 响应式设计
8.3 后续优化方向
- 添加消息加密
- 实现消息撤回功能
- 添加文件上传功能
- 优化移动端适配
- 添加用户认证和授权