DataLoader
是一个通用实用程序,用作应用程序数据获取层的一部分,通过批处理和缓存为各种远程数据源(如数据库或 Web 服务)提供简化且一致的 API
批处理
const user = await userLoader.load(1);
const invitedBy = await userLoader.load(user.invitedByID);
console.log(`User 1 was invited by ${invitedBy}`);
// Elsewhere in your application
const user = await userLoader.load(2);
const lastInvited = await userLoader.load(user.lastInvitedID);
console.log(`User 2 last invited ${lastInvited}`);
一个简单的应用程序可能已经发出四次到后端的往返请求以获取所需的信息,但是使用
DataLoader
这个应用程序最多只会进行两次请求
DataLoader 允许您在不牺牲批量数据加载性能的情况下解耦应用程序的不相关部分。虽然加载器提供了一个加载单个值的 API,但所有并发请求将被合并至批量加载函数来使用。这允许您的应用程序在整个应用程序中安全地分发数据获取要求,并保持最少的传出数据请求
批量调度
在同一GraphQL请求中调用DataLoader多次时,DataLoader将收集这些调用,并在GraphQL请求结束时一次性加载所有数据。这可以减少网络延迟和数据库负载,从而提高数据加载的速度
在graphql-java中,可以修改在DataLoader构造函数中BatchingOptions
对象的maxBatchSize
和batchingEnabled
属性
maxBatchSize
:设置DataLoader在执行批处理之前等待负载项的最大数量。默认值为Integer.MAX_VALUE
,这意味着DataLoader将收集所有负载项并一次性加载它们。batchingEnabled
:启用或禁用DataLoader的批处理功能。如果将其设置为false,则DataLoader将在每个负载项上立即执行加载操作,而不是等待负载项集满
BatchingOptions batchingOptions = BatchingOptions.newOptions().setMaxBatchSize(100).setBatchingEnabled(true).build();
DataLoader<Key, Value> dataLoader = new DataLoader<>(batchLoader, batchingOptions);
缓存
DataLoader 为应用程序的单个请求中发生的所有加载提供记忆缓存
当使用给定键调用一次
load()
函数后,结果值将被缓存以避免重复加载。更准确地说load()
是一个记忆函数
DataLoader 缓存不会取代 Redis、Memcache 或任何其他共享应用程序级缓存。DataLoader首先是一种数据加载机制,它的缓存只是为了不在你的Application的单次请求上下文中重复加载相同的数据。为此,它维护了一个内存中记忆缓存
更多关于Dataloader缓存相关知识,如清除缓存、缓存错误等见DataLoader Caching
graphql-java
java-dataloader
是参考Facebook
指引实现的一个功能完整的应用,但在人工调度功能上有一个重要的区别。
java-dataloader
的特点是:
- 简单、直观的 API,使用泛型和流畅的编码
- 使用 lambda 表达式定义批量加载函数
- 在队列中安排加载请求以进行批处理
- 从代码中的任何位置添加加载请求
- 请求返回一个CompleteableFuture请求值
- 可以一次创建多个请求
- 缓存加载请求,因此只获取一次数据
- 可以清除单个缓存键,以便在下一批队列调度时重新获取数据
- 可以使用键/值填充缓存,以避免不必要地获取数据
- 可以使用 lambda 表达式配置缓存键函数,以从复杂的数据加载器键类型中提取缓存键
- 随着批处理的进行,单个批次期货完成/解决
- 结果根据加载请求的插入顺序排序
- 当批处理 future 失败时处理部分错误
- 可以在配置中禁用批处理和/或缓存
- 可以提供您自己的CacheMap<K, V>实现
- 可以提供您自己的ValueCache<K, V>实现
- 具有非常高的测试覆盖率
BatchLoader
一个DataLoader
对象需要一个BatchLoader
函数,该函数负责在给定键列表的情况下加载值
BatchLoader<Long, User> userBatchLoader = new BatchLoader<Long, User>() {
@Override
public CompletionStage<List<User>> load(List<Long> userIds) {
return CompletableFuture.supplyAsync(() -> {
return userManager.loadUsersById(userIds);
});
}
};
DataLoader<Long, User> userLoader = DataLoaderFactory.newDataLoader(userBatchLoader);
DataLoader主要利用了Java的CompletableFuture
异步任务收集再批量处理,最后将结果写回对应任务,故使用CompleteableFuture
加载数据
CompletableFuture<User> load1 = userLoader.load(1L);
在GraphQL查询的每个层级,
dataloader.dispatch()
将被自动调用以触发该部分查询的批处理请求
缓存
DataLoader有一个两层缓存系统:
- 第一个缓存由
org.dataloader.CacheMap
接口表示。它将CompletableFuture
按key缓存,因此在之后load(key)
加载同一个key,会返回相同的Future
该缓存只能在 JVM 本地工作,因为它的缓存
CompletableFuture
无法通过网络序列化。
- 二级缓存是以
org.dataloader.ValueCache
接口表示。默认情况下,这缓存未启用并且是空操作。
值缓存使用异步 API 模式来封装值缓存,以支持外部缓存实现(例如 REDIS 或 Memcached)的想法。
Statistics
java-dataloader的org.dataloader.stats.Statistics
用于收集DataLoader执行过程中的状态,比如加载次数、缓存命中次数等
默认情况下是不会执行数据收集的,需要通过
DataLoaderDispatcherInstrumentation
进行注入
@Bean
public DataLoaderDispatcherInstrumentation dataLoaderDispatcherInstrumentation(){
return new DataLoaderDispatcherInstrumentation(DataLoaderDispatcherInstrumentationOptions.newOptions().includeStatistics(true));
}
Dataloader实例
Client
@Component
public class PointClient {
private static final Logger LOGGER = LoggerFactory.getLogger(PointClient.class);
/**
* 按月查询分值
*
* @param uidList uid列表
* @param month yyyyMM
* @return
*/
public Mono<Map<Long, Integer>> batchGetMonthPoint(Collection<Long> uidList, String month) {
return builder.build().post().uri(BATCH_GET_MONTH_POINT_URI).bodyValue(new BatchUidMonthPointReq(uidList, month))
.retrieve().bodyToMono(BatchUidMonthPointRes.class).retry(RETRY_TIME).map(rsp -> {
LOGGER.info("batchGetMonthPoint uidList:{} rsp:{}", uidList, rsp);
if (rsp.isSuccess()) {
return rsp.getPointMonthList().stream().collect(Collectors.toMap(
p -> Long.parseLong(p.getUid()), p -> p.getPoint().intValue()));
}
return Collections.emptyMap();
});
}
}
DataLoader
@DgsDataLoader(name = "pointLoader")
public class PointBatchLoader implements MappedBatchLoaderWithContext<Long, Integer> {
@Autowired
private PointClient pointClient;
@Override
public CompletionStage<Map<Long, Integer>> load(Set<Long> uidList, BatchLoaderEnvironment batchLoaderEnvironment) {
return pointClient.batchGetMonthPoint(uidList, LocalDate.now().format("yyyyMM")).toFuture();
}
}
DgsData
@DgsData(parentType = "User")
public CompletableFuture<Integer> point(DgsDataFetchingEnvironment dfe){
User user = dfe.getSource();
DataLoader<Long, Integer> dataLoader = dfe.getDataLoader(PointBatchLoader.class);
return dataLoader.load(user.getUid());
}
源码
Dataloader加载及缓存机制可见org.dataloader.DataLoaderHelper
load
若启动缓存,优先从缓存中获取Future
。若启动批处理,加入loader队列,待自动执行;否则立即执行该loader
CompletableFuture<V> load(K key, Object loadContext) {
synchronized (dataLoader) {
boolean batchingEnabled = loaderOptions.batchingEnabled();
boolean cachingEnabled = loaderOptions.cachingEnabled();
stats.incrementLoadCount(new IncrementLoadCountStatisticsContext<>(key, loadContext));
if (cachingEnabled) {
return loadFromCache(key, loadContext, batchingEnabled);
} else {
return queueOrInvokeLoader(key, loadContext, batchingEnabled, false);
}
}
}
dispatch
启动批处理时才会执行的loader分发,达到最大批量配置则执行loader加载。可由ScheduledDataLoaderRegistry定时触发或dispatchAndJoin
手动触发
DispatchResult<V> dispatch() {
boolean batchingEnabled = loaderOptions.batchingEnabled();
//
// we copy the pre-loaded set of futures ready for dispatch
final List<K> keys = new ArrayList<>();
final List<Object> callContexts = new ArrayList<>();
final List<CompletableFuture<V>> queuedFutures = new ArrayList<>();
synchronized (dataLoader) {
loaderQueue.forEach(entry -> {
keys.add(entry.getKey());
queuedFutures.add(entry.getValue());
callContexts.add(entry.getCallContext());
});
loaderQueue.clear();
lastDispatchTime.set(now());
}
if (!batchingEnabled || keys.isEmpty()) {
return new DispatchResult<>(completedFuture(emptyList()), 0);
}
final int totalEntriesHandled = keys.size();
int maxBatchSize = loaderOptions.maxBatchSize();
CompletableFuture<List<V>> futureList;
if (maxBatchSize > 0 && maxBatchSize < keys.size()) {
futureList = sliceIntoBatchesOfBatches(keys, queuedFutures, callContexts, maxBatchSize);
} else {
futureList = dispatchQueueBatch(keys, callContexts, queuedFutures);
}
return new DispatchResult<>(futureList, totalEntriesHandled);
}
参考资料:
- dataloader
- 使用数据加载器
- java-dataloader