首先介绍一下我的使用场景
我在redis set集合中有几十万个行程id,我需要一个脚本来离线计算每个行程的里程,计算完了之后,将公里数填到mongodb的表中,并且删除set集合中这个元素。
我的目录结构
我们创建一个maven项目,然后在启动类中增加代码
package com.ke.mileage;
import com.ke.mileage.service.GpsService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class CountMileageApplication implements CommandLineRunner {
@Autowired
private GpsService gpsService;
public static void main(String[] args) {
SpringApplication.run(CountMileageApplication.class,args);
}
@Override
public void run(String... args) throws Exception {
gpsService.getGps();
}
}
这里实现CommandLineRunner接口,在项目启动后会执行run方法。
GpsService
package com.ke.mileage.service;
public interface GpsService {
void getGps();
}
GpsServiceImpl
package com.ke.mileage.service.impl;
import com.ke.mileage.handler.GpsMileageHandler;
import com.ke.mileage.service.GpsService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.util.*;
import java.util.concurrent.*;
@Slf4j
@Service
public class GpsServiceImpl implements GpsService{
@Autowired
private MongoTemplate mongoTemplate;
@Autowired
private RedisTemplate redisTemplate;
private final static int coreNum = Runtime.getRuntime().availableProcessors();
//一次从set集合中取的个数
private final static int tripCount = 2000;
//创建一个线程池对象
/**
* 参数信息:
* int corePoolSize 核心线程大小
* int maximumPoolSize 线程池最大容量大小
* long keepAliveTime 线程空闲时,线程存活的时间
* TimeUnit unit 时间单位
* BlockingQueue<Runnable> workQueue 任务队列。一个阻塞队列
*/
private final static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
coreNum,
coreNum,
10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10000),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy()
);
@Override
public void getGps() {
try{
//从redis中获取行程id
List<String> tripSet = redisTemplate.opsForSet().randomMembers("truck_gps_trip_set",tripCount);
log.info("获取的tripSize:{}",tripSet.size());
CountDownLatch latch = new CountDownLatch(tripSet.size());
for (String trip :tripSet) {
//提交任务
poolExecutor.submit(new GpsMileageHandler(latch,trip,mongoTemplate,redisTemplate));
}
try {
//查看执行情况,有异常将会在此显示。
latch.await();
System.out.println("所有行程已经计算完毕");
if(tripSet.size()<1000){
System.out.println("关闭线程池");
poolExecutor.shutdown();
}else{
System.out.println("已处理完成,递归继续处理");
getGps();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}catch (Exception e){
log.info("GpsServiceImpl异常-------{},getStackTrace:{}",e.getMessage(),e.getStackTrace());
}
}
}
上面代码中tripCount的数量是2000,线程池的队列是10000,为什么要这么做呢?
1、在创建线程池的时候,队列我用的是有界队列ArrayBlockingQueue,而不是无界的,因为我想控制内存的占用,不想太大。
2、如果tripCount的数量很大,超出了线程池的队列的数量,则会被线程池给拒绝,这个任务就丢失了,具体的线程池拒绝策略大家可以详细了解一下。是为了保证所有的任务都被执行。
redis中的set集合有40w,一次2000,那如何能进行下一次呢,这个时候就得递归了,所以要拿到每次的所有线程执行完的结果,这里我们用到了一个CountDownLatch,这个就类似于一个计数器,每个线程执行完任务都会-1,最后为0的时候就会执行。
递归结束的条件我这里是判断了一下从set集合中取出来的条数,我取2000条,但得到的不到1000条,说明这次执行完任务,所有任务就全部执行完了,就该退出了
GpsMileageHandler
package com.ke.mileage.handler;
import lombok.extern.slf4j.Slf4j;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.function.Function;
@Slf4j
public class GpsMileageHandler implements Runnable{
private MongoTemplate mongoTemplate;
private RedisTemplate redisTemplate
//存放要处理的行程id
private String redisTripId;
private CountDownLatch latch;
public GpsMileageHandler(CountDownLatch latch, String tripId,MongoTemplate mongoTemplate,RedisTemplate redisTemplate) {
this.redisTripId = tripId;
this.latch = latch;
this.mongoTemplate = mongoTemplate;
this.redisTemplate = redisTemplate;
}
@Override
public void run() {
log.info("当前线程:{}",Thread.currentThread().getName());
handler();
//这里计时器减1
latch.countDown();
}
public void handler(){
try{
//在这里执行一系列的处理逻辑就好了
}catch (Exception e){
e.printStackTrace();
log.info("GpsMileageHandler异常2-------{},getStackTrace:{}",e.getMessage(),e.getStackTrace());
}
}
}