Designed and implemented on a microservice architecture, this is a real-time step-counting system built around the producer-consumer pattern: Kafka provides the message queue, Spark Streaming processes the real-time data stream, and Redis supplies high-performance storage. The result is a high-concurrency, low-latency pipeline that supports real-time collection, transport, processing, and statistical analysis of exercise data from many users.
1. Introduction
1. Data collection and producer (StepDataProducer)
- Role: generates user step records and sends them to a Kafka topic.
- How it works: the producer randomly generates step records (user ID, location, step count, and timestamp), serializes each record to JSON, and sends it to the configured Kafka topic through the Kafka Producer API (a sample of the serialized message appears after this list).
2. Message queue (Kafka)
- Role: message middleware that decouples the data producer from the consumer.
- How it works: Kafka is a distributed streaming platform offering high-throughput publish/subscribe. The producer writes records to a topic and the consumer reads them back. Kafka persists messages durably, preserves ordering within each partition, and scales horizontally.
3. Real-time processing (Spark Streaming)
- Role: processes the stream of records consumed from Kafka in real time.
- How it works: Spark Streaming consumes records from Kafka, parses and processes them (step aggregation here), and writes the results downstream. It processes the stream as micro-batches, which lets it handle large data volumes at low latency.
4. Data storage (Redis)
- Role: stores the processed per-user step counts and supports fast reads and writes.
- How it works: Redis is a high-performance in-memory database supporting a variety of data structures. The processed step counts are kept in Redis, whose read/write speed keeps the system responsive in real time.
5. Consumer (StepCounterApp)
- Role: consumes records from Kafka, processes them, and updates Redis.
- How it works: the consumer reads records from the Kafka topic, processes them with Spark Streaming, and writes the results (each user's cumulative step count) to Redis. It drives the whole processing pipeline end to end.
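For reference, here is a minimal sketch of the wire format, built with the UserStep model and Gson exactly as the code in section 3 uses them (the class name MessageFormatDemo and the sample values are illustrative only):

import com.example.stepcounter.model.UserStep;
import com.google.gson.Gson;

public class MessageFormatDemo {
    public static void main(String[] args) {
        // Build one sample record and serialize it the same way StepDataProducer does
        UserStep step = new UserStep("user1", "Park", 42, 1700000000000L);
        // Prints something like:
        // {"userId":"user1","location":"Park","steps":42,"timestamp":1700000000000}
        System.out.println(new Gson().toJson(step));
    }
}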
2. File Structure
microservices/
├── pom.xml
├── dependency-reduced-pom.xml
├── 教程.txt
├── query
├── target/
└── src/
└── main/
├── resources/
│   └── application.properties (Kafka, Redis, and Spark configuration)
└── java/
└── com/
└── example/
└── stepcounter/
├── StepCounterApp.java
├── StepDataProducer.java
├── service/
├── config/
└── model/
3. Source Code
AppConfig.java
package com.example.stepcounter.config;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
public class AppConfig {
private static final Properties properties = new Properties();
static {
try (InputStream input = AppConfig.class.getClassLoader().getResourceAsStream("application.properties")) {
if (input == null) {
throw new RuntimeException("Unable to find application.properties");
}
properties.load(input);
} catch (IOException e) {
throw new RuntimeException("Error loading application.properties", e);
}
}
public static String getProperty(String key) {
String value = properties.getProperty(key);
return value != null ? value.trim() : null;
}
public static Properties getKafkaProperties() {
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", getProperty("kafka.bootstrap.servers"));
kafkaProps.put("group.id", getProperty("kafka.group.id"));
kafkaProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
return kafkaProps;
}
}
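Note: application.properties must end up on the classpath (it lives in src/main/resources, which Maven packages into the jar); if it is missing, the static initializer above throws as soon as AppConfig is first loaded.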
UserStep.java
package com.example.stepcounter.model;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.AllArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class UserStep {
private String userId;
private String location;
private Integer steps;
private Long timestamp;
}
RedisService.java
package com.example.stepcounter.service;
import com.example.stepcounter.config.AppConfig;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
public class RedisService {
private static final JedisPool jedisPool;
static {
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(10);
poolConfig.setMaxIdle(5);
poolConfig.setMinIdle(1);
String host = AppConfig.getProperty("redis.host");
int port = Integer.parseInt(AppConfig.getProperty("redis.port"));
String password = AppConfig.getProperty("redis.password");
int database = Integer.parseInt(AppConfig.getProperty("redis.database"));
if (password != null && !password.trim().isEmpty()) {
jedisPool = new JedisPool(poolConfig, host, port, 2000, password, database);
} else {
jedisPool = new JedisPool(poolConfig, host, port, 2000);
}
}
public static void incrementUserSteps(String userId, int steps) {
try (Jedis jedis = jedisPool.getResource()) {
String key = "user:" + userId + ":steps";
jedis.incrBy(key, steps);
}
}
public static Long getUserTotalSteps(String userId) {
try (Jedis jedis = jedisPool.getResource()) {
String key = "user:" + userId + ":steps";
String steps = jedis.get(key);
return steps != null ? Long.parseLong(steps) : 0L;
}
}
}
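Because Redis executes INCRBY atomically, parallel Spark tasks can update the same user:<userId>:steps key concurrently without losing increments, which is what makes the simple incrementUserSteps implementation safe under high concurrency.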
StepCounterApp.java
package com.example.stepcounter;
import com.example.stepcounter.config.AppConfig;
import com.example.stepcounter.model.UserStep;
import com.example.stepcounter.service.RedisService;
import com.google.gson.Gson;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class StepCounterApp {
private static final Gson gson = new Gson();
public static void main(String[] args) throws InterruptedException {
// Create the Spark configuration
SparkConf sparkConf = new SparkConf()
.setAppName(AppConfig.getProperty("spark.app.name"))
.setMaster(AppConfig.getProperty("spark.master"));
// Create the StreamingContext
JavaStreamingContext streamingContext = new JavaStreamingContext(
sparkConf,
Durations.seconds(Integer.parseInt(AppConfig.getProperty("spark.streaming.batch.duration")))
);
// Configure the Kafka consumer
Properties kafkaProps = AppConfig.getKafkaProperties();
String topic = AppConfig.getProperty("kafka.topic");
// Convert the Properties into a Map
Map<String, Object> kafkaParams = new HashMap<>();
for (String key : kafkaProps.stringPropertyNames()) {
kafkaParams.put(key, kafkaProps.get(key));
}
// Create the Kafka input stream
JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.Subscribe(Collections.singletonList(topic), kafkaParams)
);
// Process the data stream
stream.foreachRDD(rdd -> {
rdd.foreach(record -> {
String json = record.value();
UserStep userStep = gson.fromJson(json, UserStep.class);
// Update the user's step count in Redis
RedisService.incrementUserSteps(userStep.getUserId(), userStep.getSteps());
// Fetch and print the user's total step count
Long totalSteps = RedisService.getUserTotalSteps(userStep.getUserId());
System.out.printf("User %s at %s walked %d steps, total steps: %d%n",
userStep.getUserId(),
userStep.getLocation(),
userStep.getSteps(),
totalSteps);
});
});
// Start the streaming job
streamingContext.start();
streamingContext.awaitTermination();
}
}
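One thing to keep in mind: the rdd.foreach lambda runs on Spark executors, not on the driver. With spark.master=local[2] the executor threads live in the same JVM, so the printf output appears in the application console; on a real cluster it would show up in the executor logs instead.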
StepDataProducer.java
package com.example.stepcounter;
import com.example.stepcounter.config.AppConfig;
import com.example.stepcounter.model.UserStep;
import com.google.gson.Gson;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.Random;
public class StepDataProducer {
private static final Gson gson = new Gson();
private static final Random random = new Random();
private static final String[] LOCATIONS = {"Home", "Park", "Office", "Gym", "Mall"};
private static final String[] USER_IDS = {"user1", "user2", "user3", "user4", "user5"};
public static void main(String[] args) throws InterruptedException {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfig.getProperty("kafka.bootstrap.servers"));
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
String topic = AppConfig.getProperty("kafka.topic");
try {
while (true) {
UserStep userStep = generateRandomUserStep();
String json = gson.toJson(userStep);
producer.send(new ProducerRecord<>(topic, userStep.getUserId(), json));
System.out.println("Sent: " + json);
Thread.sleep(1000); // send one record per second
}
} finally {
producer.close();
}
}
private static UserStep generateRandomUserStep() {
String userId = USER_IDS[random.nextInt(USER_IDS.length)];
String location = LOCATIONS[random.nextInt(LOCATIONS.length)];
int steps = random.nextInt(100) + 1; // 1-100 steps
long timestamp = System.currentTimeMillis();
return new UserStep(userId, location, steps, timestamp);
}
}
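Note that each record is keyed by userId, so Kafka's default partitioner routes all of a given user's records to the same partition, preserving per-user ordering even if the topic were given more than one partition.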
application.properties
# Kafka Configuration
kafka.bootstrap.servers=localhost:9092
kafka.topic=user-steps
kafka.group.id=step-counter-group
# Redis Configuration
redis.host=localhost
redis.port=6379
redis.password=
redis.database=0
# Spark Configuration
spark.app.name=StepCounter
spark.master=local[2]
spark.streaming.batch.duration=5
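spark.streaming.batch.duration is in seconds: StepCounterApp passes it straight to Durations.seconds, so the value 5 above means 5-second micro-batches.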
dependency-reduced-pom.xml
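(This file is generated automatically by the maven-shade-plugin during packaging and is shown only for completeness; note that the implementation attributes on the <transformer> elements did not survive generation here, so treat pom.xml below as the authoritative build configuration.)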
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>step-counter</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer>
<mainClass>com.example.stepcounter.StepCounterApp</mainClass>
</transformer>
<transformer />
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.26</version>
<scope>provided</scope>
</dependency>
</dependencies>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<spark.version>3.3.0</spark.version>
<maven.compiler.target>1.8</maven.compiler.target>
<kafka.version>3.5.0</kafka.version>
</properties>
</project>
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>step-counter</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<spark.version>3.3.0</spark.version>
<kafka.version>3.5.0</kafka.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- Spark Core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Spark Streaming -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Spark Streaming Kafka -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Kafka Clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<!-- Redis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>4.3.1</version>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.26</version>
<scope>provided</scope>
</dependency>
<!-- Gson -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.10.1</version>
</dependency>
<!-- SLF4J -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.36</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.36</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.example.stepcounter.StepCounterApp</mainClass>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
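The shade plugin bundles the application and all of its dependencies into a single fat jar at package time, which is why the run commands in the next section can launch both programs with plain java -cp target/step-counter-1.0-SNAPSHOT.jar instead of assembling a Spark classpath by hand.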
4. Running
On Windows, open six cmd windows and run the following:
REM Start ZooKeeper
C:\kafka\kafka_2.12-3.6.1\bin\windows>zookeeper-server-start.bat ..\..\config\zookeeper.properties
REM Start Redis
C:\Users\86182>redis-server.exe
REM Start Kafka
C:\kafka\kafka_2.12-3.6.1\bin\windows>kafka-server-start.bat ..\..\config\server.properties
REM Create the Kafka topic
C:\kafka\kafka_2.12-3.6.1\bin\windows>kafka-topics.bat --create --topic user-steps --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
Created topic user-steps.
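REM Build the fat jar first (a one-time step, run from the project root; assumes Maven is on the PATH)
C:\microservices>mvn clean package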
REM Start the consumer
C:\microservices>java -cp target/step-counter-1.0-SNAPSHOT.jar com.example.stepcounter.StepCounterApp
REM Start the producer
C:\microservices>java -cp target/step-counter-1.0-SNAPSHOT.jar com.example.stepcounter.StepDataProducer
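To spot-check the totals, you can also query Redis directly from another window (assuming redis-cli is on the PATH; user1 is one of the hard-coded test users):
C:\Users\86182>redis-cli GET user:user1:steps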