RocketMq-秒杀应用场景

news2025/1/23 3:48:03

1、介绍mq

2、秒杀介绍

-----redis配置

3、生产者-消费者搭建

1、介绍mq

在这里插入图片描述

在这里插入图片描述
消息存储架构图中主要有下面三个跟消息存储相关的文件构成。

(1) CommitLog:消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,消息内容不是定长的。单个文件大小默认1G, 文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件;

(2) ConsumeQueue:消息消费队列,引入的目的主要是提高消息消费的性能,由于RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行的,如果要遍历commitlog文件中根据topic检索消息是非常低效的。Consumer即可根据ConsumeQueue来查找待消费的消息。其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。consumequeue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夹的组织方式如下:topic/queue/file三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样consumequeue文件采取定长设计,每一个条目共20个字节,分别为8字节的commitlog物理偏移量、4字节的消息长度、8字节tag hashcode,单个文件由30W个条目组成,可以像数组一样随机访问每一个条目,每个ConsumeQueue文件大小约5.72M;

(3) IndexFile:IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。Index文件的存储位置是:$HOME/store/index/{fileName},文件名fileName是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存 2000W个索引,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故RocketMQ的索引文件其底层实现为hash索引。(通过消息的key或者id查找到的消息,hash的时间复杂度O(n)+O(1))

数据结构知识:

方法一:
在这里插入图片描述
方法二:
在这里插入图片描述
另一位老师讲解:也可以这样记hash, y轴,是一个弹簧,每个key压弹簧,使用同样的力度压弹簧,落的位置,就是存储的位置。

H(key)=hash(key), hash有对用的计算方法(比如直接定址,除留取余法,平方取中法)。

如果出现hash冲突(线性探测,二次探测,随机探测, 再哈希法, 链地址法)

时间复杂度:
链表: O(n)
冒泡: O(n^2)最坏情况下 ,最好O(n)
B+tree: O(logn)

在上面的RocketMQ的消息存储整体架构图中可以看出,RocketMQ采用的是混合型的存储结构,即为Broker单个实例下所有的队列共用一个日志数据文件(即为CommitLog)来存储。RocketMQ的混合型存储结构(多个Topic的消息实体内容都存储于一个CommitLog中)针对Producer和Consumer分别采用了数据和索引部分相分离的存储结构,Producer发送消息至Broker端,然后Broker端使用同步或者异步的方式对消息刷盘持久化,保存至CommitLog中。只要消息被刷盘持久化至磁盘文件CommitLog中,那么Producer发送的消息就不会丢失。正因为如此,Consumer也就肯定有机会去消费这条消息。当无法拉取到消息后,可以等下一次消息拉取,同时服务端也支持长轮询模式,如果一个消息拉取请求未拉取到消息,Broker允许等待30s的时间,只要这段时间内有新消息到达,将直接返回给消费端。这里,RocketMQ的具体做法是,使用Broker端的后台服务线程—ReputMessageService不停地分发请求并异步构建ConsumeQueue(逻辑消费队列)和IndexFile(索引文件)数据。

1.2 页缓存与内存映射

页缓存(PageCache)是OS对文件的缓存,用于加速对文件的读写。一般来说,程序对文件进行顺序读写的速度几乎接近于内存的读写速度,主要原因就是由于OS使用PageCache机制对读写访问操作进行了性能优化,将一部分的内存用作PageCache。对于数据的写入,OS会先写入至Cache内,随后通过异步的方式由pdflush内核线程将Cache内的数据刷盘至物理磁盘上。对于数据的读取,如果一次读取文件时出现未命中PageCache的情况,OS从物理磁盘上访问读取文件的同时,会顺序对其他相邻块的数据文件进行预读取。

在RocketMQ中,ConsumeQueue逻辑消费队列存储的数据较少,并且是顺序读取,在page cache机制的预读取作用下,Consume Queue文件的读性能几乎接近读内存,即使在有消息堆积情况下也不会影响性能。而对于CommitLog消息存储的日志数据文件来说,读取消息内容时候会产生较多的随机访问读取,严重影响性能。如果选择合适的系统IO调度算法,比如设置调度算法为“Deadline”(此时块存储采用SSD的话),随机读的性能也会有所提升。

另外,RocketMQ主要通过MappedByteBuffer对文件进行读写操作。其中,利用了NIO中的FileChannel模型将磁盘上的物理文件直接映射到用户态的内存地址中(这种Mmap的方式减少了传统IO将磁盘文件数据在操作系统内核地址空间的缓冲区和用户应用程序地址空间的缓冲区之间来回进行拷贝的性能开销),将对文件的操作转化为直接对内存地址进行操作,从而极大地提高了文件的读写效率(正因为需要使用内存映射机制,故RocketMQ的文件存储都使用定长结构来存储,方便一次将整个文件映射至内存)。

2、秒杀介绍

秒杀在很短的时间内要处理大量请求,需要高并发。
并发:多个任务在同一时间段内执行
并行:多个任务在同一时刻执行

这里主要目的是从接口上减少相应时间:
1、能异步就异步
2、减少IO(统一查询,统一写)
3、尽早return(做校验…)。
4、加锁粒度尽可能小
5、事务控制粒度尽可能小

在这里插入图片描述

两个程序(一个生产者,一个消费者)
在这里插入图片描述
流程:
1、用户进入秒杀接口,redis setnx 进行用户去重(判断用户是否买个这个商品),如果不存在,则进行预库减;如果存在则返回客户端。
进行预库减之后,会把相应的uk(唯一值uniqueKey=userId+goodsId)存到 redis中,供下次 用户去重进行判断。【redis 写8w左右,读11w左右。实际情况写6w左右,读8w左右】
2、用户秒杀到的订单存放到mq 进行异步处理。
3、消费服务,从mq拿到信息,进行数据库的扣减操作。
4、每次启动项目时,要进行同步redis库存。

redis 配置

redis配置:

package com.example.config;


import com.alibaba.fastjson.support.spring.FastJsonRedisSerializer;
import com.example.utils.RedisUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
@Slf4j
public class RedisConfiguration {
    @Autowired
    private final RedisConnectionFactory redisConnectionFactory;

    @Bean
    public RedisTemplate<String, Object> redisTemplate() {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate();
        redisTemplate.setConnectionFactory(this.redisConnectionFactory);
        FastJsonRedisSerializer<Object> fastJsonRedisSerializer = new FastJsonRedisSerializer(Object.class);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(fastJsonRedisSerializer);
        redisTemplate.setHashValueSerializer(fastJsonRedisSerializer);
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }

    @Bean
    public StringRedisTemplate stringRedisTemplate() {
        StringRedisTemplate stringRedisTemplate = new StringRedisTemplate(this.redisConnectionFactory);
        stringRedisTemplate.afterPropertiesSet();
        return stringRedisTemplate;
    }

    @Bean
    public RedisUtil redisUtil(RedisTemplate<String, Object> redisTemplate, StringRedisTemplate stringRedisTemplate) {
        RedisUtil redisUtil = new RedisUtil();
        redisUtil.setRedisTemplate(redisTemplate);
        redisUtil.setStringRedisTemplate(stringRedisTemplate);
        log.info("组件加载完成:redis工具类 ");
        return redisUtil;
    }

    public RedisConfiguration(final RedisConnectionFactory redisConnectionFactory) {
        this.redisConnectionFactory = redisConnectionFactory;
    }

    public RedisConnectionFactory getRedisConnectionFactory() {
        return this.redisConnectionFactory;
    }

//    public String toString() {
//        return "RedisConfiguration(redisConnectionFactory=" + this.getRedisConnectionFactory() + ")";
//    }
}

redis 工具

package com.example.utils;


import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

@Component
public class RedisUtil {
    private RedisTemplate<String, Object> redisTemplate;
    private StringRedisTemplate stringRedisTemplate;
    public static final Integer GENERAL_CACHE_EXPIRE_DURATION = 2592000;


    public void setRedisTemplate(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    public void setStringRedisTemplate(StringRedisTemplate stringRedisTemplate) {
        this.stringRedisTemplate = stringRedisTemplate;
    }

    public void set(String k, Object v, long time) {
        if (v instanceof String && this.stringRedisTemplate != null) {
            this.stringRedisTemplate.opsForValue().set(k, (String) v);
        } else {
            this.redisTemplate.opsForValue().set(k, v);
        }

        if (time > 0L) {
            this.redisTemplate.expire(k, time, TimeUnit.SECONDS);
        }

    }

    public void set(String k, Object v) {
        this.set(k, v, -1L);
    }

    public boolean contains(String key) {
        return this.redisTemplate.hasKey(key);
    }

    public String get(String k) {
        return this.stringRedisTemplate != null ? (String) this.stringRedisTemplate.opsForValue().get(k) : (String) this.redisTemplate.opsForValue().get(k);
    }

    public <T> T getObject(String k) {
        ValueOperations<String, Object> valueOps = this.redisTemplate.opsForValue();
        return (T) valueOps.get(k);
    }

    public void remove(String key) {
        this.redisTemplate.delete(key);
    }

    public long getExpire(String key) {
        return this.redisTemplate.getExpire(key);
    }

    public Set<String> keys(String pattern) {
        return this.redisTemplate.keys(pattern);
    }

    public Long increment(String key, long delta, long time) {
        Long val = this.redisTemplate.opsForValue().increment(key, delta);
        if (val.equals(1L) && time > 0L) {
            this.redisTemplate.expire(key, time, TimeUnit.SECONDS);
        }

        return val;
    }

    public Double increment(String key, double delta) {
        return this.redisTemplate.opsForValue().increment(key, delta);
    }

    public boolean hset(String key, Map<String, Object> map) {
        try {
            this.redisTemplate.opsForHash().putAll(key, map);
            return true;
        } catch (Exception var4) {
            var4.printStackTrace();
            return false;
        }
    }

    public Object hget(String key, String item) {
        return this.redisTemplate.opsForHash().get(key, item);
    }

    public Map<Object, Object> getKeys(String key) {
        return this.redisTemplate.opsForHash().entries(key);
    }

    /**
     * 获取list缓存的长度
     *
     * @param key 键
     * @return
     */
    public long lgetSize(String key) {
        return this.redisTemplate.opsForList().size(key);
    }

    /**
     * 获取list缓存的内容
     *
     * @param key   键
     * @param start 开始下标
     * @param end   结束下标  当等于-1时,表示所有
     * @return
     */
    public List<Object> lget(String key, long start, long end) {
        return this.redisTemplate.opsForList().range(key, start, end);
    }

    /**
     * 设置list缓存
     *
     * @param key   键
     * @param value 值
     * @param time  失效时间
     */
    public void lset(String key, Object value, long time) {
        this.redisTemplate.opsForList().rightPush(key, value);
        if (time > 0L) {
            this.redisTemplate.expire(key, time, TimeUnit.SECONDS);
        }
    }

}

3、生产者-消费者搭建

pom

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
            <version>2.6.13</version>
        </dependency>

        <!--        使用的一个对象池-->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.11.1</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.24</version>
        </dependency>

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.3</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.17</version>
        </dependency>

        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.5.3.1</version>
        </dependency>

数据库:

在这里插入图片描述

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for goods
-- ----------------------------
DROP TABLE IF EXISTS `goods`;
CREATE TABLE `goods`  (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `goods_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL,
  `price` decimal(10, 2) NULL DEFAULT NULL,
  `stocks` int(255) NULL DEFAULT NULL,
  `status` int(255) NULL DEFAULT NULL,
  `pic` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL,
  `create_time` datetime(0) NULL DEFAULT NULL,
  `update_time` datetime(0) NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 4 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_unicode_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of goods
-- ----------------------------
INSERT INTO `goods` VALUES (1, '小米12s', 4999.00, 1000, 2, 'xxxxxx', '2023-02-23 11:35:56', '2023-02-23 16:53:34');
INSERT INTO `goods` VALUES (2, '华为mate50', 6999.00, 10, 2, 'xxxx', '2023-02-23 11:35:56', '2023-02-23 11:35:56');
INSERT INTO `goods` VALUES (3, '锤子pro2', 1999.00, 100, 1, NULL, '2023-02-23 11:35:56', '2023-02-23 11:35:56');

-- ----------------------------
-- Table structure for order_records
-- ----------------------------
DROP TABLE IF EXISTS `order_records`;
CREATE TABLE `order_records`  (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `user_id` int(11) NULL DEFAULT NULL,
  `order_sn` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL,
  `goods_id` int(11) NULL DEFAULT NULL,
  `create_time` datetime(0) NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_unicode_ci ROW_FORMAT = Dynamic;

SET FOREIGN_KEY_CHECKS = 1;

生产者:

配置文件yml

# 应用服务 WEB 访问端口
server:
    port: 8081
tomcat:
    threads:
#        最大线程数: 400
        max: 400
rocketmq:
    name-server: 192.168.101.128:9876
    producer:
        #        发送消息超时时间: 3s
        send-message-timeout: 3000
        #失败重试次数同步次数
        retry-times-when-send-failed: 2
        # 失败重试次数同步次数异步
        retry-times-when-send-async: 2
        #        4194304  4M,发送最大消息字节
        max-message-size: 4194304
        #在内部发送失败时是否重试其他代理,这个参数在有多个broker时才生效
        retry-next-server: true
        access-key: rocketmq2
        secret-key: 12345678

spring:
    datasource:
        driver-class-name: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://127.0.0.1:3306/seckill?serverTimezone=UTC
        username: root
        password: 123456
    #配置redis
    redis:
        #    超时时间
        timeout: 10000ms
        #    连接的主机ip
        host: 127.0.0.1
        port: 6379
        #    redis使用的0号数据库
        database: 0
        password:
    main:
        allow-bean-definition-overriding: true


测试类:

package com.example.controller;

import com.alibaba.cola.dto.Response;
import com.alibaba.cola.dto.SingleResponse;
import com.alibaba.fastjson.JSON;
import com.example.service.GoodsService;
import com.example.utils.RedisUtil;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * 1、用户去重,
 * 2、预减库存
 * 3、存放到mq中
 * @date 2024/10/6 18:24
 */
@RestController
@RequestMapping("/seckill")
public class SeckillController {

    @Autowired
    private RedisTemplate redisTemplate;
    @Autowired
    private RedisUtil redisUtil;

    @Autowired
    private GoodsService goodsService;


    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @RequestMapping("/doSeckill")
    public Response doSeckill(Integer goodsId,Integer userId) {
        String uk=userId+"-"+goodsId;
        //去重处理
        Boolean flag = redisTemplate.opsForValue().setIfAbsent(uk, "");
        if (!flag) {
            System.out.println("已经秒杀过了");
            return Response.buildFailure("-1","已经秒杀过了,请您参与其他商品抢购");
        }
        //TODO 预减库存,查-改-更新 不安全,这里直接采用原子性减库存
        //假设已经同步到数据库了,key=goodsId:xxx,value=库存量,这是redis中的库存量
        Long count = redisTemplate.opsForValue().decrement("goodsId:" + goodsId);
        if (count < 0) {
         return Response.buildFailure("-1","该商品已经被抢完,请下次早点来哦O(∩_∩)O");
        }
        //TODO:存放到mq中
        //进行json发送到消费者
        String json = JSON.toJSONString(uk);
        //异步发送
        rocketMQTemplate.asyncSend("seckillTopic", json, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("消息发送成功");
            }
            @Override
            public void onException(Throwable throwable) {
                System.out.println("消息发送失败"+throwable.getMessage());
            }
        });
        return SingleResponse.of("拼命抢购中,请稍后去订单中心查看");
    }
}
package com.example.config;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.example.domain.Goods;
import com.example.service.GoodsService;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.List;

/**
 * @author GJ
 * @date 2024/10/7 17:10
 */
@Component
public class DataSynConfig  implements InitializingBean {
    @Autowired
    private GoodsService goodsService;
    @Autowired
    private RedisTemplate redisTemplate;

    /**
     * 搞一个定时任务,每天10点开启执行一次。
     *
     *
     * spring bean的生命周期[实例化-new,属性赋值,初始化(前 PostConstruct/中  InitializingBean 接口/ 后  BeanPostProcessor),使用,销毁]
     * 在当前对象 实例化完以后
     * 属性注入以后
     * 执行 PostConstruct 注解的方法
     */

    // 初始化方法,启动自动加载和这个方法
   @PostConstruct
    public void afterPropertiesSet(){
       //查询数据库,加载数据到缓存中
       LambdaQueryWrapper<Goods> queryWrapper = new LambdaQueryWrapper<>();
       queryWrapper.eq(Goods::getStatus,1);

       List<Goods> goodsList = goodsService.list(queryWrapper);
       System.out.println("数据库查询出来数据--->"+goodsList);
       System.out.println(">>>>>>>>>>>>>>>>>>>>开始加载数据到redis中>>>>>>>>>>>>>>>>>>>>>>>>");
       for (Goods goods : goodsList) {
           redisTemplate.opsForValue().set("goodsId:"+goods.getId(),goods.getStocks());
       }
       System.out.println(">>>>>>>>>>>>>>>>>>>>结束加载数据到redis中>>>>>>>>>>>>>>>>>>>>>>>>");
    }
}

消费者:

配置文件yml

server:
    port: 8899
tomcat:
    threads:
        #        最大线程数: 400
        max: 400
spring:
    datasource:
        driver-class-name: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://127.0.0.1:3306/seckill?serverTimezone=UTC
        username: root
        password: 123456
    #配置redis
    redis:
        #    超时时间
        timeout: 10000ms
        #    连接的主机ip
        host: 127.0.0.1
        port: 6379
        #    redis使用的0号数据库
        database: 0
        password:
    main:
        allow-bean-definition-overriding: true


rocketmq:
    name-server: 127.0.0.1:9876
    consumer:
        access-key: rocketmq2
        secret-key: 12345678


redis 配置和工具和生产者一样


消费监听

package com.example.listener;

import com.example.service.GoodsService;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;

import javax.annotation.Resource;

/**
 * @author GJ
 * @date 2024/10/8 17:06
 * description: 秒杀监听
 */
@RocketMQMessageListener(topic = "seckill-topic-consumer",
        consumerGroup = "group-order",
        messageModel = MessageModel.CLUSTERING,
        consumeThreadNumber = 24)
public class SeckillListener implements RocketMQListener<MessageExt> {
    @Resource
    private GoodsService goodsService;

    @Override
    public void onMessage(MessageExt message) {
        System.out.println("收到消息:" + message);
        byte[] body = message.getBody();
        String msg = new String(body);
//        将msg转换为json对象,并解析
        String[] split = msg.split("-");
        Integer userId = Integer.valueOf(split[0]);
        Integer goodsId = Integer.valueOf(split[1]);
        System.out.println("用户id:" + userId + ",商品id:" + goodsId);
        // 调用业务层方法,完成秒杀操作

            goodsService.kill(userId, goodsId);

    }
    }

将主键自增的id,初始化:

TRUNCATE ‘order’

package com.example.service.impl;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.example.domain.Goods;
import com.example.domain.OrderRecords;
import com.example.mapper.GoodsMapper;
import com.example.mapper.OrderRecordsMapper;
import com.example.service.GoodsService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.Date;

/**
 *
 */
@Service
public class GoodsServiceImpl extends ServiceImpl<GoodsMapper, Goods>
        implements GoodsService {
    @Autowired
    private OrderRecordsMapper orderRecordsMapper;
    @Autowired
    private GoodsMapper goodsMapper;

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void kill(Integer userId, Integer goodsId) {
        /**
         * 1、判断库存是否充足
         * 2、扣减库存
         * 3、创建订单
         */
        synchronized (this) {
            LambdaQueryWrapper<Goods> queryWrapper = new LambdaQueryWrapper<>();
            queryWrapper.eq(Goods::getId, goodsId)
                    .eq(Goods::getStatus, 1)
                    .gt(Goods::getStocks, 0);
            Goods goods = goodsMapper.selectOne(queryWrapper);
            Integer stocks = goods.getStocks();
            if (stocks <= 0) {
                throw new RuntimeException("库存不足");

            }
            // 扣减库存
            goods.setStocks(--stocks);
            goods.setUpdate_time(new Date());
            int i = goodsMapper.updateById(goods);
            // 创建订单
            if (i > 0) {
                OrderRecords orderRecords = new OrderRecords();
                orderRecords.setUser_id(userId);
                orderRecords.setGoods_id(goodsId);
                orderRecords.setCreate_time(new Date());
                orderRecordsMapper.insert(orderRecords);
            }
        }
    }
}


可重复度:会将数据拍个照。
事务之间相互隔离。

A 事务先释放锁,然后在提交。释放锁时,B进来,读取原来数据,后来A 提交的事务-导致幻读。
(解决这个问题,先提交然后再释放锁 即 事务由锁包裹)。

方案一:

package com.example.listener;

import com.example.service.GoodsService;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * 
 * @date 2024/10/8 17:06
 * description: 秒杀监听
 */
@Component
@RocketMQMessageListener(topic = "seckill-topic",
        consumerGroup = "group-order",
        messageModel = MessageModel.CLUSTERING,
        consumeThreadNumber = 24)
public class SeckillListener implements RocketMQListener<MessageExt> {
    @Resource
    private GoodsService goodsService;

    @Override
    public void onMessage(MessageExt message) {
        System.out.println("收到消息:" + message);
        byte[] body = message.getBody();
        String msg = new String(body);
//        将msg转换为json对象,并解析
        String[] split = msg.split("-");
        Integer userId = Integer.valueOf(split[0]);
        Integer goodsId = Integer.valueOf(split[1]);
        System.out.println("用户id:" + userId + ",商品id:" + goodsId);
        // 调用业务层方法,完成秒杀操作
    synchronized (this) {
        goodsService.kill(userId, goodsId);
    }

    }
    }

结果:

使用jemter进行压测:
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
满足不了分布式需求

方案二:

package com.example.listener;

import com.example.service.GoodsService;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * @date 2024/10/8 17:06
 * description: 秒杀监听
 */
@Component
@RocketMQMessageListener(topic = "seckill-topic",
        consumerGroup = "group-order",
        messageModel = MessageModel.CLUSTERING,
        consumeThreadNumber = 24)
public class SeckillListener implements RocketMQListener<MessageExt> {
    @Resource
    private GoodsService goodsService;

    @Override
    public void onMessage(MessageExt message) {
        System.out.println("收到消息:" + message);
        byte[] body = message.getBody();
        String msg = new String(body);
//        将msg转换为json对象,并解析
        String[] split = msg.split("-");
        Integer userId = Integer.valueOf(split[0]);
        Integer goodsId = Integer.valueOf(split[1]);
        System.out.println("用户id:" + userId + ",商品id:" + goodsId);
        // 调用业务层方法,完成秒杀操作
        goodsService.kill(userId, goodsId);

    }
    }

把锁方法放到数据层面。

a=a-1 操作数据库时,mysql行锁。

package com.example.service.impl;

import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.example.domain.Goods;
import com.example.domain.OrderRecords;
import com.example.mapper.GoodsMapper;
import com.example.mapper.OrderRecordsMapper;
import com.example.service.GoodsService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.Date;

/**
 *
 */
@Service
public class GoodsServiceImpl extends ServiceImpl<GoodsMapper, Goods>
        implements GoodsService {
    @Autowired
    private OrderRecordsMapper orderRecordsMapper;
    @Autowired
    private GoodsMapper goodsMapper;

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void kill(Integer userId, Integer goodsId) {
        /**
         * 1、判断库存是否充足
         * 2、扣减库存
         * 3、创建订单
         */
        //使用mysql行锁,
        LambdaUpdateWrapper<Goods> updateWrapper = new LambdaUpdateWrapper<>();
        updateWrapper
                .set(Goods::getUpdate_time, new Date())
                .setSql("stocks=stocks-1  where stocks-1>=0  and id=" + goodsId);
        int i = goodsMapper.update(null, updateWrapper);
        System.out.println("更新成功" + i);
        // 创建订单
        if (i > 0) {
            OrderRecords orderRecords = new OrderRecords();
            orderRecords.setUser_id(userId);
            orderRecords.setGoods_id(goodsId);
            orderRecords.setCreate_time(new Date());
            orderRecordsMapper.insert(orderRecords);
        }
    }

}

结果:
在这里插入图片描述
在这里插入图片描述

不适合用于 并发量特别大的场景,几百, 1000左右并发量 可以使用方案二。

在这里插入图片描述

方案三:

并发量很大呢?
使用redisson 锁

package com.example.listener;

import com.example.service.GoodsService;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * @author GJ
 * @date 2024/10/8 17:06
 * description: 秒杀监听
 */
@Component
@RocketMQMessageListener(topic = "seckill-topic",
        consumerGroup = "group-order",
        messageModel = MessageModel.CLUSTERING,
        consumeThreadNumber = 24)
public class SeckillListener implements RocketMQListener<MessageExt> {
    @Resource
    private GoodsService goodsService;

    @Resource
            private RedisTemplate redisTemplate;

    int  maxTime=20000;

    @Override
    public void onMessage(MessageExt message) {
        System.out.println("收到消息:" + message);
        byte[] body = message.getBody();
        String msg = new String(body);
//        将msg转换为json对象,并解析
        String[] split = msg.split("-");
        Integer userId = Integer.valueOf(split[0]);
        Integer goodsId = Integer.valueOf(split[1]);
        System.out.println("用户id:" + userId + ",商品id:" + goodsId);
        int currentTime = 0;
        //最外层的就是自旋锁,最内层就是加锁机制;currentTime> maxTime写成true 也可以,一直自旋。目前写的是20000/200=100次 也可以在
      //currentTime +=200,睡几秒钟。自旋的慢点
        while(currentTime> maxTime){
            Boolean flag = redisTemplate.opsForValue().setIfAbsent("lock:" + goodsId, "lock");
            // 加锁
            if (flag){
                try{
                    // 调用业务层方法,完成秒杀操作
                    goodsService.kill(userId, goodsId);
                } finally {
                    // 释放锁
                    redisTemplate.delete("lock:" + goodsId);
                }
            }else {
                currentTime +=200;
            }
        }
    }
    }
package com.example.service.impl;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.example.domain.Goods;
import com.example.domain.OrderRecords;
import com.example.mapper.GoodsMapper;
import com.example.mapper.OrderRecordsMapper;
import com.example.service.GoodsService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.Date;

/**
 *
 */
@Service
public class GoodsServiceImpl extends ServiceImpl<GoodsMapper, Goods>
        implements GoodsService {
    @Autowired
    private OrderRecordsMapper orderRecordsMapper;
    @Autowired
    private GoodsMapper goodsMapper;

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void kill(Integer userId, Integer goodsId) {
        /**
         * 1、判断库存是否充足
         * 2、扣减库存
         * 3、创建订单
         */
        LambdaQueryWrapper<Goods> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.eq(Goods::getId, goodsId)
                .eq(Goods::getStatus, 1)
                .gt(Goods::getStocks, 0);
        Goods goods = goodsMapper.selectOne(queryWrapper);
        Integer stocks = goods.getStocks();
        if (stocks <= 0) {
            throw new RuntimeException("库存不足");

        }
        // 扣减库存
        goods.setStocks(--stocks);
        goods.setUpdate_time(new Date());
        int i = goodsMapper.updateById(goods);
        // 创建订单
        if (i > 0) {
            OrderRecords orderRecords = new OrderRecords();
            orderRecords.setUser_id(userId);
            orderRecords.setGoods_id(goodsId);
            orderRecords.setCreate_time(new Date());
            orderRecordsMapper.insert(orderRecords);
        }
    }

}

结果:

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2201291.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

微服务实战——登录(普通登录、社交登录、SSO单点登录)

登录 1.1. 用户密码 PostMapping("/login")public String login(UserLoginVo vo, RedirectAttributes redirectAttributes, HttpSession session){R r memberFeignService.login(vo);if(r.getCode() 0){MemberRespVo data r.getData("data", new Type…

价值5000元完整版GOD引擎手机客户端三端引擎源码 编译完整版

5000元完整版GOD引擎手机客户端三端引擎源码 支持三端互通&#xff1a;电脑端&#xff0c;安卓端&#xff0c;苹果端 GOD引擎全套源码及手游客户端源码&#xff08;苍穹引擎源码及修改教程&#xff09; 服务端代码为Delphir&#xff0c;手游客户端代码为cocos2dx的&#xff08;…

DAMA数据管理知识体系(第14章 大数据和数据科学)

课本内容 14.1 引言 概要 从数据中探究、研发预测模型、机器学习模型、规范性模型和分析方法并将研发结果进行部署供相关方分析的人&#xff0c;被称为数据科学家业务驱动 期望抓住从多种流程生成的数据集中发现的商机&#xff0c;是提升一个组织大数据和数据科学能力的最大业务…

论文阅读(十二):Attention is All You Need

文章目录 一、循环神经网络1.1RNN模型1.1.1RNN模型简介1.1.2RNN基本结构1.1.3权重共享机制1.1.4RNN局限性&#xff1a;长期依赖问题与梯度消失 1.2LSTM模型1.2.1LSTM核心思想1.2.2遗忘门1.2.3输入门1.2.4更新细胞状态1.2.5输出门1.2.6参数更新 二、Seq2Seq机制2.1RNN结构的局限…

react 知识点汇总(非常全面)

React 是一个用于构建用户界面的 JavaScript 库&#xff0c;由 Facebook 开发并维护。它的核心理念是“组件化”&#xff0c;即将用户界面拆分为可重用的组件。 React 的组件通常使用 JSX&#xff08;JavaScript XML&#xff09;。JSX 是一种 JavaScript 语法扩展&#xff0c;…

house_of_muney

house_of_muney 首先介绍一下house of muney 这个利用原理&#xff1a; 在了解过_dl_runtime_resolve的前提下&#xff0c;当程序保护开了延迟绑定的时候&#xff0c;程序第一次调用相关函数的时候会执行下面的命令 push n push ModuleID jmp _dl_runtime_resolve 这里的n…

OCR+PDF解析配套前端工具开源详解!

目录 一、项目简介 TextIn为相关领域的前端开发提供了优秀的范本。 目前项目已在Github上开源&#xff01; 二、性能特色 三、安装使用 安装依赖启动项目脚本命令项目结构 四、效果展示 面对日常生活和工作中常见的OCR识别、PDF解析、翻译、校对等场景&#xff0c;配套的…

洛谷P5648

洛谷P5648 这题花了很长时间&#xff0c;是在线段树题单里找到的&#xff08; &#xff09;。有线段树做法&#xff0c;但是我感觉可能比倍增做法更难看懂。以后有空再看看吧。感觉线段树现在只会板子题&#xff0c;绿稍微难点可能就不会。 花了很久时间之后&#xff0c;就觉得…

打造直播美颜平台的关键技术:视频美颜SDK的深度解析

本篇文章&#xff0c;小编将深入解析视频美颜SDK的关键技术&#xff0c;探讨其在打造直播美颜平台中的作用。 一、视频美颜SDK的定义与功能 视频美颜SDK是一套专门为实时视频处理而设计的软件开发工具包。其主要功能包括人脸检测、肤色美化、瑕疵修复、虚化背景、实时滤镜等。…

Python对PDF文件的合并操作

在处理 PDF 文件时&#xff0c;合并多个 PDF 文件为一个单一文件或者将某个单一文件插入某个PDF文件是一个常见的需求。Python 提供了多种库来实现这一功能&#xff0c;其中 PyPDF2 是一个非常流行的选择。该库提供了简单易用的接口&#xff0c;包括 merge() 方法&#xff0c;可…

CRE6281B1 (宽VCC:8-45V PWM电源芯片)

CRE6281B1 是一款外驱功率管的高度集成的电流型PWM 控制 IC&#xff0c;为高性能、低待机功率、低成本、高效率的隔离型反激式开关电源控制器。在满载时&#xff0c;CRE6281B1工作在固定频率(65kHz)模式。在负载较低时&#xff0c;采用节能模式&#xff0c;实现较高的功率转换效…

关于Allegro导出Gerber时的槽孔问题

注意点一&#xff1a; 如果设计的板子中有 槽孔和通孔(俗称圆孔)&#xff0c;不仅要NC Drill, 还要 NC Route allegro导出的槽孔文件后缀是 .rou 圆型孔后缀 是 .drl &#xff0c;出gerber时需要看下是否有该文件。 注意点二&#xff1a; 导出钻孔文件时&#xff0c;设置参…

Hi3061M开发板——系统时钟频率

这里写目录标题 前言MCU时钟介绍PLLCRG_ConfigPLL时钟配置另附完整系统时钟结构图 前言 Hi3061M使用过程中&#xff0c;AD和APT输出&#xff0c;都需要考虑到时钟频率&#xff0c;特别是APT&#xff0c;关系到PWM的输出频率。于是就研究了下相关的时钟。 MCU时钟介绍 MCU共有…

22.1 K8S之KubeSphere实现中间件高可用集群

22.1 K8S之KubeSphere实现中间件高可用集群 一. 章节概述二. WordPress1. WordPress 简介---------------------------------------------------------------------------------------------------一. 章节概述 二. WordPress 1. WordPress 简介 创建并部署 WordPress

MySQL 数据库的性能优化方法方法有哪些?

MySQL 数据库的性能优化方法方法有哪些&#xff1f; 从开发角度来看&#xff0c;一般可以从 SQL 和库表设计两部分优化性能。 SQL 优化 根据慢sql日志&#xff0c;找出需要优化的一些sql语句。 常见优化方向&#xff1a; 避免select *&#xff0c;只查询必要的字段&#x…

62 加密算法

62 加密算法 三种加密算法分类&#xff1a; 对称加密&#xff1a;密钥只有一个&#xff0c;解密、解密都是这个密码&#xff0c;加解密速度快&#xff0c;典型的对称加密有DES、AES、RC4等非对称加密&#xff1a;密钥成对出现&#xff0c;分别为公钥和私钥&#xff0c;从公钥…

sass学习笔记(1.0)

1.使用变量 sass可以像声明变量那样进行使用&#xff0c;这样同样的样式&#xff0c;就可以使用相同的变量来提高复用。 语法为&#xff1a;$ 变量名 在界面中也可以正常的显示 当然了&#xff0c;变量之间也可以相互引用&#xff0c;比如下面 div{$_color: #d45387;$BgColo…

kibana 删除es指定数据,不是删除索引

1 查询条件查询出满足条件的数据 GET /order_header_idx_202410/_search {"from":0,"size":10,"query":{"bool":{"filter":[{"term":{"oh_tenantId":{"value":"0211000001",&…

NeuVector部署、使用与原理分析

文章目录 前言1、概述2、安装与使用2.1、安装方法2.1.1、部署NeuVector前的准备工作2.1.1.1 扩容系统交换空间2.1.1.2 Kubernetes单机部署2.1.1.2.1 部署Docker2.1.1.2.2 部署Kubectl2.1.1.2.3 部署Minikube 2.1.1.3 Helm部署 2.1.2、使用Helm部署NeuVector 2.2、使用方法2.2.1…

YOLOv5改进——添加SimAM注意力机制

目录 一、SimAM注意力机制核心代码 二、修改common.py 三、修改yolo.py ​三、建立yaml文件 四、验证 一、SimAM注意力机制核心代码 在models文件夹下新建modules文件夹&#xff0c;在modules文件夹下新建一个py文件。这里为simam.py。复制以下代码到文件里面。 import…