1. 项目结构
2. Maven依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.2</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/io.github.resilience4j/resilience4j-spring-boot3 -->
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-spring-boot3</artifactId>
<version>2.2.0</version>
</dependency>
<!-- Redis cache dependency -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
</dependencies>
注:Resilience4j 需要 AOP(面向切面编程)相关的依赖,因为它利用 AOP 来动态地拦截和处理方法调用。
3. application.yml
spring:
application:
name: spring_resilience4j # Spring Boot 应用程序名称
cache:
type: redis # 缓存类型,使用 Redis
data:
redis:
host: 192.168.186.77 # Redis 服务器主机地址
port: 6379 # Redis 服务器端口号
resilience4j:
timelimiter:
instances:
timeoutService:
timeoutDuration: 2s # 超时时间为2秒
circuitbreaker:
instances:
circuitbreakerService:
slidingWindowSize: 10 # 滑动窗口大小为10
failureRateThreshold: 50 # 失败率阈值为50%
waitDurationInOpenState: 10s # 打开状态等待时间为10秒
ratelimiter:
instances:
ratelimiterService:
limitForPeriod: 1 # 每个周期允许的最大请求数为1
limitRefreshPeriod: 10s # 速率限制刷新周期为10秒
timeoutDuration: 500ms # 等待许可的超时时间为500毫秒
retry:
instances:
retryService:
maxAttempts: 5 # 最大重试次数为5次
waitDuration: 1000ms # 重试间隔为1000毫秒(1秒)
enableExponentialBackoff: true # 启用指数回退
exponentialBackoffMultiplier: 1.5 # 指数回退倍数为1.5
retryExceptions:
- java.lang.RuntimeException # 需要重试的异常类型
bulkhead:
instances:
bulkheadService:
maxConcurrentCalls: 5 # 批隔离允许的最大并发调用数为5
maxWaitDuration: 1s # 等待许可的最大时间为1秒
4. SpringResilience4jApplication.java
package org.example;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cache.annotation.EnableCaching;
@SpringBootApplication
@EnableCaching //开启缓存
public class SpringResilience4jApplication {
public static void main(String[] args) {
SpringApplication.run(SpringResilience4jApplication.class, args);
}
}
5. Common.java
package org.example.controller;
import org.example.service.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.CompletableFuture;
@RestController
public class Common {
@Autowired
TimeoutService timeoutService;
@Autowired
RateLimiterService rateLimiterService;
@Autowired
CircuitBreakerService circuitBreakerService;
@Autowired
RetryService retryService;
@Autowired
BulkheadService bulkheadService;
//模拟超时
@GetMapping("/timeout")
public CompletableFuture<String> timeout() {
return timeoutService.timeoutExample();
}
//模拟限速
@GetMapping("/rateLimiter")
public CompletableFuture<String> rateLimiter() {
return rateLimiterService.rateLimiterExample();
}
//模拟回退
@GetMapping("/circuitBreaker")
public ResponseEntity<String> circuitBreaker() {
return ResponseEntity.ok(circuitBreakerService.CircuitBreaker());
}
//模拟重试
@GetMapping("/retry/{id}")
public String getItemById(@PathVariable String id) {
return retryService.getItemById(id);
}
// 模拟批隔离
@GetMapping("/process/{id}")
public String processRequest(@PathVariable String id) {
return bulkheadService.processRequest(id);
}
}
6. BulkheadService.java(批隔离)
package org.example.service;
import io.github.resilience4j.bulkhead.BulkheadFullException;
import io.github.resilience4j.bulkhead.annotation.Bulkhead;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
@Service
public class BulkheadService {
private static final Logger logger = LoggerFactory.getLogger(BulkheadService.class);
@Bulkhead(name = "bulkheadService", fallbackMethod = "fallback")
public String processRequest(String id) {
logger.info("Processing request: {}", id);
try {
// 模拟处理延迟
Thread.sleep(10000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Processed: " + id;
}
// 回退方法
public String fallback(String id, BulkheadFullException ex) {
logger.error("Bulkhead is full. Falling back for request: {}",id,ex);
return "Bulkhead is full. Please try again later.";
}
}
application.yml对应的配置信息:
bulkhead:
instances:
bulkheadService:
maxConcurrentCalls: 5
maxWaitDuration: 1s
解释:
-
maxConcurrentCalls:
- 最大并发调用数。在此示例中,设置为 5,表示同一时间最多允许 5 个并发调用。如果超过这个数量,额外的调用将被阻塞,直到有空闲的调用资源。
-
maxWaitDuration:
- 在被阻塞的调用被拒绝之前,最大等待时间。在此示例中,设置为 1 秒,表示如果一个调用在 1 秒内没有获取到许可,将会被拒绝并抛出
BulkheadFullException
。
- 在被阻塞的调用被拒绝之前,最大等待时间。在此示例中,设置为 1 秒,表示如果一个调用在 1 秒内没有获取到许可,将会被拒绝并抛出
结果:
先启动(目录12BulkheadTest测试类)模拟并发。
出现该结果证明并发调用数已经满。
7. CircuitBreakerService.java(熔断)
package org.example.service;
import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;
import org.springframework.stereotype.Service;
@Service
public class CircuitBreakerService {
@CircuitBreaker(name = "circuitbreakerService", fallbackMethod = "fallback")
public String CircuitBreaker() {
if (Math.random() > 0.5) {
throw new RuntimeException("It is Failure!");
}
return "It is Successfully!";
}
public String fallback(Throwable ex) {
return "Fallback : " + ex.getMessage();
}
}
application.yml对应的配置信息:
circuitbreaker:
instances:
circuitbreakerService:
slidingWindowSize: 10
failureRateThreshold: 50
waitDurationInOpenState: 10s
解释:
-
slidingWindowSize:
- 滑动窗口大小。在此示例中,设置为10,表示
CircuitBreaker
将根据最近的10个调用的结果来计算失败率。
- 滑动窗口大小。在此示例中,设置为10,表示
-
failureRateThreshold:
- 失败率阈值。在此示例中,设置为 50,表示如果滑动窗口中的调用失败率超过 50%,
CircuitBreaker
将打开(短路)。
- 失败率阈值。在此示例中,设置为 50,表示如果滑动窗口中的调用失败率超过 50%,
-
waitDurationInOpenState:
CircuitBreaker
打开状态的等待时间。在此示例中,设置为10秒,表示CircuitBreaker
在打开状态下会保持10秒,然后进入半开状态以重新测试服务的可用性。
-
闭合状态(Closed):
- 默认状态,所有请求正常通过并进行监控。
- 如果失败率超过阈值,进入打开状态。
-
打开状态(Open):
- 在打开状态下,所有请求将立即失败,不会调用实际的方法。
- 在等待时间结束后,进入半开状态。
-
半开状态(Half-Open):
- 在半开状态下,会允许部分请求通过以测试服务是否恢复。
- 如果请求成功率恢复正常,恢复到闭合状态;否则重新进入打开状态。
结果:
监听消息:
CircuitBreaker Event: 2024-07-29T23:12:41.621862900+08:00[Asia/Shanghai]: CircuitBreaker 'circuitbreakerService' recorded an error: 'java.lang.RuntimeException: It is Failure!'. Elapsed time: 0 ms
8. RateLimiterService.java(限速)
package org.example.service;
import io.github.resilience4j.ratelimiter.annotation.RateLimiter;
import org.springframework.stereotype.Service;
import java.util.concurrent.CompletableFuture;
@Service
public class RateLimiterService {
@RateLimiter(name = "ratelimiterService", fallbackMethod = "fallbackRateLimiter")
public CompletableFuture<String> rateLimiterExample() {
return CompletableFuture.supplyAsync(() -> "It is Success!");
}
public CompletableFuture<String> fallbackRateLimiter(Exception e) {
return CompletableFuture.completedFuture("Too many requests");
}
}
application.yml对应的配置信息 :
ratelimiter:
instances:
ratelimiterService:
limitForPeriod: 1
limitRefreshPeriod: 10s
timeoutDuration: 500ms
解释:
-
limitForPeriod:
- 指定每个刷新周期内允许的最大请求数。在此示例中,设置为1,表示每个刷新周期只允许1个请求通过。
-
limitRefreshPeriod:
- 指定速率限制器的刷新周期。在此示例中,设置为10秒,表示每10秒刷新一次,重置允许的请求数。
-
timeoutDuration:
- 指定在速率限制器中等待许可的最大时间。如果请求在
timeoutDuration
内未获得许可,则会被拒绝。在此示例中,设置为 500 毫秒。
- 指定在速率限制器中等待许可的最大时间。如果请求在
结果(10s内发送两次请求):
监听消息:
RateLimiter Event: RateLimiterEvent{type=SUCCESSFUL_ACQUIRE, rateLimiterName='ratelimiterService', creationTime=2024-07-29T23:03:37.189104500+08:00[Asia/Shanghai]}
RateLimiter Event: RateLimiterEvent{type=SUCCESSFUL_ACQUIRE, rateLimiterName='ratelimiterService', creationTime=2024-07-29T23:03:38.995416700+08:00[Asia/Shanghai]}
RateLimiter Event: RateLimiterEvent{type=FAILED_ACQUIRE, rateLimiterName='ratelimiterService', creationTime=2024-07-29T23:03:40.927729300+08:00[Asia/Shanghai]}
9. Resilience4jEventListener.java(事件监听)
package org.example.service;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RateLimiterRegistry;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryRegistry;
import io.github.resilience4j.timelimiter.TimeLimiter;
import io.github.resilience4j.timelimiter.TimeLimiterRegistry;
import jakarta.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class Resilience4jEventListener {
private static final Logger logger = LoggerFactory.getLogger(Resilience4jEventListener.class);
private final RetryRegistry retryRegistry;
private final TimeLimiterRegistry timeLimiterRegistry;
private final CircuitBreakerRegistry circuitBreakerRegistry;
private final RateLimiterRegistry rateLimiterRegistry;
@Autowired
public Resilience4jEventListener(CircuitBreakerRegistry circuitBreakerRegistry,
RateLimiterRegistry rateLimiterRegistry,
RetryRegistry retryRegistry,
TimeLimiterRegistry timeLimiterRegistry) {
this.circuitBreakerRegistry = circuitBreakerRegistry;
this.rateLimiterRegistry = rateLimiterRegistry;
this.retryRegistry = retryRegistry;
this.timeLimiterRegistry = timeLimiterRegistry;
}
@PostConstruct
public void postConstruct() {
// 注册 Retry 事件监听器
Retry retry = retryRegistry.retry("retryService");
retry.getEventPublisher()
.onEvent(event -> logger.info("Retry event: {}", event));
// 注册 CircuitBreaker 事件监听器
CircuitBreaker circuitBreaker = circuitBreakerRegistry.circuitBreaker("circuitbreakerService");
circuitBreaker.getEventPublisher()
.onEvent(event -> logger.info("CircuitBreaker Event: {}", event));
// 注册 RateLimiter 事件监听器
RateLimiter rateLimiter = rateLimiterRegistry.rateLimiter("ratelimiterService");
rateLimiter.getEventPublisher()
.onEvent(event -> logger.info("RateLimiter Event: {}", event));
// 注册 TimeLimiter 事件监听器
TimeLimiter timeLimiter = timeLimiterRegistry.timeLimiter("timeoutService");
timeLimiter.getEventPublisher()
.onEvent(event -> logger.info("TimeLimiter Event: {}", event));
}
}
调试运行的信息如下:
2024-07-29T23:02:39.073+08:00 INFO 27996 --- [pool-2-thread-1] o.e.service.Resilience4jEventListener : TimeLimiter Event: 2024-07-29T23:02:39.073307500+08:00[Asia/Shanghai]: TimeLimiter 'timeoutService' recorded a timeout exception.
2024-07-29T23:03:37.189+08:00 INFO 27996 --- [nio-8080-exec-4] o.e.service.Resilience4jEventListener : RateLimiter Event: RateLimiterEvent{type=SUCCESSFUL_ACQUIRE, rateLimiterName='ratelimiterService', creationTime=2024-07-29T23:03:37.189104500+08:00[Asia/Shanghai]}
2024-07-29T23:03:38.995+08:00 INFO 27996 --- [nio-8080-exec-5] o.e.service.Resilience4jEventListener : RateLimiter Event: RateLimiterEvent{type=SUCCESSFUL_ACQUIRE, rateLimiterName='ratelimiterService', creationTime=2024-07-29T23:03:38.995416700+08:00[Asia/Shanghai]}
2024-07-29T23:03:40.927+08:00 INFO 27996 --- [nio-8080-exec-6] o.e.service.Resilience4jEventListener : RateLimiter Event: RateLimiterEvent{type=FAILED_ACQUIRE, rateLimiterName='ratelimiterService', creationTime=2024-07-29T23:03:40.927729300+08:00[Asia/Shanghai]}
2024-07-29T23:12:41.623+08:00 INFO 27996 --- [nio-8080-exec-9] o.e.service.Resilience4jEventListener : CircuitBreaker Event: 2024-07-29T23:12:41.621862900+08:00[Asia/Shanghai]: CircuitBreaker 'circuitbreakerService' recorded an error: 'java.lang.RuntimeException: It is Failure!'. Elapsed time: 0 ms
2024-07-29T23:23:15.612+08:00 INFO 27996 --- [nio-8080-exec-3] org.example.service.RetryService : Fetching item with id 2
2024-07-29T23:27:22.607+08:00 INFO 27996 --- [nio-8080-exec-6] org.example.service.RetryService : Fetching item with id 3
2024-07-29T23:34:40.837+08:00 INFO 27996 --- [nio-8080-exec-2] org.example.service.RetryService : Fetching item with id 4
2024-07-29T23:34:42.973+08:00 INFO 27996 --- [nio-8080-exec-1] org.example.service.RetryService : Fetching item with id 5
2024-07-29T23:34:44.777+08:00 INFO 27996 --- [nio-8080-exec-3] org.example.service.RetryService : Fetching item with id 6
2024-07-29T23:34:44.793+08:00 INFO 27996 --- [nio-8080-exec-3] o.e.service.Resilience4jEventListener : Retry event: 2024-07-29T23:34:44.793527800+08:00[Asia/Shanghai]: Retry 'retryService', waiting PT1S until attempt '1'. Last attempt failed with exception 'java.lang.RuntimeException: Simulated database error'.
2024-07-29T23:34:45.817+08:00 INFO 27996 --- [nio-8080-exec-3] org.example.service.RetryService : Fetching item with id 6
2024-07-29T23:34:45.822+08:00 INFO 27996 --- [nio-8080-exec-3] o.e.service.Resilience4jEventListener : Retry event: 2024-07-29T23:34:45.822567400+08:00[Asia/Shanghai]: Retry 'retryService' recorded a successful retry attempt. Number of retry attempts: '1', Last exception was: 'java.lang.RuntimeException: Simulated database error'.
注:该类进行事件监听,便于调试。
10. RetryService.java(缓存+重试)
package org.example.service;
import io.github.resilience4j.retry.annotation.Retry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Service;
@Service
public class RetryService {
private static final Logger logger = LoggerFactory.getLogger(RetryService.class);
@Cacheable(value = "items", key = "#id")
@Retry(name = "retryService", fallbackMethod = "fallback")
public String getItemById(String id) {
logger.info("Fetching item with id {}", id);
// 模拟数据库调用,可能会引发异常
if (Math.random() > 0.8) {
throw new RuntimeException("Simulated database error");
}
return "Cache " + id;
}
// 回退方法
public String fallback(String id, RuntimeException e) {
return "Fallback : " + id;
}
}
注:该类简单的使用redis缓存,重试机制可以自动重新执行失败的操作。通过配置重试次数和重试间隔,可以在遇到暂时性错误时增加操作成功的机会。
application.yml对应的配置信息:
retry:
instances:
retryService:
maxAttempts: 5
waitDuration: 1000
enableExponentialBackoff: true
exponentialBackoffMultiplier: 1.5
retryExceptions:
- java.lang.RuntimeException
解释:
-
maxAttempts:
- 最大重试次数。在此示例中,设置为 5,表示如果方法调用失败,将最多重试 5 次。
-
waitDuration:
- 重试之间的等待时间(以毫秒为单位)。在此示例中,设置为 1000 毫秒(1 秒),表示每次重试之间等待 1 秒。
-
enableExponentialBackoff:
- 启用指数回退策略。设置为
true
表示启用指数回退。
- 启用指数回退策略。设置为
-
exponentialBackoffMultiplier:
- 指数回退的倍数。在此示例中,设置为1.5,表示每次重试之间的等待时间将乘以 1.5。
-
retryExceptions:
- 需要重试的异常类型列表。在此示例中,指定了
java.lang.RuntimeException
,表示当方法抛出RuntimeException
时会进行重试。
- 需要重试的异常类型列表。在此示例中,指定了
结果:
直接访问(保证redis正常运行),然后不断的尝试把路径1依次累加,然后发送请求,直到出现监听消息的内容即可看到重试,当方法上使用了 @Cacheable
注解时,如果请求的缓存存在且未过期,那么该方法不会实际执行,而是直接返回缓存中的数据。这意味着在这种情况下,@Retry
注解可能不会生效,因为方法调用不会实际发生,导致没有机会触发重试机制。
监听消息:
2024-07-29T23:23:15.612+08:00 INFO 27996 --- [nio-8080-exec-3] org.example.service.RetryService : Fetching item with id 2
2024-07-29T23:27:22.607+08:00 INFO 27996 --- [nio-8080-exec-6] org.example.service.RetryService : Fetching item with id 3
2024-07-29T23:34:40.837+08:00 INFO 27996 --- [nio-8080-exec-2] org.example.service.RetryService : Fetching item with id 4
2024-07-29T23:34:42.973+08:00 INFO 27996 --- [nio-8080-exec-1] org.example.service.RetryService : Fetching item with id 5
2024-07-29T23:34:44.777+08:00 INFO 27996 --- [nio-8080-exec-3] org.example.service.RetryService : Fetching item with id 6
2024-07-29T23:34:44.793+08:00 INFO 27996 --- [nio-8080-exec-3] o.e.service.Resilience4jEventListener : Retry event: 2024-07-29T23:34:44.793527800+08:00[Asia/Shanghai]: Retry 'retryService', waiting PT1S until attempt '1'. Last attempt failed with exception 'java.lang.RuntimeException: Simulated database error'.
2024-07-29T23:34:45.817+08:00 INFO 27996 --- [nio-8080-exec-3] org.example.service.RetryService : Fetching item with id 6
2024-07-29T23:34:45.822+08:00 INFO 27996 --- [nio-8080-exec-3] o.e.service.Resilience4jEventListener : Retry event: 2024-07-29T23:34:45.822567400+08:00[Asia/Shanghai]: Retry 'retryService' recorded a successful retry attempt. Number of retry attempts: '1', Last exception was: 'java.lang.RuntimeException: Simulated database error'.
11. TimeoutService.java(超时)
package org.example.service;
import io.github.resilience4j.timelimiter.annotation.TimeLimiter;
import org.springframework.stereotype.Service;
import java.util.concurrent.CompletableFuture;
@Service
public class TimeoutService {
@TimeLimiter(name = "timeoutService", fallbackMethod = "fallback")
public CompletableFuture<String> timeoutExample() {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(5000); // 模拟长时间处理,这里设置为5秒
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Success";
});
}
public CompletableFuture<String> fallback(Throwable t) {
return CompletableFuture.completedFuture("fallback: timeout!");
}
}
application.yml对应的配置信息:
timelimiter:
instances:
timeoutService:
timeoutDuration: 2s
解释:设置为 2 秒。如果方法调用在 2 秒内未完成,TimeLimiter
会中断调用并执行回退方法。
结果:
监听消息:
TimeLimiter Event: 2024-07-29T23:02:39.073307500+08:00[Asia/Shanghai]: TimeLimiter 'timeoutService' recorded a timeout exception.
12. BulkheadTest.java(批隔离测试)
package org.example.test;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class BulkheadTest {
public static void main(String[] args) {
// 创建一个固定大小的线程池,线程数为6
ExecutorService executor = Executors.newFixedThreadPool(6);
// 初始化一个 CountDownLatch,用于控制所有任务的开始
CountDownLatch latch = new CountDownLatch(1);
// 提交6个任务到线程池
for (int i = 1; i <= 6; i++) {
final int id = i;
executor.submit(() -> {
try {
// 等待 latch 释放,确保所有任务同时开始
latch.await();
// 创建 URL 对象,指定请求路径
URL url = new URL("http://localhost:8080/process/" + id);
// 打开连接
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
// 设置请求方法为 GET
conn.setRequestMethod("GET");
// 获取响应代码
int responseCode = conn.getResponseCode();
// 打印响应代码
System.out.println("Response Code for request " + id + ": " + responseCode);
} catch (Exception e) {
// 打印异常信息
e.printStackTrace();
}
});
}
// 释放 latch,启动所有任务
latch.countDown();
// 关闭线程池
executor.shutdown();
}
}
注:该类用于模拟批隔离,策略限制了同时处理的并发调用数量,确保系统部分组件的问题不会导致整个系统的瘫痪。
13. 总结
通过Resilience4j+Redis实现超时检测,限速访问,以及重试,还有熔断回退和批隔离等简单的案例模拟,仅供学习交流使用。