java 多线程处理任务

news2024/9/24 3:27:26

首先介绍一下我的使用场景

我在redis set集合中有几十万个行程id,我需要一个脚本来离线计算每个行程的里程,计算完了之后,将公里数填到mongodb的表中,并且删除set集合中这个元素。

我的目录结构

  1. 我们创建一个maven项目,然后在启动类中增加代码

package com.ke.mileage;

import com.ke.mileage.service.GpsService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class CountMileageApplication implements CommandLineRunner {

    @Autowired
    private GpsService gpsService;

    public static void main(String[] args) {
        SpringApplication.run(CountMileageApplication.class,args);
    }

    @Override
    public void run(String... args) throws Exception {
        gpsService.getGps();
    }
}

这里实现CommandLineRunner接口,在项目启动后会执行run方法。

GpsService

package com.ke.mileage.service;

public interface GpsService {
    void getGps();
}

GpsServiceImpl

package com.ke.mileage.service.impl;

import com.ke.mileage.handler.GpsMileageHandler;
import com.ke.mileage.service.GpsService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import java.util.*;
import java.util.concurrent.*;

@Slf4j
@Service
public class GpsServiceImpl implements GpsService{

    @Autowired
    private MongoTemplate mongoTemplate;

    @Autowired
    private RedisTemplate redisTemplate;

    private final static int coreNum = Runtime.getRuntime().availableProcessors();
    //一次从set集合中取的个数
    private final static int tripCount = 2000;
    //创建一个线程池对象
    /**
     * 参数信息:
     * int corePoolSize     核心线程大小
     * int maximumPoolSize  线程池最大容量大小
     * long keepAliveTime   线程空闲时,线程存活的时间
     * TimeUnit unit        时间单位
     * BlockingQueue<Runnable> workQueue  任务队列。一个阻塞队列
     */
    private final static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
            coreNum,
            coreNum,
            10,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(10000),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.DiscardOldestPolicy()
    );
    @Override
    public void getGps() {
        try{
            //从redis中获取行程id
            List<String> tripSet = redisTemplate.opsForSet().randomMembers("truck_gps_trip_set",tripCount);
            log.info("获取的tripSize:{}",tripSet.size());
            CountDownLatch latch = new CountDownLatch(tripSet.size());
            for (String trip :tripSet) {
                //提交任务
                poolExecutor.submit(new GpsMileageHandler(latch,trip,mongoTemplate,redisTemplate));
            }
            try {
                //查看执行情况,有异常将会在此显示。
                latch.await();
                System.out.println("所有行程已经计算完毕");
                if(tripSet.size()<1000){
                    System.out.println("关闭线程池");
                    poolExecutor.shutdown();
                }else{
                    System.out.println("已处理完成,递归继续处理");
                    getGps();
                }

            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }catch (Exception e){
            log.info("GpsServiceImpl异常-------{},getStackTrace:{}",e.getMessage(),e.getStackTrace());
        }
    }
}

上面代码中tripCount的数量是2000,线程池的队列是10000,为什么要这么做呢?

1、在创建线程池的时候,队列我用的是有界队列ArrayBlockingQueue,而不是无界的,因为我想控制内存的占用,不想太大。

2、如果tripCount的数量很大,超出了线程池的队列的数量,则会被线程池给拒绝,这个任务就丢失了,具体的线程池拒绝策略大家可以详细了解一下。是为了保证所有的任务都被执行。

redis中的set集合有40w,一次2000,那如何能进行下一次呢,这个时候就得递归了,所以要拿到每次的所有线程执行完的结果,这里我们用到了一个CountDownLatch,这个就类似于一个计数器,每个线程执行完任务都会-1,最后为0的时候就会执行。

递归结束的条件我这里是判断了一下从set集合中取出来的条数,我取2000条,但得到的不到1000条,说明这次执行完任务,所有任务就全部执行完了,就该退出了

GpsMileageHandler

package com.ke.mileage.handler;
import lombok.extern.slf4j.Slf4j;

import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.function.Function;

@Slf4j
public class GpsMileageHandler implements Runnable{
    private MongoTemplate mongoTemplate;
    private RedisTemplate redisTemplate
    //存放要处理的行程id
    private String redisTripId;
    private CountDownLatch latch;
    public GpsMileageHandler(CountDownLatch latch, String tripId,MongoTemplate mongoTemplate,RedisTemplate redisTemplate) {
        this.redisTripId = tripId;
        this.latch = latch;
        this.mongoTemplate = mongoTemplate;
        this.redisTemplate = redisTemplate;

    }
    @Override
    public void run() {
        log.info("当前线程:{}",Thread.currentThread().getName());
        handler();
        //这里计时器减1
        latch.countDown();
    }
    public void handler(){
        try{ 
            //在这里执行一系列的处理逻辑就好了
        }catch (Exception e){
            e.printStackTrace();
            log.info("GpsMileageHandler异常2-------{},getStackTrace:{}",e.getMessage(),e.getStackTrace());
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/382922.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

STM32之PWM

PWMPWM&#xff0c;英文名Pulse Width Modulation&#xff0c;是脉冲宽度调制缩写&#xff0c;它是通过对一系列脉冲的宽度进行调制&#xff0c;等效出所需要的波形&#xff08;包含形状以及幅值&#xff09;&#xff0c;对模拟信号电平进行数字编码&#xff0c;也就是说通过调…

Office 365用户报告

通过ADManager Plus的现成Office 365用户报告&#xff0c;您无需复杂的PowerShell脚本&#xff0c;即可查找Office 365环境中用户的重要信息。使用这些报告&#xff0c;您只需点击几次基于Web的控制台&#xff0c;即可提取Office 365环境中活动和不活动用户数量等信息&#xff…

Celery 分布式任务队列

1. 认识 Celery Celery 是一个 基于 Python 开发的分布式异步消息任务队列&#xff0c;可以实现任务异步处理&#xff0c;制定定时任务等。 异步消息队列&#xff1a;执行异步任务时&#xff0c;会返回一个任务 ID 给你&#xff0c;过一段时间后拿着任务 ID 去取执行结果定时…

进程的介绍

文章目录一.进程的概念1.1概念1.2进程的组成1.2.1 PCB中描述进程的特征二.进程的虚拟地址空间三.进程间的通信引入线程一.进程的概念 1.1概念 百科的介绍: 换句话说,一个跑起来的程序,就是一个进程,也就是在操作系统中运行的exe程序就是一个进程,如下图的进程列表 进程是操…

【女士,房间墙上凿个洞,看你在干嘛~】安全攻防内网渗透-绕过防火墙和安全检测,搭建DNS隐蔽隧道

作者&#xff1a;Eason_LYC 悲观者预言失败&#xff0c;十言九中。 乐观者创造奇迹&#xff0c;一次即可。 一个人的价值&#xff0c;在于他所拥有的。所以可以不学无术&#xff0c;但不能一无所有&#xff01; 技术领域&#xff1a;WEB安全、网络攻防 关注WEB安全、网络攻防。…

Java并发包提供了哪些并发工具类?

第19讲 | Java并发包提供了哪些并发工具类&#xff1f; 通过前面的学习&#xff0c;我们一起回顾了线程、锁等各种并发编程的基本元素&#xff0c;也逐步涉及了 Java 并发包中的部分内容&#xff0c;相信经过前面的热身&#xff0c;我们能够更快地理解 Java 并发包。 今天我要…

SpringBoot集成Swagger3.0(入门) 02

文章目录Swagger3常用配置注解接口测试API信息配置Swagger3 Docket开关&#xff0c;过滤&#xff0c;分组Swagger3常用配置注解 ApiImplicitParams,ApiImplicitParam&#xff1a;Swagger3对参数的描述。 参数名参数值name参数名value参数的具体意义&#xff0c;作用。required参…

mes系统核心业务流程及应用场景介绍

现在许多企业已经开始使用MES系统控制和管理工厂的生产过程&#xff0c;实时监控、诊断和控制生产过程&#xff0c;完成单元集成和系统优化。本文将为大家具体介绍一下MES系统的业务流程。 MES系统业务流程 1、计划调度MES系统承接了ERP订单&#xff0c;开始干预生产。该模块…

kaggle RSNA 比赛过程总结

引言 算算时间&#xff0c;有差不多两年多没在打kaggle了&#xff0c;自20年最后一场后&#xff08;其实之前也就打过两场&#xff0c;一场打铁&#xff0c;一场表格赛是金是银不太记得&#xff0c;当时相当于刺激战场&#xff0c;过拟合lb大赛太刺激了&#xff0c;各种trick只…

毕业设计 基于51单片机的指纹红外密码电子锁

基于51单片机的指纹红外密码电子锁1、项目简介1.1 系统框架1.2 系统功能2、部分电路设计2.1 STC89C52单片机最小系统电路设计2.2 矩阵按键电路电路设计2.3 液晶显示模块电路设计3、部分代码展示3.1 LCD12864显示字符串3.2 串口初始化实物图1、项目简介 选题指导&#xff0c;项…

动态规划|特殊的多行规划|dp[2][] 用两行元素分别记录状态变化

多行规划是我自己整理此类问题时起的名字&#xff0c;如有专属名词&#xff0c;麻烦评论告知 用于处理当动态规划中&#xff0c;需要记录多个值的状态变化时。 376. 摆动序列&#xff08;特殊的自定义二维dp&#xff09; 做惯了一般的动态规划&#xff0c;突然看到这种题目&a…

UDPTCP网络编程

udp编程接口 一个UDP程序的编写可以分为3步&#xff1a; 创建一个网络套接字&#xff1a; 它相当于文件操作时的文件描述符&#xff0c;是一个程序进行网络通讯的门户&#xff0c; 所有的网络操作都要基于它 绑定IP和端口&#xff1a; 需要为网络套接字填充IP和端口信息 但是…

Python - 操作txt文件

文章目录打开txt文件读取txt文件写入txt文件删除txt文件打开txt文件 open(file, moder, bufferingNone, encodingNone, errorsNone, newlineNone, closefdTrue)函数用来打开txt文件。 #方法1&#xff0c;这种方式使用后需要关闭文件 f open("data.txt","r&qu…

【Visual Studio】git提交代码时使用GPG

前言 下载安装GPG的过程省略,直接开始进行配置 0.visual studio 版本说明 其余版本未测试,但是应该也是可以的 1 获取GPG的密钥ID 1.1 window下可以打开Kleopatra查看生成好的密钥的密钥ID 1.2 也可以从命令行中获取 gpg --list-keys 红框位置,后16位就是密钥ID 2 配置.git…

QML MouseArea详解

1.MouseArea简介 MouseArea是一个不可见的项目&#xff0c;通常与一个可见的项目一起使用&#xff0c;以便为该项目提供鼠标处理。通过有效地充当代理&#xff0c;鼠标处理的逻辑可以包含在MouseArea项中。 常用属性&#xff1a; 属性 类型描述 containsMouse bool 光标当前…

刷题笔记2 | 977.有序数组的平方 ,209.长度最小的子数组 ,59.螺旋矩阵II ,总结

977.有序数组的平方 给你一个按 非递减顺序 排序的整数数组 nums&#xff0c;返回 每个数字的平方 组成的新数组&#xff0c;要求也按 非递减顺序 排序。 输入&#xff1a;nums [-4,-1,0,3,10] 输出&#xff1a;[0,1,9,16,100] 解释&#xff1a;平方后&#xff0c;数组变为 […

二、Spring概述

1.Spring简介 Spring是一个开源框架&#xff0c;它由Rod Johnson创建。它是为了解决企业应用开发的复杂性而创建的。 从简单性、可测试性和松耦合的角度而言&#xff0c;任何Java应用都可以从Spring中受益。 Spring是一个轻量级的控制反转(IoC)和面向切面(AOP)的容器框架。 Sp…

关于如何合理设置线程池参数解决方案

关于如何合理设置线程池参数解决方案&#xff08;ThreadPoolExecutor&#xff09; 线程池参数有哪些 我们直接来看构造方法 ... public ThreadPoolExecutor(int var1, int var2, long var3, TimeUnit var5, BlockingQueue<Runnable> var6,ThreadFactory var7, Rejecte…

W25Q256被写保护如何修改

W25Q256被写保护如何修改1、 W25Q256数据读不到1.1 打印的寄存器的值1.2 可能原因1.3 解决办法1.4 用到的函数1、 W25Q256数据读不到 能够正确的读到ID&#xff0c;但是读到的数据不正确 1.1 打印的寄存器的值 0x2 BUSY &#xff1a;只读&#xff0c; 指令正在执行 WEL (1) &…

物盾安全汤晓冬:工业互联网企业如何应对高发的供应链安全风险?

编者按&#xff1a;物盾安全是一家专注于物联网安全的产品厂商&#xff0c;其核心产品“物安盾”在能源、制造、交通等多个领域落地&#xff0c;为这些行业企业提供覆盖物联网云、管、边、端的安全整体解决方案。“物安盾”集成了腾讯安全制品扫描&#xff08;BSCA&#xff09;…