剑指JUC原理-20.并发编程实践

news2024/12/26 11:45:09
  • 👏作者简介:大家好,我是爱吃芝士的土豆倪,24届校招生Java选手,很高兴认识大家
  • 📕系列专栏:Spring源码、JUC源码
  • 🔥如果感觉博主的文章还不错的话,请👍三连支持👍一下博主哦
  • 🍂博主正在努力完成2023计划中:源码溯源,一探究竟
  • 📝联系方式:nhs19990716,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬👀

文章目录

  • 并发编程实践
    • 定时任务
    • 模拟监听器
    • 文件中转暂存数据
    • 模拟大数据导入解析
    • 查询接口优化 - 串行改并行
    • ThreadLocal
      • Mysql应用
        • 场景构建
          • 引入事务
          • 常规解决方案
          • 常规方案的实现
          • 常规方案的弊端
          • ThreadLocal方案的实现
      • TraceId日志 - 传递参数
      • 过滤器 和 拦截器详解
        • 过滤器与拦截器相同点
        • 过滤器与拦截器区别
        • 过滤器 与 拦截器经典问题?
          • 1.过滤器和拦截器的区别是什么?
          • 2.过滤器和拦截器的执行顺序是怎样的?
          • 3.过滤器和拦截器的作用有哪些?
          • 4.过滤器和拦截器的使用场景有哪些?
          • 5.如何在Java Web应用程序中使用过滤器和拦截器?
      • TransmittableThreadLocal
        • 线程池场景下ThreadLocal的值传递问题
          • 复写submit或者execute方法
          • 利用InheritableThreadLocals
          • 利用TransmittableThreadLocal
    • 模拟高并发
    • 处理消息队列消息
    • 统计数量
        • AtomicLong可以弃用了吗?
        • 总结
  • 参考文章

并发编程实践

定时任务

其实使用Thread是能够模拟定时任务的,其中一些定时任务框架的底层源码中,最后也会使用到Thread

实现这种定时任务的具体代码如下:

public static void init() {
    new Thread(() -> {
        while (true) {
            try {
                System.out.println("下载文件");
                Thread.sleep(1000 * 60 * 5);
            } catch (Exception e) {
                log.error(e);
            }
        }
    }).start();
}

使用Thread类可以做最简单的定时任务,在run方法中有个while的死循环(当然还有其他方式),执行我们自己的任务。有个需要特别注意的地方是,需要用try...catch捕获异常,否则如果出现异常,就直接退出循环,下次将无法继续执行了。

但这种方式做的定时任务,只能周期性执行,不能支持定时在某个时间点执行。

特别提醒一下,该线程建议定义成守护线程,可以通过setDaemon方法设置,让它在后台默默执行就好。

使用场景:比如项目中有时需要每隔5分钟去下载某个文件,或者每隔10分钟去读取模板文件生成静态html页面等等,一些简单的周期性任务场景。

使用Thread类做定时任务的优缺点:

  • 优点:这种定时任务非常简单,学习成本低,容易入手,对于那些简单的周期性任务,是个不错的选择。
  • 缺点:不支持指定某个时间点执行任务,不支持延迟执行等操作,功能过于单一,无法应对一些较为复杂的场景。

因为为了尽可能满足延迟执行 和 在某个时间点执行任务,比如:如果用户下单后,超过30分钟还未完成支付,则系统自动将该订单取消。

这里需求就可以使用延迟定时任务实现。

ScheduledExecutorServiceJDK1.5+版本引进的定时任务,该类位于java.util.concurrent并发包下。

ScheduledExecutorService是基于多线程的,设计的初衷是为了解决Timer单线程执行,多个任务之间会互相影响的问题。

它主要包含4个方法:

  • schedule(Runnable command,long delay,TimeUnit unit),带延迟时间的调度,只执行一次,调度之后可通过Future.get()阻塞直至任务执行完毕。
  • schedule(Callable callable,long delay,TimeUnit unit),带延迟时间的调度,只执行一次,调度之后可通过Future.get()阻塞直至任务执行完毕,并且可以获取执行结果。
  • scheduleAtFixedRate,表示以固定频率执行的任务,如果当前任务耗时较多,超过定时周期period,则当前任务结束后会立即执行。
  • scheduleWithFixedDelay,表示以固定延时执行任务,延时是相对当前任务结束为起点计算开始时间。

实现这种定时任务的具体代码如下:

public class ScheduleExecutorTest {

    public static void main(String[] args) {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            System.out.println("doSomething");
        },1000,1000, TimeUnit.MILLISECONDS);
    }
}

调用ScheduledExecutorService类的scheduleAtFixedRate方法实现周期性任务,每隔1秒钟执行一次,每次延迟1秒再执行。

当然也可以配置 定时执行任务,比如说如何让每周四 18:00:00 定时执行任务?

// 获得当前时间
        LocalDateTime now = LocalDateTime.now();
        // 获取本周四 18:00:00.000
        LocalDateTime thursday =
                		now.with(DayOfWeek.THURSDAY).withHour(18).withMinute(0).withSecond(0).withNano(0);
        // 如果当前时间已经超过 本周四 18:00:00.000, 那么找下周四 18:00:00.000
        if(now.compareTo(thursday) >= 0) {
            thursday = thursday.plusWeeks(1);
        }
        // 计算时间差,即延时执行时间
        long initialDelay = Duration.between(now, thursday).toMillis();
        // 计算间隔时间,即 1 周的毫秒值
        long oneWeek = 7 * 24 * 3600 * 1000;
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);

        System.out.println("开始时间:" + new Date());
        executor.scheduleAtFixedRate(() -> {
            System.out.println("执行时间:" + new Date());
        }, initialDelay, oneWeek, TimeUnit.MILLISECONDS);

模拟监听器

有时候,我们需要写个监听器,去监听某些数据的变化。

比如:我们在使用canal的时候,需要监听binlog的变化,能够及时把数据库中的数据,同步到另外一个业务数据库中。

在这里插入图片描述

如果直接写一个监听器去监听数据就太没意思了,我们想实现这样一个功能:在配置中心有个开关,配置监听器是否开启,如果开启了使用单线程异步执行。

主要代码如下:

@Service
public CanalService {
    private volatile boolean running = false;
    private Thread thread;

    @Autowired
    private CanalConnector canalConnector;
    
    public void handle() {
        //连接canal
        while(running) {
           //业务处理
        }
    }
    
    public void start() {
       thread = new Thread(this::handle, "name");
       running = true;
       thread.start();
    }
    
    public void stop() {
       if(!running) {
          return;
       }
       running = false;
    }
}

在start方法中开启了一个线程,在该线程中异步执行handle方法的具体任务。然后通过调用stop方法,可以停止该线程。

其中,使用volatile关键字控制的running变量作为开关,它可以控制线程中的状态。

接下来,有个比较关键的点是:如何通过配置中心的配置,控制这个开关呢?

apollo配置为例,我们在配置中心的后台,修改配置之后,自动获取最新配置的核心代码如下:

public class CanalConfig {
    @Autowired
    private CanalService canalService;

    @ApolloConfigChangeListener
    public void change(ConfigChangeEvent event) {
        String value = event.getChange("test.canal.enable").getNewValue();
        if(BooleanUtils.toBoolean(value)) {
            canalService.start();
        } else {
            canalService.stop();
        }
    }
}

通过apolloApolloConfigChangeListener注解,可以监听配置参数的变化。

如果test.canal.enable开关配置的true,则调用canalService类的start方法开启canal数据同步功能。如果开关配置的false,则调用canalService类的stop方法,自动停止canal数据同步功能。

文件中转暂存数据

举个例子,在某些高并发的场景中,我们需要收集部分用户的日志(比如:用户登录的日志),写到数据库中,以便于做分析。

但由于项目中,还没有引入消息中间件,比如:kafkarocketmq等。

如果直接将日志同步写入数据库,可能会影响接口性能。

所以,大家很自然想到了异步处理。

实现这个需求最简单的做法是,开启一个线程,异步写入数据到数据库即可。

这样做,可以是可以。

但如果用户登录操作的耗时,比异步写入数据库的时间要少得多。这样导致的结果是:生产日志的速度,比消费日志的速度要快得多,最终的性能瓶颈在消费端。

其实,还有更优雅的处理方式,虽说没有使用消息中间件,但借用了它的思想。

这套记录登录日志的功能,分为:日志生产端、日志存储端和日志消费端。

如下图所示:

在这里插入图片描述

先定义了一个阻塞队列。

@Component
public class LoginLogQueue {
    private static final int QUEUE_MAX_SIZE    = 1000;

    private BlockingQueueblockingQueue queue = new LinkedBlockingQueue<>(QUEUE_MAX_SIZE);

    //生成消息
    public boolean push(LoginLog loginLog) {
        return this.queue.add(loginLog);
    } 

    //消费消息
    public LoginLog poll() {
        LoginLog loginLog = null;
        try {
            loginLog = this.queue.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return result;
    }
}

然后定义了一个日志的生产者。

@Service
public class LoginSerivce {
    
    @Autowired
    private LoginLogQueue loginLogQueue;

    public int login(UserInfo userInfo) {
        //业务处理
        LoginLog loginLog = convert(userInfo);
        loginLogQueue.push(loginLog);
    }  
}

接下来,定义了日志的消费者。

@Service
public class LoginInfoConsumer {
    @Autowired
    private LoginLogQueue queue;

    @PostConstruct
    public voit init {
       new Thread(() -> {
          while (true) {
              LoginLog loginLog = queue.take();
              //写入数据库
          }
        }).start();
    }
}

当然,这个例子中使用单线程接收登录日志,为了提升性能,也可以使用线程池来处理业务逻辑(比如:写入数据库)等。

其实这种思想,总的来说,就是我们耗时的瓶颈是在数据库插入操作这里,有的时候哪怕我们使用批量操作,可能效果还不是很理想,那么就可以考虑别的方式,比如放入文件或者MQ中,可能会比直接插入效率高。下面再举一个例子。

比如说一个转账接口,如果是并发开启,10个并发度,每个批次1000笔转账明细数据,数据库插入会特别耗时,大概6秒左右;这个跟我们公司的数据库同步机制有关,并发情况下,因为优先保证同步,所以并行的插入变成串行啦,就很耗时。

数据库同步机制可能导致并行的插入变成串行的原因有很多,下面列举了一些可能的情况:

  • 锁竞争:当多个事务同时尝试向相同的数据页或数据行插入数据时,数据库系统可能会使用锁来确保数据的一致性。如果同步机制导致大量的锁竞争,那么并行插入操作可能会被迫等待其他事务释放锁,从而导致串行化。
  • 同步点阻塞:某些数据库同步机制可能会引入同步点,要求所有的写操作都必须在这些同步点进行同步,这样就会导致并行的写操作变成串行化。
  • 冲突检测与重试:在数据库同步的过程中,可能会发生数据冲突,系统需要检测并解决这些冲突。这种检测和解决过程可能会导致并行插入变成串行化,因为某些操作需要等待其他操作完成后才能执行。
  • 数据复制延迟:如果数据库采用了主从复制或者集群复制的机制,数据同步可能会引入一定的延迟。在这种情况下,并行的插入操作可能会因为数据尚未完全同步而变成串行化。

优化前1000笔明细转账数据,先落地DB数据库,返回处理中给用户,再异步转账。如图:

在这里插入图片描述

记得当时压测的时候,高并发情况,这1000笔明细入库,耗时都比较大。所以我转换了一下思路,把批量的明细转账记录保存的文件服务器,然后记录一笔转账总记录到数据库即可。接着异步再把明细下载下来,进行转账和明细入库。最后优化后,性能提升了十几倍

优化后,流程图如下:

在这里插入图片描述

模拟大数据导入解析

我们可能会经常收到运营同学提过来的excel数据导入需求,比如:将某一大类下的所有子类一次性导入系统,或者导入一批新的供应商数据等等。

我们以导入供应商数据为例,它所涉及的业务流程很长,比如:

  1. 调用天眼查接口校验企业名称和统一社会信用代码。
  2. 写入供应商基本表
  3. 写入组织表
  4. 给供应商自动创建一个用户
  5. 给该用户分配权限
  6. 自定义域名
  7. 发站内通知

如果在程序中,解析完excel,读取了所有数据之后。用单线程一条条处理业务逻辑,可能耗时会非常长。

为了提升excel数据导入效率,非常有必要使用多线程来处理。

当然在java中实现多线程的手段有很多种,下面重点聊聊java8中最简单的实现方式:parallelStream

伪代码如下:

supplierList.parallelStream().forEach(x -> importSupplier(x));

parallelStream是一个并行执行的流,它默认通过ForkJoinPool实现的,能提高你的多线程任务的速度。

ForkJoinPool处理的过程会分而治之,它的核心思想是:将一个大任务切分成多个小任务。每个小任务都能单独执行,最后它会把所用任务的执行结果进行汇总。

下面用一张图简单介绍一下ForkJoinPool的原理:

在这里插入图片描述

当然除了excel导入之外,还有类似的读取文本文件,也可以用类似的方法处理。

温馨的提醒一下,如果一次性导入的数据非常多,用多线程处理,可能会使系统的cpu使用率飙升,需要特别关注。

查询接口优化 - 串行改并行

假设我们设计一个APP首页的接口,它需要查用户信息、需要查banner信息、需要查弹窗信息等等。如果是串行一个一个查,比如查用户信息200ms,查banner信息100ms、查弹窗信息50ms,那一共就耗时350ms了,如果还查其他信息,那耗时就更大了。

在这里插入图片描述

其实我们可以改为并行调用,即查用户信息、查banner信息、查弹窗信息,可以同时并行发起

在这里插入图片描述

最后接口耗时将大大降低

public UserInfo getUserInfo(Long id) throws InterruptedException, ExecutionException {
    final UserInfo userInfo = new UserInfo();
    CompletableFuture userFuture = CompletableFuture.supplyAsync(() -> {
        getRemoteUserAndFill(id, userInfo);
        return Boolean.TRUE;
    }, executor);

    CompletableFuture bonusFuture = CompletableFuture.supplyAsync(() -> {
        getRemoteBonusAndFill(id, userInfo);
        return Boolean.TRUE;
    }, executor);

    CompletableFuture growthFuture = CompletableFuture.supplyAsync(() -> {
        getRemoteGrowthAndFill(id, userInfo);
        return Boolean.TRUE;
    }, executor);
    CompletableFuture.allOf(userFuture, bonusFuture, growthFuture).join();

    userFuture.get();
    bonusFuture.get();
    growthFuture.get();

    return userInfo;
}

以阿里云开发社区举例:

应急定位场景下,A系统调用B系统获取诊断结论,TR超时时间是500ms,对于一个异常ID事件,需要执行多个诊断项服务,并记录诊断流水;每个诊断的耗时大概在100ms以内,随着业务的增长,超过5个诊断项,计算耗时累加到500ms+,这时候服务会出现高峰期短暂不可用。

在这里插入图片描述

将这段代码改成异步执行,这样执行诊断的时间是耗时最大的诊断服务

// 提交future任务并发执行
futures = executor.invokeAll(tasks, timeout, timeUnit);
// 遍历读取结果
for (Future<Res> future : futures) {
    try {
        // 获取结果
        Res singleResult = future.get();
        if (singleResult != null) {
            result.add(singleResult);
        }
    } catch (Exception e) {
        LogUtil.error(e, logger, "并发执行发生异常!,poolName={0}.", threadPoolName);
    }
}

通过上面的两个场景举例,可以看出,实际上针对一些耗时较长的任务运行,适当地利用,可以达到加速的效果。但是凡事都是双刃剑,有利有弊。

线上对响应时间要求较高的场合尽量少用多线程,尤其是服务线程需要等待任务线程的场合(很多重大事故就是和这个息息相关),如果一定要用,可以对服务线程设置一个最大等待时间。

这句话的核心是在线上高响应时间的场景下,需要谨慎使用多线程,特别是当服务线程需要等待任务线程时。因为在多线程环境中,线程间的调度和同步可能会引入额外的等待时间,这可能导致响应时间增加,影响系统的性能

举个例子来说,假设我们有一个在线服务,它需要处理大量的用户请求。每个用户请求都会被分配到一个服务线程去处理。为了提高处理速度,每个服务线程可能会启动多个任务线程去并行执行一些计算密集型的任务。这种情况下,服务线程就需要等待所有的任务线程完成才能继续执行。

然而,由于操作系统的线程调度策略,任务线程可能并不会立即执行。此外,如果任务线程的数量超过了CPU的核心数,那么这些线程就需要在CPU核心之间进行切换,这也会引入额外的等待时间。这些都可能导致服务线程需要花费更多的时间等待任务线程,从而导致响应时间增加。

因此,这句话的建议是,在这种场景下,最好尽量少用多线程,或者至少要对服务线程设置一个最大等待时间,以防止服务线程无限期地等待任务线程。这样可以避免因为线程同步和调度问题导致的性能下降,保证在线服务的响应时间。

当然,这并不是说多线程就一定会导致性能下降。如果使用得当,多线程还是可以大大提高系统的性能的。但是在高响应时间的场景下,我们需要更加谨慎地使用多线程,以防止潜在的性能问题。

以Redis举例:

Redis 6.0 之后的版本开始选择性使用多线程模型。

Redis 选择使用单线程模型处理客户端的请求主要还是因为 CPU 不是 Redis 服务器的瓶颈,使用多线程模型带来的性能提升并不能抵消它带来的开发成本和维护成本,系统的性能瓶颈也主要在网络 I/O 操作上;

而 Redis 引入多线程操作也是出于性能上的考虑,对于一些大键值对的删除操作,通过多线程非阻塞地释放内存空间也能减少对 Redis 主线程阻塞的时间,提高执行的效率。

凡事不能有绝对,寻找到适中的平衡点最重要!

ThreadLocal

Mysql应用

场景构建

这里我们先构建一个简单的转账场景: 有一个数据表account,里面有两个用户Jack和Rose,用户Jack 给用户Rose 转账。

案例的实现主要用mysql数据库,JDBC 和 C3P0 框架。

在这里插入图片描述

dao层代码 : AccountDao

public class AccountDao {
public void out(String outUser, int money) throws SQLException {
    String sql = "update account set money = money - ? where name = ?";

    Connection conn = JdbcUtils.getConnection();
    PreparedStatement pstm = conn.prepareStatement(sql);
    pstm.setInt(1,money);
    pstm.setString(2,outUser);
    pstm.executeUpdate();

    JdbcUtils.release(pstm,conn);
}

public void in(String inUser, int money) throws SQLException {
    String sql = "update account set money = money + ? where name = ?";

    Connection conn = JdbcUtils.getConnection();
    PreparedStatement pstm = conn.prepareStatement(sql);
    pstm.setInt(1,money);
    pstm.setString(2,inUser);
    pstm.executeUpdate();

    JdbcUtils.release(pstm,conn);
}
} 

service层代码 : AccountService

public class AccountService {

    public boolean transfer(String outUser, String inUser, int money) {
        AccountDao ad = new AccountDao();
        try {
            // 转出
            ad.out(outUser, money);
            // 转入
            ad.in(inUser, money);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }
}

工具类 : JdbcUtils

public class JdbcUtils { 
public static void commitAndClose(Connection conn) {
    try {
        if(conn != null){
            //提交事务
            conn.commit();
            //释放连接
            conn.close();
        }
    } catch (SQLException e) {
        e.printStackTrace();
    }
}

public static void rollbackAndClose(Connection conn) {
    try {
        if(conn != null){
            //回滚事务
            conn.rollback();
            //释放连接
            conn.close();
        }
    } catch (SQLException e) {
        e.printStackTrace();
    }
}
} 
引入事务

案例中的转账涉及两个DML操作: 一个转出,一个转入。这些操作是需要具备原子性的,不可分割。不然就有可能出现数据修改异常情况。

public class AccountService {
    public boolean transfer(String outUser, String inUser, int money) {
        AccountDao ad = new AccountDao();
        try {
            // 转出
            ad.out(outUser, money);
            // 模拟转账过程中的异常
            int i = 1/0;
            // 转入
            ad.in(inUser, money);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }
}

所以这里就需要操作事务,来保证转出和转入操作具备原子性,要么同时成功,要么同时失败。

JDBC中关于事务的操作的api

Connection接口的方法作用
void setAutoCommit(false)禁用事务自动提交(改为手动)
void commit();提交事务
void rollback();回滚事务

开启事务的注意点:

  • 为了保证所有的操作在一个事务中,案例中使用的连接必须是同一个: service层开启事务的connection需要跟dao层访问数据库的connection保持一致
  • 线程并发情况下, 每个线程只能操作各自的 connection
常规解决方案
常规方案的实现

基于上面给出的前提, 大家通常想到的解决方案是 :

  • 传参: 从service层将connection对象向dao层传递
  • 加锁

以下是代码实现修改的部分:

AccountService 类

public class AccountService {
    public boolean transfer(String outUser, String inUser, int money) {
        AccountDao ad = new AccountDao();
        //线程并发情况下,为了保证每个线程使用各自的connection,故加锁
        synchronized (AccountService.class) {

            Connection conn = null;
            try {
                conn = JdbcUtils.getConnection();
                //开启事务
                conn.setAutoCommit(false);
                // 转出
                ad.out(conn, outUser, money);
                // 模拟转账过程中的异常
//            int i = 1/0;
                // 转入
                ad.in(conn, inUser, money);
                //事务提交
                JdbcUtils.commitAndClose(conn);
            } catch (Exception e) {
                e.printStackTrace();
                //事务回滚
                JdbcUtils.rollbackAndClose(conn);
                return false;
            }
            return true;
        }
    }
}

AccountDao 类 (这里需要注意的是: connection不能在dao层释放,要在service层,不然在dao层释放,service层就无法使用了)

public class AccountDao {
    public void out(Connection conn, String outUser, int money) throws SQLException{
        String sql = "update account set money = money - ? where name = ?";
        //注释从连接池获取连接的代码,使用从service中传递过来的connection
//        Connection conn = JdbcUtils.getConnection();
        PreparedStatement pstm = conn.prepareStatement(sql);
        pstm.setInt(1,money);
        pstm.setString(2,outUser);
        pstm.executeUpdate();
        //连接不能在这里释放,service层中还需要使用
//        JdbcUtils.release(pstm,conn);
        JdbcUtils.release(pstm);
    }

    public void in(Connection conn, String inUser, int money) throws SQLException {
        String sql = "update account set money = money + ? where name = ?";
//        Connection conn = JdbcUtils.getConnection();
        PreparedStatement pstm = conn.prepareStatement(sql);
        pstm.setInt(1,money);
        pstm.setString(2,inUser);
        pstm.executeUpdate();
//        JdbcUtils.release(pstm,conn);
        JdbcUtils.release(pstm);
    }
}
常规方案的弊端

上述方式我们看到的确按要求解决了问题,但是仔细观察,会发现这样实现的弊端:

  • 直接从service层传递connection到dao层, 造成代码耦合度提高
  • 加锁会造成线程失去并发性,程序性能降低
ThreadLocal方案的实现

像这种需要在项目中进行数据传递线程隔离的场景,我们不妨用ThreadLocal来解决:

工具类的修改: 加入ThreadLocal

public class JdbcUtils {
    //ThreadLocal对象 : 将connection绑定在当前线程中
    private static final ThreadLocal<Connection> tl = new ThreadLocal();

    // c3p0 数据库连接池对象属性
    private static final ComboPooledDataSource ds = new ComboPooledDataSource();

    // 获取连接
    public static Connection getConnection() throws SQLException {
        //取出当前线程绑定的connection对象
        Connection conn = tl.get();
        if (conn == null) {
            //如果没有,则从连接池中取出
            conn = ds.getConnection();
            //再将connection对象绑定到当前线程中
            tl.set(conn);
        }
        return conn;
    }

    //释放资源
    public static void release(AutoCloseable... ios) {
        for (AutoCloseable io : ios) {
            if (io != null) {
                try {
                    io.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void commitAndClose() {
        try {
            Connection conn = getConnection();
            //提交事务
            conn.commit();
            //解除绑定 及时释放
            tl.remove();
            //释放连接
            conn.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    public static void rollbackAndClose() {
        try {
            Connection conn = getConnection();
            //回滚事务
            conn.rollback();
            //解除绑定 及时释放
            tl.remove();
            //释放连接
            conn.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

AccountService类的修改:不需要传递connection对象

public class AccountService {
    public boolean transfer(String outUser, String inUser, int money) {
        AccountDao ad = new AccountDao();

        try {
            Connection conn = JdbcUtils.getConnection();
            //开启事务
            conn.setAutoCommit(false);
            // 转出 : 这里不需要传参了 !
            ad.out(outUser, money);
            // 模拟转账过程中的异常
//            int i = 1 / 0;
            // 转入
            ad.in(inUser, money);
            //事务提交
            JdbcUtils.commitAndClose();
        } catch (Exception e) {
            e.printStackTrace();
            //事务回滚
           JdbcUtils.rollbackAndClose();
            return false;
        }
        return true;
    }
}

AccountDao类的修改:照常使用

TraceId日志 - 传递参数

使用 MDC 传递参数!

MDC是什么?

MDCorg.slf4j包下的一个类,它的全称是Mapped Diagnostic Context,我们可以认为它是一个线程安全的存放诊断日志的容器。

MDC的底层是用了ThreadLocal来保存数据的。

例如现在有这样一种场景:我们使用RestTemplate调用远程接口时,有时需要在header中传递信息,比如:traceId,source等,便于在查询日志时能够串联一次完整的请求链路,快速定位问题。

这种业务场景就能通过ClientHttpRequestInterceptor接口实现,具体做法如下:

第一步,定义一个LogFilter拦截所有接口请求,在MDC中设置traceId:

public class LogFilter implements Filter {
    @Override
    public void init(FilterConfig filterConfig) throws ServletException {
    }

    @Override
    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
        MdcUtil.add(UUID.randomUUID().toString());
        System.out.println("记录请求日志");
        chain.doFilter(request, response);
        System.out.println("记录响应日志");
    }

    @Override
    public void destroy() {
    }
}

第二步,实现ClientHttpRequestInterceptor接口,MDC中获取当前请求的traceId,然后设置到header中:

实现ClientHttpRequestInterceptor接口是指创建一个类,并让该类实现org.springframework.http.client.ClientHttpRequestInterceptor接口。这是Spring框架中用于拦截客户端发起的HTTP请求的接口。

通过实现ClientHttpRequestInterceptor接口,你可以在发起HTTP请求之前或之后对请求进行拦截和处理。这样的拦截器通常被用于添加、修改或者记录HTTP请求的头部信息、请求体内容等。在实际应用中,这个功能可以被用来实现诸如鉴权、日志记录、统一添加请求头、异常处理等功能。

public class RestTemplateInterceptor implements ClientHttpRequestInterceptor {

    @Override
    public ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) throws IOException {
        request.getHeaders().set("traceId", MdcUtil.get());
        return execution.execute(request, body);
    }
}

第三步,定义配置类,配置上面定义的RestTemplateInterceptor类:

@Configuration
public class RestTemplateConfiguration {

    @Bean
    public RestTemplate restTemplate() {
        RestTemplate restTemplate = new RestTemplate();
        restTemplate.setInterceptors(Collections.singletonList(restTemplateInterceptor()));
        return restTemplate;
    }

    @Bean
    public RestTemplateInterceptor restTemplateInterceptor() {
        return new RestTemplateInterceptor();
    }
}

其中MdcUtil其实是利用MDC工具在ThreadLocal中存储和获取traceId

public class MdcUtil {

    private static final String TRACE_ID = "TRACE_ID";

    public static String get() {
        return MDC.get(TRACE_ID);
    }

    public static void add(String value) {
        MDC.put(TRACE_ID, value);
    }
}

当然,这个例子中没有演示MdcUtil类的add方法具体调的地方,我们可以在filter中执行接口方法之前,生成traceId,调用MdcUtil类的add方法添加到MDC中,然后在同一个请求的其他地方就能通过MdcUtil类的get方法获取到该traceId。

能使用MDC保存traceId等参数的根本原因是,用户请求到应用服务器,Tomcat会从线程池中分配一个线程去处理该请求。

那么该请求的整个过程中,保存到MDCThreadLocal中的参数,也是该线程独享的,所以不会有线程安全问题。

过滤器 和 拦截器详解

过滤器与拦截器相同点
  • 拦截器与过滤器都是体现了AOP的思想,对方法实现增强,都可以拦截请求方法。
  • 拦截器和过滤器都可以通过Order注解设定执行顺序
过滤器与拦截器区别

在Java Web开发中,过滤器(Filter)和拦截器(Interceptor)都是常见的用于在请求和响应之间进行处理的组件。它们的主要区别如下:

  • 运行位置不同:过滤器是运行在Web服务器和Servlet容器之间的组件,可以拦截所有进出该容器的请求和响应(包括静态资源);而拦截器则是针对具体的控制器方法进行拦截处理的,只在控制器内部执行
  • 执行顺序不同:过滤器的执行顺序是由其在web.xml文件中声明的顺序决定的,按照声明的顺序依次执行;而拦截器的执行顺序是根据其在配置文件中的声明顺序决定的。
  • 功能不同:过滤器主要用于对请求进行预处理和过滤,例如设置字符集、登录验证、日志记录等操作;而拦截器则主要用于对请求进行流程控制,例如权限验证、参数注入、异常处理等操作。
  • 依赖框架不同:过滤器是基于Servlet规范实现的,不依赖任何特定的框架;而拦截器则通常是针对特定的框架或库实现的,例如Spring MVC框架中的拦截器。

综上所述,过滤器和拦截器在实现方式、功能和使用场景等方面都有一定的差异,开发者可以根据具体需求选择适合的组件。

在这里插入图片描述

过滤器 与 拦截器经典问题?
1.过滤器和拦截器的区别是什么?

过滤器(Filter)是在Servlet容器中用于对请求进行预处理和过滤的组件,可以实现过滤、验证、压缩等功能。而拦截器(Interceptor)是在Spring MVC框架中用于对请求进行拦截和处理的组件,可以实现权限验证、日志记录、异常处理等功能。过滤器是在Servlet容器中执行的,而拦截器是在Spring MVC框架中执行的。

2.过滤器和拦截器的执行顺序是怎样的?

在Java Web应用程序中,过滤器和拦截器的执行顺序都是由它们在配置文件中的声明顺序决定的。一般来说,先声明的过滤器或拦截器会先执行,后声明的过滤器或拦截器会后执行。

3.过滤器和拦截器的作用有哪些?

过滤器和拦截器都可以对请求进行处理和控制,实现一系列的功能,例如请求过滤、身份验证、数据加密、日志记录等。过滤器主要用于对请求进行预处理和过滤操作,而拦截器主要用于对请求进行拦截处理,在控制器方法执行之前或之后进行拦截和处理。

4.过滤器和拦截器的使用场景有哪些?

过滤器和拦截器都可以用于实现一系列的控制和管理功能。例如,过滤器可以用于身份验证、数据加密和解密、请求过滤和压缩等场景;而拦截器可以用于权限验证、日志记录、异常处理等场景。

5.如何在Java Web应用程序中使用过滤器和拦截器?

在Java Web应用程序中,要使用过滤器和拦截器,需要在配置文件中进行声明和注册。对于过滤器,可以通过在web.xml文件中添加和标签来完成;对于拦截器,可以通过在Spring MVC配置文件中添加mvc:interceptors标签来完成。同时,在声明和注册过滤器和拦截器时,还需要指定其执行顺序以及拦截路径等相关信息。

TransmittableThreadLocal

观察下面的代码请你判断代码的输出:

public class TestCase1 {

    private static ThreadLocal<String> threadLocal = new ThreadLocal<>();

    public static void main(String[] args) {
        case1();
    }

    public static void case1() {

        ExecutorService executorService = Executors.newFixedThreadPool(1);
        threadLocal.set("Hello");

        Runnable task1 = new Runnable() {
            @Override
            public void run() {
                System.out.println("ThreadLocal value in task1: " + threadLocal.get());
                threadLocal.set("Task1");
            }
        };

        Runnable task2 = new Runnable() {
            @Override
            public void run() {
                System.out.println("ThreadLocal value in task2: " + threadLocal.get());
                threadLocal.set("Task2");
            }
        };

        executorService.submit(task1);
        sleep(100);
        executorService.submit(task2);
        sleep(100);

        System.out.println("ThreadLocal value in mainThread: " + threadLocal.get());

        executorService.shutdown();
    }

    public static void sleep(int val){
        try {
            Thread.sleep(val);
        } catch (InterruptedException ignored) {
        }
    }

}

分析这段代码的输出并不难,实际输出如下:

ThreadLocal value in task1: null
ThreadLocal value in task2: Task1
ThreadLocal value in mainThread: Hello

因为我们线程池中只有一个线程,当第一个任务执行完成之后,这个线程池的线程的ThreadLocal便设置上了Task1,之后第二个任务执行时获取到的ThreadLocal中的值便是Task1,但是主线程和子线程是不同的线程,所以无论子线程如何修改ThreadLocal的内容对主线程都是无影响的。

线程池场景下ThreadLocal的值传递问题

有这么一个需求,用户登录之后,在所有的请求接口中,通过某个公共方法,就能获取到当前登录用户的信息?

获取的用户上下文,我们以CurrentUser为例。

CurrentUser内部包含了一个ThreadLocal对象,它负责保存当前线程的用户上下文信息。当然为了保证在线程池中,也能从用户上下文中获取到正确的用户信息,这里用了阿里的TransmittableThreadLocal。伪代码如下:

@Data
public class CurrentUser {
    private static final TransmittableThreadLocal<CurrentUser> THREA_LOCAL = new TransmittableThreadLocal<>();
    
    private String id;
    private String userName;
    private String password;
    private String phone;
    ...
    
    public statis void set(CurrentUser user) {
      THREA_LOCAL.set(user);
    }
    
    public static void getCurrent() {
      return THREA_LOCAL.get();
    }
}

这里为什么用了阿里的TransmittableThreadLocal,而不是普通的ThreadLocal呢?在线程池中,由于线程会被多次复用,导致从普通的ThreadLocal中无法获取正确的用户信息。父线程中的参数,没法传递给子线程,而TransmittableThreadLocal很好解决了这个问题。

再换一个例子来详细讲解!

ThreadLocal的值传递需求,往往发生在需要标识一条链路的情景下,例如我们查找日志时,往往携带一个TraceId去查找当时的这次请求链路下的所有日志,但是如果我们的业务代码中使用了线程池,如果不做处理你会发现这个线程池中执行任务的线程打印的日志的TraceId和我们搜索的请求的TraceId并不相同,所以为了定位问题方便我们往往需要保证一个请求的TraceId在异步任务中继续保持一致性,这就涉及了ThreadLocal的值传递。

那么常见的解决该办法的思路呢?

复写submit或者execute方法

一个比较朴素的想法是,在线程池执行任务的时候,把需要传递的值注入进去,因为投放任务的时候是“主线程”做的事情,执行任务是子线程执行的。所以可以这样简单实现:

public class TraceIdTransmitThreadPool extends ThreadPoolTaskExecutor {

    @Override
    public void execute(Runnable task) {
        String traceId = getTraceIdFromContext();
        super.execute(()->{
            // 其实本质上就是 使用ThreadLocalUtils类将traceId设置到线程的ThreadLocal变量中。
            ThreadLocalUtils.set(traceId);
            try{
                task.run();
            }finally {
                ThreadLocalUtils.clear(traceId);
            }
        });
    }

    private String getTraceIdFromContext() {
        return ThreadLocalUtils.get();
    }
}
利用InheritableThreadLocals

Thread类中除了threadLocals还有个inheritableThreadLocals 这个ThreadLocalMap的局部变量,这个东西实际作用是什么呢?实际作用是在子线程创建的时候,父线程会把threadLocal拷贝到子线程中。下面我们用一个例子来解释下这个东西的作用

ThreadLocal<String> local = new InheritableThreadLocal<>();
//ThreadLocal<String> local = new ThreadLocal<>();
local.set("hello");
new Thread(() -> {
    // 仅使用ThreadLocal 这里将取到NUll值
    // 使用InheritableThreadLocal 这里将取到主线程设置的线程局部变量
    System.out.println("子线程:" + local.get());
}).start();

sleep(1000)

上面的代码输出为

子线程:hello

可以看出来确实主线程中设置的值被带进到子线程中了。下面简单分析下原理,翻开new Thread的构造方法源码时我们会找到下面这行代码:

if (inheritThreadLocals && parent.inheritableThreadLocals != null)
    this.inheritableThreadLocals =
        ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);

然后在InheritableThreadLocal类的实现源码中发现其最主要的就是复写了getMap的实现

ThreadLocalMap getMap(Thread t) {
   return t.inheritableThreadLocals;
}

所以能进行值传递的原因很简单,就是会把父进程的inheritableThreadLocals 进行值拷贝,然后get/set方法在取值的时候不再从Thread类的threadLocals中取值,而是从inheritableThreadLocals, 但是我们线程池这种环境下面核心线程一般不会频繁的反复销毁重新创建,所以这种方案其实并不适合线程池的环境,此外可能是jdk官方也觉得这种方式设计的不好在jdk9之后就直接拿掉了这个inheritableThreadLocals局部变量

利用TransmittableThreadLocal

TransmittableThreadLocal是一个开源项目,gitee地址位于: gitee.com/mirrors/tra…

在本文开头的时候举了一个例子,现在我们将ThreadLocal更改为TransmittbaleThreadLocal,就可以直接体会到两者的区别,代码如下:

public static void case1() {

    //ThreadLocal<String> threadLocal = new ThreadLocal<>();
    TransmittableThreadLocal<String> threadLocal = new TransmittableThreadLocal<>();
    ExecutorService executorService = Executors.newFixedThreadPool(1);
    // 这里是核心
    executorService = TtlExecutors.getTtlExecutorService(executorService);

    Runnable task1 = () -> {
        System.out.println("ThreadLocal value in task1: " + threadLocal.get());
        threadLocal.set("Task1");
    };

    Runnable task2 = () -> {
        System.out.println("ThreadLocal value in task2: " + threadLocal.get());
        threadLocal.set("Task2");
    };

    threadLocal.set("Hello");

    executorService.submit(task1);
    sleep(100);
    executorService.submit(task2);

    System.out.println("ThreadLocal value in mainThread: " + threadLocal.get());

    executorService.shutdown();
}

最终代码运行的时候如下:

ThreadLocal value in task1: Hello
ThreadLocal value in mainThread: Hello
ThreadLocal value in task2: Hello

从代码的运行结果可以看出子线程和主线程的线程局部变量的实现了统一,并且很神奇的一点是线程1中执行第一个任务之后对线程局部变量做了修改,丝毫不影响这个线程在执行第二个任务中线程局部变量的值,在执行第二个任务的时候仍然可以取到父线程中的值

那么这个究竟是怎么实现的呢?其实主要就是上面的代码中 executorService = TtlExecutors.getTtlExecutorService(executorService); 这行代码进行了装饰作用

其实TransmittbaleThreadLocal(简称TTL)的源码设计就是一个装饰者设计模式的典型范例
任务修饰:使用TtlRunnableTtlCallable来修饰传入线程池的RunnableCallable
线程池修饰:使用 getTtlExecutorService来包装和修饰接口ExecutorService

我们先从TtlRunnable类开始进行分析,核心也就是看下run方法怎么实现的(这属于框架的基准内容比较重要)

public void run() {
    /**
     * capturedRef就是主线程传递下来的ThreadLocal的值。
     */
    Object captured = capturedRef.get();
    if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
        throw new IllegalStateException("TTL value reference is released after run!");
    }
    /**
     * 1. backup(备份)是子线程已经存在的ThreadLocal变量;
     * 这也是做到了上面说的一个线程执行两次任务,从父线程中拿到的局部变量值也不会互相影响的关键
     * 2. 将captured的ThreadLocal值在子线程中set进去;
     */
    Object backup = replay(captured);
    try {
        // 修饰的目标
        runnable.run();
    } finally {
        /**
         *  在子线程任务中,ThreadLocal可能发生变化,该步骤的目的是
         *  回滚{@code runnable.run()}进入前的ThreadLocal的线程
         */
        restore(backup);
    }
}

从上面的代码来看最重要的就是要知道captured这个变量的值到底是怎么get出来的,首先我们要知道从继承路线来看TransmittableThreadLocal 继承了InheritableThreadLocal所以自然有InheritableThreadLocal的全部能力

在这里插入图片描述

captured这个变量实际上是从这个capture方法返回的,这个方法返回的快照然后会被传递到replay方法中进行应用。

源码位置:com.alibaba.ttl.TransmittableThreadLocal.Transmitter#capture
@NonNull
public static Object capture() {
    return new Snapshot(captureTtlValues(), captureThreadLocalValues());
}

这个Snapshot看名字就知道是个快照,这个快照到底怎么实现的呢

// 抓取 TransmittableThreadLocal 的快照
private static WeakHashMap<TransmittableThreadLocal<Object>, Object> captureTtlValues() {
    WeakHashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = new WeakHashMap<TransmittableThreadLocal<Object>, Object>();
    // 从 TransmittableThreadLocal 的 holder 中,遍历所有有值的 TransmittableThreadLocal,将 TransmittableThreadLocal 取出和值复制到 Map 中。
    for (TransmittableThreadLocal<Object> threadLocal : holder.get().keySet()) {
        ttl2Value.put(threadLocal, threadLocal.copyValue());
    }
    return ttl2Value;
}

//  抓取注册的 ThreadLocal。
private static WeakHashMap<ThreadLocal<Object>, Object> captureThreadLocalValues() {
    final WeakHashMap<ThreadLocal<Object>, Object> threadLocal2Value = new WeakHashMap<ThreadLocal<Object>, Object>();
    // 从 threadLocalHolder 中,遍历注册的 ThreadLocal,将 ThreadLocal 和 TtlCopier 取出,将值复制到 Map 中。
    for (Map.Entry<ThreadLocal<Object>, TtlCopier<Object>> entry : threadLocalHolder.entrySet()) {
        final ThreadLocal<Object> threadLocal = entry.getKey();
        final TtlCopier<Object> copier = entry.getValue();

        threadLocal2Value.put(threadLocal, copier.copy(threadLocal.get()));
    }
    return threadLocal2Value;
}

上面源码的注释中提到了“注册”过程,这个注册行为则发生在TransmittableThreadLocal的get/set方法内部实现中。 现在我们有了快照,但是我们怎么将快照中的数据内容传递到子线程中呢 这就是TtlRunnable类中run方法中调用的replay方法所做的事情了。其实仔细看源码就会知道这个方法的核心目标就是要把快照中的数据给设置到当前线程的上下文中,这样你在子线程中调用get方法才能取到对应的值。

@NonNull
public static Object replay(@NonNull Object captured) {
    final Snapshot capturedSnapshot = (Snapshot) captured;
    return new Snapshot(replayTtlValues(capturedSnapshot.ttl2Value), replayThreadLocalValues(capturedSnapshot.threadLocal2Value));
}

// 重播 TransmittableThreadLocal,并保存执行线程的原值
@NonNull
private static WeakHashMap<TransmittableThreadLocal<Object>, Object> replayTtlValues(@NonNull WeakHashMap<TransmittableThreadLocal<Object>, Object> captured) {
    WeakHashMap<TransmittableThreadLocal<Object>, Object> backup = new WeakHashMap<TransmittableThreadLocal<Object>, Object>();
  
    for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) {
        TransmittableThreadLocal<Object> threadLocal = iterator.next();
        backup.put(threadLocal, threadLocal.get());
        if (!captured.containsKey(threadLocal)) {
            iterator.remove();
            threadLocal.superRemove();
        }
    }

    // 关键: 将 map 中的值,设置到 ThreadLocal 中。
    setTtlValuesTo(captured);

    // TransmittableThreadLocal 的回调方法,在任务执行前执行。
    doExecuteCallback(true);

    return backup;
}

所以总结下就是 get/set 方法中完成了TransmittableThreadLocal的注册,然后在执行run方法的时候通过TtlRunnable进行了方法包装,在调用之前进行快照形成,并应用快照到当前线程中,最后在线程执行结束之后,run方法内部对线程局部变量做的修改则会被还原,这也是本节举例中最后三次打印都是一个结果的主要原因

所以TransmittableThreadLocal 就比较适合在多线程环境下作为线程局部变量进行类似traceId这样的参数的传参,此外TransmittableThreadLocal 还支持javaAgent方式启动,这样就不需要在代码中显式的去包装线程池了。

java -javaagent:path/to/transmittable-thread-local-2.x.y.jar \
    -cp classes \
    com.alibaba.demo.ttl.agent.AgentDemo

模拟高并发

有时候我们写的接口,在低并发的场景下,一点问题都没有。

但如果一旦出现高并发调用,该接口可能会出现一些意想不到的问题。

为了防止类似的事情发生,一般在项目上线前,我们非常有必要对接口做一下压力测试

当然,现在已经有比较成熟的压力测试工具,比如:JmeterLoadRunner等。

如果你觉得下载压测工具比较麻烦,也可以手写一个简单的模拟并发操作的工具,用CountDownLatch就能实现,例如:

public static void concurrenceTest() {
    /**
     * 模拟高并发情况代码
     */
    final AtomicInteger atomicInteger = new AtomicInteger(0);
    final CountDownLatch countDownLatch = new CountDownLatch(1000); // 相当于计数器,当所有都准备好了,再一起执行,模仿多并发,保证并发量
    final CountDownLatch countDownLatch2 = new CountDownLatch(1000); // 保证所有线程执行完了再打印atomicInteger的值
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    try {
        for (int i = 0; i < 1000; i++) {
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        countDownLatch.await(); //一直阻塞当前线程,直到计时器的值为0,保证同时并发
                    } catch (InterruptedException e) {
                        log.error(e.getMessage(),e);
                    }
                    //每个线程增加1000次,每次加1
                    for (int j = 0; j < 1000; j++) {
                        atomicInteger.incrementAndGet();
                    }
                    countDownLatch2.countDown();
                }
            });
            countDownLatch.countDown();
        }

        countDownLatch2.await();// 保证所有线程执行完
        executorService.shutdown();
    } catch (Exception e){
        log.error(e.getMessage(),e);
    }
}

处理消息队列消息

在高并发的场景中,消息积压问题,可以说如影随形,真的没办法从根本上解决。表面上看,已经解决了,但后面不知道什么时候,就会冒出一次。

参考 《苏三说技术》所举的实际情况

有天下午,产品过来说:有几个商户投诉过来了,他们说菜品有延迟,快查一下原因。

这次问题出现得有点奇怪。

为什么这么说?

首先这个时间点就有点奇怪,平常出问题,不都是中午或者晚上用餐高峰期吗?怎么这次问题出现在下午?

根据以往积累的经验,我直接看了kafkatopic的数据,果然上面消息有积压,但这次每个partition都积压了十几万的消息没有消费,比以往加压的消息数量增加了几百倍。这次消息积压得极不寻常。

我赶紧查服务监控看看消费者挂了没,还好没挂。又查服务日志没有发现异常。这时我有点迷茫,碰运气问了问订单组下午发生了什么事情没?他们说下午有个促销活动,跑了一个JOB批量更新过有些商户的订单信息。

这时,我一下子如梦初醒,是他们在JOB中批量发消息导致的问题。怎么没有通知我们呢?实在太坑了。

虽说知道问题的原因了,倒是眼前积压的这十几万的消息该如何处理呢?

此时,如果直接调大partition数量是不行的,历史消息已经存储到4个固定的partition,只有新增的消息才会到新的partition。我们重点需要处理的是已有的partition。

直接加服务节点也不行,因为kafka允许同组的多个partition被一个consumer消费,但不允许一个partition被同组的多个consumer消费,可能会造成资源浪费。

解释:

在Kafka中,一个消费者组(Consumer Group)可以消费同一个主题(Topic)下的多个分区(Partition),但是同一个分区不能被多个消费者组消费。这是因为在Kafka中,分区是消费者组之间的数据分片单元,每个分区只能被一个消费者组的一个消费者实例消费,以保证数据的一致性和顺序性。如果一个分区被多个消费者组消费,可能会导致以下问题:

  • 资源浪费:多个消费者实例会竞争消费同一个分区的消息,可能会导致资源浪费,如CPU、内存等。
  • 数据一致性:如果多个消费者实例同时消费同一个分区的消息,可能会导致数据的不一致性。例如,如果一个消费者实例在处理消息时出现故障,其他消费者实例可能会重复处理相同的消息,从而导致数据不一致。
  • 性能问题:多个消费者实例同时消费同一个分区的消息,可能会导致网络和磁盘I/O负载增加,从而影响整体性能。

因此,为了保证Kafka系统的稳定性和性能,不建议将同一个分区分配给多个消费者组。在实际应用中,可以根据实际需求和消费者组的数量,合理调整分区数量,以提高系统的并发处理能力和负载能力。

看来只有用多线程处理了。

为了紧急解决问题,我改成了用线程池处理消息,核心线程和最大线程数都配置成了50

大致用法如下:

  • 先定义一个线程池:
@Configuration
public class ThreadPoolConfig {

    @Value("${thread.pool.corePoolSize:5}")
    private int corePoolSize;

    @Value("${thread.pool.maxPoolSize:10}")
    private int maxPoolSize;

    @Value("${thread.pool.queueCapacity:200}")
    private int queueCapacity;

    @Value("${thread.pool.keepAliveSeconds:30}")
    private int keepAliveSeconds;

    @Value("${thread.pool.threadNamePrefix:ASYNC_}")
    private String threadNamePrefix;

    @Bean("messageExecutor")
    public Executor messageExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.setKeepAliveSeconds(keepAliveSeconds);
        executor.setThreadNamePrefix(threadNamePrefix);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}
  • 再定义一个消息的consumer:
@Service
public class MyConsumerService {
    @Autowired
    private Executor messageExecutor;
    
    @KafkaListener(id="test",topics={"topic-test"})
    public void listen(String message){
        System.out.println("收到消息:" + message);
        messageExecutor.submit(new MyWork(message);
    }
}
  • 在定义的Runable实现类中处理业务逻辑:
public class MyWork implements Runnable {
    private String message;
    
    public MyWork(String message) {
       this.message = message;
    }

    @Override
    public void run() {
        System.out.println(message);
    }
}

果然,调整之后消息积压数量确实下降的非常快,大约半小时后,积压的消息就非常顺利的处理完了。

统计数量

剑指JUC原理-10.并发编程大师的原子累加器底层优化原理(与人类的优秀灵魂对话)-CSDN博客

在这里只着重介绍一下 longadder 的 sum操作。

当我们最终获取计数器值时,我们可以使用LongAdder.longValue()方法,其内部就是使用sum方法来汇总数据的。

java.util.concurrent.atomic.LongAdder.sum():

public long sum() {
    Cell[] as = cells; Cell a;
    long sum = base;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

实现很简单,遍历cells数组中的值,然后累加.

AtomicLong可以弃用了吗?

看上去LongAdder的性能全面超越了AtomicLong,而且阿里巴巴开发手册也提及到 推荐使用 LongAdder 对象,比 AtomicLong 性能更好(减少乐观 锁的重试次数),但是我们真的就可以舍弃掉LongAdder了吗?

当然不是,我们需要看场景来使用,如果是并发不太高的系统,使用AtomicLong可能会更好一些,而且内存需求也会小一些。

我们看过sum()方法后可以知道LongAdder在统计的时候如果有并发更新,可能导致统计的数据有误差。

而在高并发统计计数的场景下,才更适合使用LongAdder

总结

LongAdder中最核心的思想就是利用空间来换时间,将热点value分散成一个Cell列表来承接并发的CAS,以此来提升性能。

LongAdder的原理及实现都很简单,但其设计的思想值得我们品味和学习。

参考文章

[1] 聊聊并发编程的12种业务场景 - 掘金 (juejin.cn)

[2] 后端接口性能优化分析-程序结构优化-CSDN博客

[3] 剑指JUC原理-15.ThreadLocal-CSDN博客

[4] 剑指JUC原理-13.线程池-CSDN博客

[5] ThreadLocal项目实战-TraceId日志 - 掘金 (juejin.cn)

[6] 一文读懂Java中的过滤器和拦截器:实例详解,逐步掌握 - 掘金 (juejin.cn)

[7] 从ThreadLocal到TransmittableThreadLocal,彻底学透ThreadLocal的设计 - 掘金 (juejin.cn)

[8] 比AtomicLong更优秀的LongAdder确定不来了解一下吗? - 掘金 (juejin.cn)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1224172.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

CTF-PWN-tips

文章目录 overflowscanfgetreadstrcpystrcat Find string in gdbgdbgdb peda Binary ServiceFind specific function offset in libc手工自动 Find /bin/sh or sh in library手动自动 Leak stack addressFork problem in gdbSecret of a mysterious section - .tlsPredictable …

【Python数学练习1】

一、题目 中文描述&#xff1a; 给出正整数N&#xff0c;输出满足条件的数对(a,b)的个数&#xff0c;满足gcd(a,b)b, a,b < n 数学描述&#xff1a; 二、解法 解法1&#xff1a; 对应Python代码&#xff1a; def num_fact(n):num 0for i in range(1, n 1):if n % i …

Python程序打包指南:手把手教你一步步完成

最近感兴趣想将开发的项目转成Package&#xff0c;研究了一下相关文章&#xff0c;并且自己跑通了&#xff0c;走了一下弯路&#xff0c;这里记录一下如何打包一个简单的Python项目&#xff0c;展示如何添加必要的文件和结构来创建包&#xff0c;如何构建包&#xff0c;以及如何…

视频剪辑技巧:简单步骤,批量剪辑并随机分割视频

随着社交媒体平台的广泛普及和视频制作需求的急剧增加&#xff0c;视频剪辑已经成为了当今社会一项不可或缺的技能。然而&#xff0c;对于许多初学者来说&#xff0c;视频剪辑可能是一项令人望而生畏的复杂任务。可能会面临各种困难&#xff0c;如如何选择合适的软件和硬件、如…

栈和队列的初始化,插入,删除,销毁。

目录 题外话 顺序表和链表优缺点以及特点 一.栈的特点 二. 栈的操作 2.1初始化 2.2 栈的销毁 2.3 栈的插入 2.3 输出top 2.4 栈的删除 2.5 输出栈 题外话 顺序表和链表优缺点以及特点 特点&#xff1a;顺序表&#xff0c;逻辑地址物理地址。可以任意访问&#xff0c…

systemverilog:interface中端口方向、Clocking block的理解

1、interface中端口方向的理解 &#xff08;1&#xff09;从testbench的角度看&#xff0c;tb中信号的输入输出方向与interface中信号输入输出方向一致&#xff1a; &#xff08;2&#xff09;从DUT角度看&#xff0c;DUT中信号输入输出方向与interface中信号输入输出方向相反…

数据库的分库分表 详解

前言 一个系统随着用户量上升&#xff0c;产生的数据也越来越多&#xff0c;到达一定程度&#xff0c;数据库就会产生瓶颈。 首先单机数据库所能承载的连接数&#xff0c;io和吞吐量都是有限的&#xff0c;并发量上来数据库就渐渐顶不住了。 如果单表的数据量过大&#xff0…

腾讯云新用户优惠活动有哪些可以参加?腾讯云新人服务器优惠活动

腾讯云作为国内领先的云服务提供商&#xff0c;不仅为用户提供稳定可靠的云服务器&#xff0c;还为新用户带来了一系列的优惠活动和代金券&#xff0c;以降低购买成本&#xff0c;提高业务效益。在这里&#xff0c;我们将为您详细介绍腾讯云服务器的新人优惠活动及代金券&#…

JavaEE——简单认识HTML

文章目录 一、简单解释 HTML二、认识 HTML 的结构三、了解HTML中的相关标签1.注释标签2.标题标签3.段落标签 p4. 换行标签 br5.格式化标签6.图片标签解释 src解释 alt解释其他有关 img 标签的属性 7.超链接标签 a8.表格标签9.列表标签10.input 标签11. select 下拉菜单以及 div…

Flink1.17 DataStream API

目录 一.执行环境&#xff08;Execution Environment&#xff09; 1.1 创建执行环境 1.2 执行模式 1.3 触发程序执行 二.源算子&#xff08;Source&#xff09; 2.1 从集合中读取数据 2.2 从文件读取数据 2.3 从 RabbitMQ 中读取数据 2.4 从数据生成器读取数据 2.5 …

Vue3 toRef函数和toRefs函数

当我们在setup 中的以读取对象属性单独交出去时&#xff0c;我们会发现这样会丢失响应式&#xff1a; setup() {let person reactive({name: "张三",age: 18,job: {type: "前端",salary:10}})return {name: person.name,age: person.age,type: person.jo…

​软考-高级-系统架构设计师教程(清华第2版)【第17章 通信系统架构设计理论与实践(P614~646)-思维导图】​

软考-高级-系统架构设计师教程&#xff08;清华第2版&#xff09;【第17章 通信系统架构设计理论与实践&#xff08;P614~646&#xff09;-思维导图】 课本里章节里所有蓝色字体的思维导图

Egress Gateway

目录 文章目录 目录本节实战Egress Gateway访问外部服务1.Envoy 转发流量到外部服务2.控制对外部服务的访问3.直接访问外部服务总结 Egress 出口网关1.用 Egress gateway 发起 HTTP 请求2.用 Egress gateway 发起 HTTPS 请求 关于我最后 本节实战 实战名称&#x1f6a9; 实战&…

MAXScript实现简单的碰撞检测教程

在本教程中&#xff0c;我们将创建一个使轮子在地形上跟随的脚本。此脚本将没有任何UI。并且仅适用于特定对象。 因此&#xff0c;第一步是创建一个新的脚本。打开侦听器窗口&#xff0c;然后在文件菜单下选择“新建脚本…”。 我们首先需要创建与场景中的对象相对应的3个变量…

对分过层后的类进行可视化

变量是&#xff1a; std::vector<pcl::PointCloud<pcl::PointXYZRGB>::Ptr> clusters_k_upper std::vector<pcl::PointCloud<pcl::PointXYZRGB>::Ptr> clusters_k_lower std::vector<pcl::PointCloud<pcl::PointXYZRGB>::Ptr> clusters_un…

Win10关机设置里没有睡眠选项的解决方法

用户想给自己的Win10电脑设置睡眠模式&#xff0c;但是在关机设置里面找不到睡眠选项&#xff0c;导致自己不能顺利完成睡眠模式的设置。接下来小编给大家带来解决Win10关机设置里没有睡眠选项的简单方法&#xff0c;解决后用户就可以看到Win10电脑关机设置中有睡眠选项了。 Wi…

Scrum框架中的Sprint

上图就是sprint里要做的事。Sprint是scrum框架的核心&#xff0c;是所有的想法、主意转换为价值的地方。所有实现产品目标的必要工作都在sprint里完成&#xff0c;这些工作主要包括Sprint 计划&#xff08;Sprint planning&#xff09;、每日站会&#xff08;Daily Scrum&#…

Linux CentOS7 添加网卡

一台主机中安装多块网卡&#xff0c;有许多优势。可以实现多项功能。 为了学习网卡参数的设置&#xff0c;可以为主机添加多块网卡。与添加磁盘一样&#xff0c;要在VMware中设置。利用图形化方式或命令行查看或设置网卡。本文仅作一初步讨论。有关网络参数的设置不在讨论之列…

Linux系统编程学习 NO.9——git、gdb

前言 本篇文章简单介绍了Linux操作系统中两个实用的开发工具git版本控制器和gdb调试器。 git 什么是git&#xff1f; git是一款开源的分布式版本控制软件。它不仅具有网络功能&#xff0c;还是服务端与客户端一体的软件。它可以高效的处理程序项目中的版本管理。它是Linux内…