高并发幂等计数器的设计与实现

news2025/1/22 17:42:48

🌷🍁 博主猫头虎 带您 Go to New World.✨🍁
🦄 博客首页——猫头虎的博客🎐
🐳《面试题大全专栏》 文章图文并茂🦕生动形象🦖简单易学!欢迎大家来踩踩~🌺
🌊 《IDEA开发秘籍专栏》学会IDEA常用操作,工作效率翻倍~💐
🌊 《100天精通Golang(基础入门篇)》学会Golang语言,畅玩云原生,走遍大小厂~💐

🪁🍁 希望本文能够给您带来一定的帮助🌸文章粗浅,敬请批评指正!🍁🐥

文章目录

  • 高并发幂等计数器的设计与实现
    • 摘要
    • 引言
    • 问题描述:
    • 依赖组件
    • 实现思路
    • Go 代码示例
    • Java 代码示例
      • 简单代码:
      • 详细代码:
      • 思路解析
      • 优化问题
      • 解决方法
        • 解决方案一(不使用Redis):
        • 方案二: 使用Redis
    • Python 代码示例
    • 拓展:
      • 1. 大量请求同时到来
        • 扩展性:
        • 限流:
        • 缓存:
        • 异步处理:
      • 2. 合适的过期时间
      • 如果不用redis呢?
      • 1. 数据库唯一索引
      • 2. 应用内存
      • 3. 分布式锁
      • 4. 消息队列
      • 5. 文件系统
    • 总结
  • 原创声明

高并发幂等计数器的设计与实现

在这里插入图片描述

摘要

本文探讨了如何实现一个高并发、幂等的计数器服务,该服务用于处理外部的 inc 请求以增加特定视频的播放计数。考虑到网络延迟和重试等因素,该服务需要确保每个请求至少被处理一次,同时避免重复计数。我们使用了 MySQL 用于持久化存储计数数据,并用 Redis 进行幂等性检查。本文通过 Go、Java 和 Python 三种编程语言展示了具体的实现代码,并对核心逻辑进行了详细解释。Java 代码部分更是进行了全流程的展示,包括幂等性检查、数据库更新和已处理请求的记录。这样的设计不仅确保了高并发处理能力,还实现了请求的幂等性。

引言

在分布式系统中,高并发和幂等性是两个非常关键的问题。本文将探讨如何实现一个高并发、幂等的计数器服务。该服务接受外部的 inc 请求,用于增加特定视频的播放计数。由于网络延迟和请求重试等原因,多个相同或不同的 inc 请求可能并发到达服务。因此,服务需要确保每个请求至少被处理一次(at least once),同时避免重复计数。我们将使用 Go、Java 和 Python 来分别演示这一实现。

问题描述:

高并发幂等计数器题目
问题描述:
1.实现一个计数器服务
2.服务接收外部的 inc 请求,每个请求具有全局唯一 request id 和视频 id
3.因为网络和重试的原因,请求可能会重复的到达
4.时序上,多个重复的请求可能并发达到,两次重复请求之间的间隔不可预期
5.需要保证 at least once ,计数值不能丢失
6.可以依赖一些外部组件, mysql redis

依赖组件

  • MySQL: 用于持久化存储计数器的数据。
  • Redis: 用于高速缓存和临时存储已经接收到的 request id。

实现思路

  1. 接收请求: 使用 Web 框架接收 inc 请求,并提取其中的 request_idvideo_id
  2. 幂等检查: 使用 Redis 查询该 request_id,如果已存在,则该请求已被处理。
  3. 队列或缓存: 如果是新的 request_id,则将其存入 Redis,并进行数据库更新操作。
  4. 计数逻辑: 从 MySQL 中获取当前计数,然后加 1,并更新回数据库。

Go 代码示例

// 导入相应的包
import (
	"github.com/go-redis/redis/v8"
	"database/sql"
	// 其他必要的包
)

func incHandler(requestID string, videoID string) string {
	if isProcessed(requestID) {
		return "Already Processed"
	}

	// 更新数据库
	updateCounter(videoID)
	
	return "OK"
}

func isProcessed(requestID string) bool {
	// Redis 检查
	val, _ := redisClient.Get(ctx, requestID).Result()
	return val != ""
}

func updateCounter(videoID string) {
	// MySQL 更新
	// 省略具体实现
}

Java 代码示例

简单代码:

import redis.clients.jedis.Jedis;
import org.springframework.jdbc.core.JdbcTemplate;
// 其他必要的导入

@RestController
public class CounterController {

	@Autowired
	Jedis jedis;

	@Autowired
	JdbcTemplate jdbcTemplate;

	@RequestMapping("/inc")
	public String inc(@RequestParam String requestId, @RequestParam String videoId) {
		if (isProcessed(requestId)) {
			return "Already Processed";
		}

		// 更新数据库
		updateCounter(videoId);

		return "OK";
	}

	public boolean isProcessed(String requestId) {
		// Redis 检查
		return jedis.exists(requestId);
	}

	public void updateCounter(String videoId) {
		// MySQL 更新
		// 省略具体实现
	}
}

详细代码:


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.bind.annotation.*;
import redis.clients.jedis.Jedis;

@RestController
public class CounterController {

    @Autowired
    private Jedis jedis;

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @GetMapping("/inc")
    public String inc(@RequestParam String requestId, @RequestParam String videoId) {
        // Step 1: 幂等性检查
        if (isProcessed(requestId)) {
            return "Already Processed";
        }

        // Step 2: 更新数据库
        if (updateCounter(videoId)) {
            // Step 3: 将 requestId 存入 Redis 以保证幂等性
            jedis.set(requestId, "true");
            return "OK";
        }
        
        return "Failed";
    }

    private boolean isProcessed(String requestId) {
        // 使用 Redis 检查 requestId 是否已处理
        return jedis.exists(requestId);
    }

    private boolean updateCounter(String videoId) {
        // 使用 JdbcTemplate 和 MySQL 更新计数器
        String query = "SELECT count FROM video_counter WHERE video_id = ?";
        Integer count = jdbcTemplate.queryForObject(query, new Object[]{videoId}, Integer.class);

        if (count == null) {
            // 如果视频尚未有计数,初始化为 1
            jdbcTemplate.update("INSERT INTO video_counter(video_id, count) VALUES(?, 1)", videoId);
        } else {
            // 如果视频已有计数,加 1
            jdbcTemplate.update("UPDATE video_counter SET count = ? WHERE video_id = ?", count + 1, videoId);
        }

        return true;
    }
}

思路解析

幂等性检查: 使用 Redis 进行了幂等性检查。如果请求已经被处理过(即在 Redis 中有记录),我们就直接返回。

数据库更新: 我们使用 Spring 的 JdbcTemplate 来与 MySQL 进行交互。如果这是一个全新的 video_id,我们将其添加到数据库并初始化计数为 1;否则,我们找到当前计数并加 1。

记录处理过的请求: 最后,我们将处理过的 request_id 添加到 Redis 中,以便进行未来的幂等性检查。

这样,我们就得到了一个高并发、幂等的计数器服务。

优化问题

如果两个重复的请求,进入到Step1 都没有查到,不是就都进入step2

解决方法

如果两个相同的 request_id 几乎同时到达,并且都通过了 Step 1 的幂等性检查,那么它们都会进入 Step 2,从而违反了我们想要的幂等性。

在不使用 Redis 或其他外部锁服务的情况下,这个问题变得更加复杂。一种可能的解决方案是使用数据库的原子操作和唯一约束来确保幂等性。具体来说,您可以使用数据库事务来解决这个问题。

解决方案一(不使用Redis):

  1. 创建一个新表(比如叫做 processed_requests)用于存储已经处理过的 request_id。该表有一个唯一约束在 request_id 上。

  2. 在处理一个新请求之前,尝试将 request_id 插入到 processed_requests 表中。如果插入成功,说明这是第一次处理该请求,然后再去更新视频的播放计数。如果插入失败(因为唯一约束),则说明该请求已经被处理过,直接返回。

  3. 这一切都应在一个数据库事务中完成,以确保操作的原子性。

这里是更新后的 Java 代码片段:

import org.springframework.transaction.annotation.Transactional;

// ...

@RestController
public class CounterController {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Transactional
    @GetMapping("/inc")
    public String inc(@RequestParam String requestId, @RequestParam String videoId) {
        // Step 1: 幂等性检查和插入
        try {
            jdbcTemplate.update("INSERT INTO processed_requests(request_id) VALUES(?)", requestId);
        } catch (DuplicateKeyException e) {
            return "Already Processed";
        }

        // Step 2: 更新数据库
        updateCounter(videoId);

        return "OK";
    }

    private void updateCounter(String videoId) {
        String query = "SELECT count FROM video_counter WHERE video_id = ?";
        Integer count = jdbcTemplate.queryForObject(query, new Object[]{videoId}, Integer.class);

        if (count == null) {
            jdbcTemplate.update("INSERT INTO video_counter(video_id, count) VALUES(?, 1)", videoId);
        } else {
            jdbcTemplate.update("UPDATE video_counter SET count = ? WHERE video_id = ?", count + 1, videoId);
        }
    }
}

这样,在并发环境中,由于数据库的唯一约束和事务,我们可以确保每个 request_id 只会被处理一次。这解决了您提到的问题。

方案二: 使用Redis

如果使用 Redis,一种常见的方法是利用 Redis 的原子操作来解决这个问题。具体来说,可以使用 SETNX (SET if Not eXists)命令来设置 request_id 的值,并检查操作是否成功。这个命令只有在键不存在时才会设置值,并返回一个标识(通常是 1 表示成功,0 表示失败)。因为 Redis 的这个操作是原子性的,所以即使有多个相同的请求几乎同时到达,也只有一个能够成功设置该 request_id

下面是 Java 代码中这部分逻辑的示例:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import redis.clients.jedis.Jedis;

@RestController
public class CounterController {

    @Autowired
    private Jedis jedis;

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @GetMapping("/inc")
    public String inc(@RequestParam String requestId, @RequestParam String videoId) {
        // Step 1: 幂等性检查
        if (!setIfNotExists(requestId)) {
            return "Already Processed";
        }

        // Step 2: 更新数据库
        updateCounter(videoId);
        
        return "OK";
    }

    private boolean setIfNotExists(String requestId) {
        return jedis.setnx(requestId, "true") == 1;
    }

    private void updateCounter(String videoId) {
        String query = "SELECT count FROM video_counter WHERE video_id = ?";
        Integer count = jdbcTemplate.queryForObject(query, new Object[]{videoId}, Integer.class);

        if (count == null) {
            jdbcTemplate.update("INSERT INTO video_counter(video_id, count) VALUES(?, 1)", videoId);
        } else {
            jdbcTemplate.update("UPDATE video_counter SET count = ? WHERE video_id = ?", count + 1, videoId);
        }
    }
}

这里,setIfNotExists 方法使用 Redis 的 SETNX 命令来尝试设置一个 request_id。如果设置成功(返回 1),则继续更新数据库。如果设置失败(返回 0,表示该 request_id 已经存在),则直接返回 “Already Processed”,实现幂等性。

这样,在高并发环境下,即使多个相同的 request_id 几乎同时到达,也只有一个会被实际处理,其他的都会在 setIfNotExists 阶段被拦截,从而确保了幂等性。

Python 代码示例

from flask import Flask, request
import redis
import mysql.connector

app = Flask(__name__)
r = redis.Redis(host='localhost', port=6379, db=0)
conn = mysql.connector.connect(user='root', password='password', database='test')

@app.route('/inc', methods=['POST'])
def inc():
    request_id = request.form['request_id']
    video_id = request.form['video_id']

    if r.exists(request_id):
        return 'Already Processed'

    r.set(request_id, 1)
    
    cursor = conn.cursor()
    # 更新 MySQL
    # 省略具体实现

    return 'OK'

拓展:

面试官提问:1.我就是同一个时间段大量的请求到来了怎么办?
2.还有这个合适的过期时间怎么合适

1. 大量请求同时到来

当大量请求在同一时间段到来时,一般有以下几种应对方式:

扩展性:

  1. 横向扩展(水平扩展): 通过增加更多的服务器来分摊负载。
  2. 垂直扩展(纵向扩展): 通过增加单个服务器的计算资源(例如,CPU、RAM)。

限流:

  1. 令牌桶或漏桶算法: 控制进入到系统的请求速率。
  2. 优先级队列: 为不同类型的请求设置不同的优先级。

缓存:

  1. 读缓存: 对常读不变或少变的数据进行缓存。
  2. 写缓存(Write-Through、Write-Back): 缓存写操作,批量或异步地更新到数据库。

异步处理:

  1. 消息队列: 使用如 RabbitMQ、Kafka 等消息队列服务将处理请求和更新数据库的操作解耦。

2. 合适的过期时间

设置合适的过期时间主要依赖于业务需求和系统资源。以下几点可能有助于确定一个合适的过期时间:

  1. 业务逻辑: 如果请求是短暂的(比如秒杀活动),那么短的过期时间可能更合适。

  2. 资源限制: 如果 Redis 或其他缓存资源有限,较短的过期时间可以更快地回收资源。

  3. 数据一致性: 长的过期时间可能导致数据不一致,短的过期时间虽然可以减少这种风险,但会增加系统的复杂性。

  4. 网络延迟和重试策略: 如果网络延迟和重试可能导致长时间的重复请求,应设置相对长一些的过期时间。

综合以上因素,过期时间的设置通常需要在性能和准确性之间找到一个平衡点。这通常需要通过实验和性能测试来不断调整和优化。

如果不用redis呢?

如果不使用 Redis,我们有几种其他方法也可以实现高并发和幂等性。

1. 数据库唯一索引

可以在数据库中为 request_id 设置唯一索引。当重复请求到来时,尝试插入相同的 request_id 将会触发数据库错误,从而达到幂等性的目的。

private boolean isProcessed(String requestId) {
    try {
        jdbcTemplate.update("INSERT INTO request_ids(request_id) VALUES(?)", requestId);
    } catch (DuplicateKeyException e) {
        return true;
    }
    return false;
}

2. 应用内存

在应用级别维护一个已处理 request_id 的集合。这样不需要外部存储,但这种方式不适合分布式系统,因为每个实例都有自己的内存。

private Set<String> processedRequestIds = Collections.synchronizedSet(new HashSet<>());

private boolean isProcessed(String requestId) {
    if (processedRequestIds.contains(requestId)) {
        return true;
    }
    processedRequestIds.add(requestId);
    return false;
}

3. 分布式锁

使用数据库或者其他分布式锁机制确保同一时间只有一个请求在处理。这种方法可以确保幂等性但可能影响系统性能。

private boolean isProcessed(String requestId) {
    if (acquireLock(requestId)) {
        // 进行处理
        releaseLock(requestId);
        return false;
    }
    return true;
}

4. 消息队列

使用消息队列(如 Kafka、RabbitMQ 等)确保消息的幂等处理。大多数现代消息队列都提供了这种机制。

5. 文件系统

在文件系统中为每个 request_id 创建一个唯一文件。通过检查文件存在与否,来确定是否处理了该请求。但这种方法不适合高并发场景。

每种方法都有其优点和局限性。选择哪种方法取决于具体需求、可用资源以及你愿意接受的复杂性。

总结

通过使用 Redis 进行幂等性检查,以及使用 MySQL 进行持久化存储,我们成功地实现了一个高并发、幂等的计数器服务。这种设计能够在高并发条件下保证 at least once 语义,同时也实现了幂等性。

该设计还有进一步优化和扩展的空间,例如,可以加入更多的负载均衡和高可用性特性,或者使用消息队列来进一步解耦生产者和消费者。

感谢您的阅读,希望本文能为您提供有用的信息和启示。如有任何问题或建议,请随时留言。


作者: [猫头虎]
发布时间: [2023.08.30]

在这里插入图片描述

原创声明

======= ·

  • 原创作者: 猫头虎

作者wx: [ libin9iOak ]

学习复习

本文为原创文章,版权归作者所有。未经许可,禁止转载、复制或引用。

作者保证信息真实可靠,但不对准确性和完整性承担责任

未经许可,禁止商业用途。

如有疑问或建议,请联系作者。

感谢您的支持与尊重。

点击下方名片,加入IT技术核心学习团队。一起探索科技的未来,共同成长。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/948194.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

react css 污染解决方法

上代码 .m-nav-bar {background: #171a21;.content {height: 104px;margin: 0px auto;} }import React from "react"; import styles from ./css.module.scssexport default class NavBar extends React.Component<any, any> {constructor (props: any) {supe…

Linux常用命令——dd命令

在线Linux命令查询工具 dd 复制文件并对原文件的内容进行转换和格式化处理 补充说明 dd命令用于复制文件并对原文件的内容进行转换和格式化处理。dd命令功能很强大的&#xff0c;对于一些比较底层的问题&#xff0c;使用dd命令往往可以得到出人意料的效果。用的比较多的还是…

[ZenTao]源码阅读:加载自定义任务类型

www/index.php config/config.php framework/base/router.class.php tmp/model/common.php module/common/model.php framework/router.class.php

《动手学深度学习》-55循环神经网络

沐神版《动手学深度学习》学习笔记&#xff0c;记录学习过程&#xff0c;详细的内容请大家购买书籍查阅。 b站视频链接 开源教程链接 循环神经网络 潜变量自回归模型&#xff1a; 循环神经网络结构&#xff1a; 简单来说循环神经网络RNN就是在MLP中加了一项&#xff0c;使它可…

气传导蓝牙耳机哪个好?好用爆款气传导蓝牙耳机推荐

​对于气传导耳机&#xff0c;还有很多朋友觉得还是比较陌生&#xff0c;气传导工作原理是通过空气传播&#xff0c;由耳廓收集声音&#xff0c;然后以声波的形式引起鼓膜振动。气传导耳机除了拥有骨传导耳机优点之外&#xff0c;长时间佩戴没有震麻感&#xff0c;音质比骨传导…

alibabacloud的简单使用,nacos配置中心+服务中心。作者直接给自己写的源码

文章目录 依赖关键主要的程序启动文件配置文件bootstrap.yml依赖文件nacos配置中心上的文件截图 启动成功截图参考文档 依赖关键 SpringBoot版本和com.alibaba.cloud版本需要对应&#xff0c;不然会程序会启动失败作者使用的版本 SpringBoot: 2.1.6.RELEASE alibabacloud: 2.…

Android AGP8.1.0组件化初探

Android AGP8.1.0组件化初探 前言&#xff1a; 前面两篇完成了从AGP4.2到 AGP8.1.0的升级&#xff0c;本文是由于有哥们留言说在AGP8.0中使用ARouter组件化有问题&#xff0c;于是趁休息时间尝试了一下&#xff0c;写了几个demo&#xff0c;发现都没有问题&#xff0c;跳转和传…

循环的技巧和深入条件控制

这里对深入条件控制的知识点做一下测试&#xff1a;用作普通值而不是布尔值时&#xff0c;短路运算符的返回值通常是最后一个求了值的参数。 a2c5 for b in [0,1]:print((a and b and c))运行结果 E:\Python\Python38\python.exe D:/pythonprojects/python-auto-test/test/ti…

【MySQL】mysql connect

目录 一、准备工作 1、创建mysql用户 2、删除用户 3、修改用户密码 3.1、自己改自己密码 3.2、root用户修改指定用户的密码 4、数据库的权限 4.1、给用户授权 4.2、回收权限 二、连接mysql client 1、安装mysql客户端库 2、验证是否引入成功 三、 mysql接口 1、初…

Spring boot 整合 Okhttp3 并封装请求工具

一、 为什么要使用okHttp OkHttp是一个高效、灵活、易于使用的HTTP客户端库&#xff0c;优势如下&#xff1a; 性能更高&#xff1a;OkHttp在网络请求处理上采用了异步模型&#xff0c;并将连接池、压缩、网络协议等多种技术应用到其中&#xff0c;从而提高了网络请求的效率和…

c语言每日一练(12)

前言&#xff1a;每日一练系列&#xff0c;每一期都包含5道选择题&#xff0c;2道编程题&#xff0c;博主会尽可能详细地进行讲解&#xff0c;令初学者也能听的清晰。每日一练系列会持续更新&#xff0c;暑假时三天之内必有一更&#xff0c;到了开学之后&#xff0c;将看学业情…

MongoDB实验——在MongoDB集合中查找文档

在MongoDB集合中查找文档 一、实验目的二、实验原理三、实验步骤1.启动MongoDB数据库、启动MongoDB Shell客户端2.数据准备-->person.json3.指定返回的键4 .包含或不包含 i n 或 in 或 in或nin、$elemMatch&#xff08;匹配数组&#xff09;5.OR 查询 $or6.Null、$exists7.…

Vue3 学习 组合式API setup语法糖 响应式 指令 DIFF(一)

文章目录 前言一、Composition Api二、setup语法糖三、响应式refreactive 四、其他一些关键点v-prev-oncev-memov-cloak 五、虚拟Dom五、diff算法 前言 本文用于记录学习Vue3的过程 一、Composition Api 我觉得首先VUE3最大的改变就是对于代码书写的改变&#xff0c;从原来选择…

《自然语言处理》chapter7-预训练语言模型

这是阅读《自然语言处理-基于预训练模型的方法》的学习笔记&#xff0c;记录学习过程&#xff0c;详细的内容请大家购买书籍查阅。 同时参考沐神的两个视频&#xff1a; GPT&#xff0c;GPT-2&#xff0c;GPT-3 论文精读【论文精读】 BERT 论文逐段精读【论文精读】 概述 自然…

如何保证跨境传输的安全性?

随着互联网时代的到来&#xff0c;全球文件传输频率不断增加&#xff0c;市场经济的发展也对信息共享提出更高要求。传统电话交流已无法满足跨国企业的需求&#xff0c;企业内部诸如Web、电子邮件、企业资源计划&#xff08;ERP&#xff09;、网络电话&#xff08;VOIP&#xf…

SAP ABAP 代码调优检查工具及性能调优

一&#xff1a;代码检查工具 ABAP 测试仪表盘(ATC) 所有检查工具, 豁免处理, 结果存储的中心 代码检查器 (SCI) 提供给客户&#xff0c;合作伙伴和SAP的做代码相关检查的开放式架构 扩展程序检查(SLIN) 扩展的代码检查&#xff0c;用来分析源代码 SAP NetWeaver 应用服务器&a…

VMware虚拟机网络连接设置——NAT模式(Windows版)

首先参考VMware虚拟机网络连接设置——仅主机模式&#xff08;Windows版&#xff09;_vmware仅主机模式_Mr.LiuZB的博客-CSDN博客配置&#xff0c;网络还是不通&#xff0c;再结合Linux 虚拟机和主机互通 [万能方法]_linux虚拟机与主机网络连接_核桃胡子的博客-CSDN博客 配置&…

AD域中批量添加域用户

首先在C盘中建立一个文件&#xff0c;名字为file.csv 格式如下 根据CSV文件的ABCDE列来进行识别的 然后我们在cmd命令行中输入一下命令 for /f "tokens1,2,3,4,5 delims," %a in (C:file.csv) do dsadd user "cn%c,ou业务部,ou博迈科技,dcBMKJ,dccom" -s…

Qt应用开发(基础篇)——消息对话框 QMessageBox

一、前言 QMessageBox类继承于QDialog&#xff0c;是一个模式对话框&#xff0c;常用于通知用户或向用户提出问题并接收答案。 对话框QDialog QMessageBox消息框主要由四部分组成&#xff0c;一个主要文本text&#xff0c;用于提醒用户注意某种情况;一个信息文本informativeTex…

linux c编程之“hello world”一

文章目录 hello world开始学习汇编文件 hello.s第1行第2行第3行第4行第5行第6行第7行第8行第9行第10行第11行第12行第13行 X [注]&#xff1a;环境说明&#xff1a; OS&#xff1a;CentOS 7 GCC&#xff1a; 4.8.5 其他环境下的结果可能不尽相同。 声明&#xff1a;本文是我的一…