文章目录
- 任务执行和调度
- 热帖排行
- 生成长图
- 优化网站的性能
使用Quartz执行定时任务,实现热帖排行功能时,通过定时任务定时计算帖子分数,降低计算的数据量。使用wkhtmltopdf生成长图。通过多级缓存对热帖功能进行优化,提升网站性能。
任务执行和调度
-
JDK线程池
- ExecutorService:普通线程池,可以创建普通的线程。
- ScheduledExecutorService:该线程池创建的线程可以执行间隔任务。
// 示例 public class ThreadPoolTests { private static final Logger logger = LoggerFactory.getLogger(ThreadPoolTests.class); // JDK普通线程池 private ExecutorService executorService = Executors.newFixedThreadPool(5); // JDK可执行定时任务的线程池 private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5); private void sleep(long m){ try { Thread.sleep(m); } catch (InterruptedException e) { throw new RuntimeException(e); } } // 1. JDK普通线程池 @Test public void testExecutorService(){ Runnable task = new Runnable() { @Override public void run() { logger.debug("Hello ExecutorService"); } }; for (int i = 0; i < 10; i++) { executorService.submit(task); } sleep(10000); } // 2.JDK定时任务线程池 @Test public void testScheduledExecutorService(){ Runnable task = new Runnable() { @Override public void run() { logger.debug("Hello ScheduledExecutorService"); } }; scheduledExecutorService.scheduleAtFixedRate(task,10000,1000, TimeUnit.MILLISECONDS); sleep(30000); } }
-
Spring 线程池
- ThreadPoolTaskExecutor:普通线程池,创建普通线程。
- ThreadPoolTaskScheduler:该线程池创建的线程可以执行定时任务。
首先需要在配置文件中进行配置:
# TaskExecutionProperties spring.task.execution.pool.core-size=5 spring.task.execution.pool.max-size=15 spring.task.execution.pool.queue-capacity=100 # TaskSchedulingProperties spring.task.scheduling.pool.size=5
配置类ThreadPoolConfig:
@Configuration @EnableScheduling @EnableAsync public class ThreadPoolConfig { }
测试类:
@RunWith(SpringRunner.class) @SpringBootTest @ContextConfiguration(classes = CommunityApplication.class) public class ThreadPoolTests { private static final Logger logger = LoggerFactory.getLogger(ThreadPoolTests.class); // Spring 普通线程池 @Autowired private ThreadPoolTaskExecutor taskExecutor; // Spring 可执行定时任务的线程池 @Autowired private ThreadPoolTaskScheduler taskScheduler; private void sleep(long m){ try { Thread.sleep(m); } catch (InterruptedException e) { throw new RuntimeException(e); } } // 3.Spring 普通线程池 @Test public void testThreadPoolTaskExecutor(){ Runnable task = new Runnable() { @Override public void run() { logger.debug("Hello ThreadPoolTaskExecutor"); } }; for (int i = 0; i < 10; i++) { taskExecutor.submit(task); } sleep(10000); } // 4. Spring 定时任务线程池 @Test public void testThreadPoolTaskScheduler(){ Runnable task = new Runnable() { @Override public void run() { logger.debug("Hello ThreadPoolTaskScheduler"); } }; Date startTime = new Date(System.currentTimeMillis()+10000); taskScheduler.scheduleAtFixedRate(task,startTime,1000); sleep(30000); } }
Spring线程池的使用(简化版)
@Service public class AlphaService { private static final Logger logger = LoggerFactory.getLogger(AlphaService.class); // 让该方法在多线程环境下,被异步的调用 @Async public void execute1(){ logger.debug("execute1"); } // 让该方法定时的去执行 @Scheduled(initialDelay = 10000,fixedRate = 1000) public void execute2(){ logger.debug("execute2"); } } @RunWith(SpringRunner.class) @SpringBootTest @ContextConfiguration(classes = CommunityApplication.class) public class ThreadPoolTests { private static final Logger logger = LoggerFactory.getLogger(ThreadPoolTests.class); // Spring 普通线程池 @Autowired private ThreadPoolTaskExecutor taskExecutor; @Autowired private AlphaService alphaService; // Spring 可执行定时任务的线程池 @Autowired private ThreadPoolTaskScheduler taskScheduler; private void sleep(long m){ try { Thread.sleep(m); } catch (InterruptedException e) { throw new RuntimeException(e); } } // 5. Spring普通线程池(简化) @Test public void testThreadPoolTaskExecutorSimple(){ for (int i = 0; i < 10; i++) { alphaService.execute1(); } sleep(10000); } // 6.Spring定时任务线程池(简化) @Test public void testThreadPoolTaskSchedulerSimple(){ sleep(30000); } }
-
分布式任务 Quartz分布式任务调度
- Spring Quartz
- Quartz将程序运行所需要依赖的参数都存在了数据库里。所以是依赖于数据库的,用的时候需要提前创建表。
- Spring Quartz使用:主要就是两步,定义任务相关的Job类和在配置类QuartzConfig中配置相关的JobDetail和Trigger接口。
1.导入依赖 spring-boot-starter-quartz
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-quartz</artifactId> </dependency>
2.定义任务,通过Job接口进行定义,在/quartz路径下创建一个job类,实现Job接口。
// 示例 public class AlphaJob implements Job { @Override public void execute(JobExecutionContext context) throws JobExecutionException { System.out.println(Thread.currentThread().getName() + ": execute a quartz job."); } }
3.在QuartzConfig类下进行相关配置,主要是配置JobDetail和Trigger接口。
// 配置 -> 数据库 -> 调用 @Configuration public class QuartzConfig { // FactoryBean 可简化Bean的实例化过程 // 1. 通过 FactoryBean 封装 Bean的实例化过程 // 2. 将FactoryBean装配到Spring容器里 // 3. 将FactoryBean注入给其他的Bean // 4. 该Bean得到的是FactoryBean所管理的对象实例 // 配置JobDetail @Bean public JobDetailFactoryBean alphaJobDetail(){ JobDetailFactoryBean factoryBean = new JobDetailFactoryBean(); factoryBean.setJobClass(AlphaJob.class); factoryBean.setName("alphaJob"); factoryBean.setGroup("alphaJobGroup"); factoryBean.setDurability(true); factoryBean.setRequestsRecovery(true); return factoryBean; } // 配置Trigger(SimpleTriggerFactoryBean, CronTriggerFactoryBean) @Bean public SimpleTriggerFactoryBean alphaTrigger(JobDetail alphaJobDetail){ SimpleTriggerFactoryBean factoryBean = new SimpleTriggerFactoryBean(); factoryBean.setJobDetail(alphaJobDetail); factoryBean.setName("alphaTrigger"); factoryBean.setGroup("alphaTriggerGroup"); factoryBean.setRepeatInterval(3000); factoryBean.setJobDataAsMap(new JobDataMap()); return factoryBean; } }
注意⚠️:BeanFactory是容器的顶层接口,FactoryBean可简化Bean的实例化过程。
4.配置文件application.properties,可以按照自己的想法来配置。配置前quartz是读取内存中的配置来执行任务的。配置后将任务持久化到数据库里,从而实现分布式任务。
# QuartzProperties spring.quartz.job-store-type=jdbc spring.quartz.scheduler-name=communityScheduler spring.quartz.properties.org.quartz.scheduler.instanceId=AUTO spring.quartz.properties.org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX spring.quartz.properties.org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate spring.quartz.properties.org.quartz.jobStore.isClustered=true spring.quartz.properties.org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool spring.quartz.properties.org.quartz.threadPool.threadCount=5
5.删除Job,删除该job后如果希望再次启动项目时不会有该定时任务,那么将配置类中配置JobDetail和Trigger相关的Bean注释掉即可。
public class QuartzTests { @Autowired private Scheduler scheduler; // 调度器 @Test public void testDeleteJob(){ try { boolean result = scheduler.deleteJob(new JobKey("alphaJob", "alphaJobGroup")); System.out.println(result); } catch (SchedulerException e) { throw new RuntimeException(e); } } }
补充:Quartz核心概念
- 任务
Job
:Job 就是你想要实现的任务类,每一个Job必须实现 org.quartz.job 接口,且只需实现接口定义的 execute() 方法。 - 触发器
Trigger
:Trigger 为你执行任务的触发器,比如你想每天定时3点发送一份统计邮件,Trigger 将会设置3点执行该任务。 Trigger 主要包含两种 SimplerTrigger 和 CronTrigger 两种。 - 调度器
Scheduler
:Scheduler 为任务的调度器,它会将任务 Job 及触发器 Trigger 整合起来,负责基于 Trigger 设定的时间来执行 Job。
热帖排行
-
启动定时任务,定时的计算帖子的分数,为首页帖子列表增加按分数排序的功能。
-
当帖子被加精、点赞、评论的时候,将帖子id放到Redis缓存里,然后定时任务定时去计算缓存中帖子的分数,从而降低计算的数据量。
具体实现:
1.定义与帖子跟书相关的redisKey,数据结构为set,存储的内容是需要重新计算帖子分数的帖子id。
// 帖子分数
public static String getPostScoreKey(){
return PREFIX_POST + SPLIT + "score";
}
2.当发布帖子,给帖子加精,评论或点赞的时候,需要重新计算帖子分数,此时将需要被计算的帖子的id存储到redis中。
// 将需要计算分数的帖子id存到redis中
String redisKey = RedisKeyUtil.getPostScoreKey();
redisTemplate.opsForSet().add(redisKey,post.getId());
3.定时任务,定期刷新帖子分数,使用Quartz实现。首先定义一个刷新帖子分数的Job,然后在配置类中进行JobDetail和Trigger相关的配置。
- 定义任务,包括初始化纪元,定时任务刷新帖子分数。
public class PostScoreRefreshJob implements Job, CommunityConstant {
private static final Logger logger = LoggerFactory.getLogger(PostScoreRefreshJob.class);
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private DiscussPostService discussPostService;
@Autowired
private LikeService likeService;
@Autowired
private ElasticsearchService elasticsearchService;
// 初始化纪元
private static final Date epoch;
static {
try {
epoch = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2014-08-01 00:00:00");
} catch (ParseException e) {
throw new RuntimeException("初始化纪元失败!",e);
}
}
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
String redisKey = RedisKeyUtil.getPostScoreKey();
BoundSetOperations operations = redisTemplate.boundSetOps(redisKey);
if(operations.size() == 0){
logger.info("[任务取消] 没有需要刷新的帖子!");
return;
}
logger.info("[任务开始] 正在刷新帖子分数:"+operations.size());
while (operations.size()>0){
this.refresh((Integer)operations.pop());
}
logger.info("[任务结束] 帖子分数刷新完毕!");
}
private void refresh(int postId){
DiscussPost post = discussPostService.findDiscussPostById(postId);
if(post == null){
logger.error("该帖子不存在:id = "+postId);
return;
}
//正式算分
// 是否加精
boolean wonderful = post.getStatus()==1;
// 评论数量
int commentCount = post.getCommentCount();
// 点赞数量
long likeCount = likeService.findEntityLikeCount(ENTITY_TYPE_POST,postId);
// 算分公式:log(精华分 + 评论数 * 10 + 点赞数 * 2 + 收藏数 * 2) + (发布时间 - 初始纪元)
// 计算权重
double w = (wonderful ? 75 : 0) + commentCount * 10 + likeCount * 2;
// 分数 = 帖子权重 + 距离天数
double score = Math.log10(Math.max(w,1)) + (post.getCreateTime().getTime() - epoch.getTime()) / (1000 * 3600 * 24);
// 更新帖子的分数
discussPostService.updateScore(postId,score);
// 同步搜索数据
post.setScore(score);
elasticsearchService.saveDiscussPost(post);
}
}
- 配置JobDetail和Trigger
// 配置 -> 数据库 -> 调用
@Configuration
public class QuartzConfig {
// FactoryBean 可简化Bean的实例化过程
// 1. 通过 FactoryBean 封装 Bean的实例化过程
// 2. 将FactoryBean装配到Spring容器里
// 3. 将FactoryBean注入给其他的Bean
// 4. 该Bean得到的是FactoryBean所管理的对象实例
// 刷新帖子分数任务
// 配置JobDetail
@Bean
public JobDetailFactoryBean postScoreRefreshJobDetail(){
JobDetailFactoryBean factoryBean = new JobDetailFactoryBean();
factoryBean.setJobClass(PostScoreRefreshJob.class);
factoryBean.setName("postScoreRefreshJob");
factoryBean.setGroup("communityJobGroup");
factoryBean.setDurability(true);
factoryBean.setRequestsRecovery(true);
return factoryBean;
}
// 配置Trigger(SimpleTriggerFactoryBean, CronTriggerFactoryBean)
@Bean
public SimpleTriggerFactoryBean postScoreRefreshTrigger(JobDetail postScoreRefreshJobDetail){
SimpleTriggerFactoryBean factoryBean = new SimpleTriggerFactoryBean();
factoryBean.setJobDetail(postScoreRefreshJobDetail);
factoryBean.setName("postScoreRefreshTrigger");
factoryBean.setGroup("communityTriggerGroup");
factoryBean.setRepeatInterval(1000 * 60 * 5); // 五分钟
factoryBean.setJobDataAsMap(new JobDataMap());
return factoryBean;
}
}
4.热帖排行:访问首页热帖功能时,将帖子按分数排序后返回。该部分按照数据访问层、业务层、表现层,一步步修改重构原来的代码就可以。
⚠️注意一点:如果需要对请求参数设置默认值,可以用@RequestParam注解
// 示例
@RequestParam(name = "orderMode",defaultValue = "0") int orderMode
生成长图
-
wkhtmltopdf 命令行使用
wkhtmltopdf url file
wkhtmltoimage url file
示例:
wkhtmltoimage —quality 75 url file
:图片压缩质量75% -
Java 代码使用
- Runtime.getRuntime().exec()
// 示例 public class WkTests { public static void main(String[] args) { String cmd = "/usr/local/bin/wkhtmltoimage --quality 75 <https://www.baidu.com> /Users/amelia/IdeaProjects/data/wk-images/1.png"; try { Process p = Runtime.getRuntime().exec(cmd); if(p.waitFor()==0){ System.out.println("ok."); } } catch (IOException | InterruptedException e) { e.printStackTrace(); } } }
-
模拟实现分享功能
1.配置文件中添加wk的相关配置
wk.image.command=/usr/local/bin/wkhtmltoimage wk.image.storage=/Users/amelia/IdeaProjects/data/wk-images
2.在相关配置类WkConfig中进行一些初始化配置
@Configuration public class WkConfig { private static final Logger logger = LoggerFactory.getLogger(WkConfig.class); @Value("${wk.image.storage}") private String wkImageStorage; @PostConstruct public void init(){ // 创建WK图片目录 File file = new File(wkImageStorage); if (!file.exists()){ file.mkdir(); logger.info("创建WK图片目录:"+wkImageStorage); } } }
3.在ShareController类中编写表现层逻辑,主要有两个方法,生成长图方法和获取长图方法
- 在生成长图方法中触发生成长图事件,所以还需要在EventConsumer中添加一个消费生成长图事件的方法。
@Component public class EventConsumer implements CommunityConstant { private static final Logger logger = LoggerFactory.getLogger(EventConsumer. @Value("${wk.image.command}") private String wkImageCommand; @Value("${wk.image.storage}") private String wkImageStorage; // 消费分享事件,也就是生成长图事件 @KafkaListener(topics = {TOPIC_SHARE}) public void handleShareMessage(ConsumerRecord record){ if(record == null || record.value() == null){ logger.error("消息的内容为空!"); return; } Event event = JSONObject.parseObject(record.value().toString(),Event.class); if(event == null){ logger.error("消息格式错误!"); return; } String htmlUrl = (String) event.getData().get("htmlUrl"); String fileName = (String) event.getData().get("fileName"); String suffix = (String) event.getData().get("suffix"); String cmd = wkImageCommand + " --quality 75 " + htmlUrl + " " + wkImageStorage + "/" + fileName + suffix; try { Runtime.getRuntime().exec(cmd); logger.info("生成长图成功:" + cmd); } catch (IOException e) { logger.error("生成长图失败:" + e.getMessage()); } } }
// 生成长图,并返回生成的长图的链接 ShareController @RequestMapping(path = "/share",method = RequestMethod.GET) @ResponseBody public String share(String htmlUrl){ // 文件名 String fileName = CommunityUtil.generateUUID(); // 异步生成长图,触发生成长图事件 Event event = new Event() .setTopic(TOPIC_SHARE) .setData("htmlUrl",htmlUrl) .setData("fileName",fileName) .setData("suffix",".png"); eventProducer.fireEvent(event); // 返回访问路径 Map<String,Object> map = new HashMap<>(); map.put("shareUrl",domain+contextPath+"/share/image/"+fileName); //map.put("shareUrl",shareBucketUrl + "/" +fileName); return CommunityUtil.getJSONString(0,null,map); }
- 获取长图
// 获取长图 @RequestMapping(path = "/share/image/{fileName}",method = RequestMethod.GET) public void getShareImage(@PathVariable("fileName") String fileName, HttpServletResponse response){ if(StringUtils.isBlank(fileName)){ throw new IllegalArgumentException("文件名不能为空!"); } response.setContentType("image/png"); File file = new File(wkImageStorage+"/"+fileName+".png"); try { OutputStream os = response.getOutputStream(); FileInputStream fis = new FileInputStream(file); byte[] buffer = new byte[1024]; int b = 0; while ((b = fis.read(buffer)) != -1){ os.write(buffer,0,b); } } catch (IOException e) { logger.error("获取长图失败:"+e.getMessage()); } }
优化网站的性能
-
本地缓存
- 将数据缓存在应用服务器上,性能最好。
- 常用缓存工具:Ehcache、Guava、Caffeine等。
-
分布式缓存
- 将数据缓存在NoSQL数据库上,跨服务器。
- 常用缓存工具:MemCache、Redis等。
-
多级缓存
-
一级缓存(本地缓存)> 二级缓存(分布式缓存)> DB。
-
避免缓存雪崩(缓存失效,大量请求直达DB),提高系统的可用性。
-
-
将热帖用多级缓存进行优化,增加Caffeine本地缓存。
1.导入依赖 caffeine
<dependency> <groupId>com.github.ben-manes.caffeine</groupId> <artifactId>caffeine</artifactId> <version>2.7.0</version> </dependency>
- 配置文件:application.properties,进行caffeine相关配置
# caffeine caffeine.posts.maxsize=15 caffeine.posts.expire-seconds=180
3.Service层,查询帖子列表和帖子总数的时候添加本地缓存
@Service public class DiscussPostService { private static final Logger logger = LoggerFactory.getLogger(DiscussPostService.class); @Autowired public DiscussPostMapper discussPostMapper; @Value("${caffeine.posts.maxsize}") private int maxSize; @Value("${caffeine.posts.expire-seconds}") private int expireSeconds; // Caffeine核心接口:Cache, // 两个常用接口 // 1.LoadingCache: 同步缓存 // 2.AsyncLoadingCache: 异步缓存 // 帖子列表缓存 private LoadingCache<String,List<DiscussPost>> postListCache; // 帖子总数缓存 private LoadingCache<Integer,Integer> postRowsCache; @PostConstruct public void init(){ // 初始化帖子列表缓存 postListCache = Caffeine.newBuilder() .maximumSize(maxSize) .expireAfterWrite(expireSeconds, TimeUnit.SECONDS) .build(new CacheLoader<String, List<DiscussPost>>() { @Override public @Nullable List<DiscussPost> load(@NonNull String key) throws Exception { if(key == null || key.length() == 0){ throw new IllegalArgumentException("参数错误!"); } String[] params = key.split(":"); if(params == null || params.length != 2){ throw new IllegalArgumentException("参数错误!"); } int offset = Integer.valueOf(params[0]); int limit = Integer.valueOf(params[1]); // 可以加二级缓存: Redis -> mysql logger.debug("load post list from DB."); return discussPostMapper.selectDiscussPosts(0,offset,limit,1); } }); // 初始化帖子总数缓存 postRowsCache = Caffeine.newBuilder() .maximumSize(maxSize) .expireAfterWrite(expireSeconds,TimeUnit.SECONDS) .build(new CacheLoader<Integer, Integer>() { @Override public @Nullable Integer load(@NonNull Integer key) throws Exception { logger.debug("load post rows from DB."); return discussPostMapper.selectDiscussRows(key); } }); } public List<DiscussPost> findDiscussPosts(int userId,int offset,int limit,int orderMode){ if(userId == 0 && orderMode == 1){ return postListCache.get(offset+":"+limit); } logger.debug("load post list from DB."); return discussPostMapper.selectDiscussPosts(userId,offset,limit,orderMode); } public int findDiscussPostRows(int userId){ if(userId == 0){ return postRowsCache.get(userId); } logger.debug("load post rows from DB."); return discussPostMapper.selectDiscussRows(userId); } }
-
压力测试 JMeter
-
线程组进行压力测试,线程数100,Ramp-Up时间:1秒(一秒内创建),循环次数:永远,调度器持续时间:60(持续执行,持续60秒)
-
http请求
-
定时器(统一随机定时器)Random Delay Maximum(in milliseconds):1000
-
监听器:聚合报告,没有加本地缓存前,吞吐量24.0/sec;加了本地缓存后,吞吐量160.7/sec。
-