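I. About RabbitMQ
1. RabbitMQ exchange types
(1). FanoutExchange: a broadcast exchange. A message sent to the exchange is delivered to every queue bound to it; the routing key is ignored.
(2). DirectExchange: a direct exchange. The producer sends a message to the exchange, which delivers it to the queues whose binding key exactly matches the message's routing key; consumers then receive the message from the queue.
(3). TopicExchange: a topic exchange. It routes a message to the queues whose binding pattern matches the routing key, where * matches exactly one word and # matches zero or more words.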
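To make the topic wildcard rules concrete, here is a minimal sketch of the matching logic in plain Java. It is not RabbitMQ's implementation; the class name TopicMatcher and its methods are hypothetical and exist only for illustration.

```java
// Hypothetical helper: matches a dot-separated routing key against a topic binding pattern.
class TopicMatcher {

    /** Returns true if routingKey matches pattern ('*' = exactly one word, '#' = zero or more words). */
    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: succeed only if the routing key is exhausted too.
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words, so try every possible split point.
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            // Pattern still expects a word but the routing key has none left.
            return false;
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }
}
```

For example, the pattern order.* accepts order.created but not order.created.eu, while order.# accepts both, as well as order on its own.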
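2. RabbitMQ acknowledgement mechanisms
(1). Publisher confirms: the producer learns whether the broker accepted the message.
(2). Consumer acknowledgements: the consumer acknowledges each message after processing it, so that a message is not lost before it has actually been consumed.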
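Both mechanisms work with per-channel sequence numbers (delivery tags) and a "multiple" flag that acknowledges everything up to a given number. The sketch below models only the bookkeeping on the publisher side, in memory. It does not use the RabbitMQ client; ConfirmTracker and its methods are hypothetical names.

```java
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical in-memory model of tracking messages that still await a broker confirm.
class ConfirmTracker {
    // sequence number -> message body still waiting for a confirm
    private final ConcurrentNavigableMap<Long, String> outstanding = new ConcurrentSkipListMap<>();
    private long nextSeq = 1;

    /** Records a message as published and returns its sequence number. */
    synchronized long publish(String body) {
        long seq = nextSeq++;
        outstanding.put(seq, body);
        return seq;
    }

    /** Handles a confirm; multiple = true confirms every sequence number up to and including seq. */
    void onAck(long seq, boolean multiple) {
        if (multiple) {
            outstanding.headMap(seq, true).clear();
        } else {
            outstanding.remove(seq);
        }
    }

    /** Number of messages that have not been confirmed yet. */
    int pending() {
        return outstanding.size();
    }
}
```

Anything still in the map after a timeout is a candidate for re-sending or for logging as a failed publish.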
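II. RabbitMQ in practice
1. RabbitMQ use cases
(1) Stock deduction or flash-sale (seckill) scenarios: front-end requests enter an ordinary RabbitMQ queue in order and are consumed in order, which provides a degree of rate limiting. This suits flows where the result is delivered asynchronously, such as reserving tickets on Damai (大麦网).
(2) After a user places an order, the order information is sent to an ordinary RabbitMQ queue and written to the database asynchronously. Messages in the ordinary queue are given an expiration time; nothing consumes the ordinary queue, so expired messages are moved to the dead-letter queue. A listener on the dead-letter queue then handles the orders that were not paid within the allotted time.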
2. Setting up the Spring Boot project
(1) Maven pom file
Brings in Spring Boot, AMQP, MyBatis-Plus, Redis, and other components.
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.12.RELEASE</version>
</parent>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<spring-boot-dependencies.version>2.3.12.RELEASE</spring-boot-dependencies.version>
<spring-cloud.version>Hoxton.SR12</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.4</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<!-- MYSQL -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.18</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.7.5</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.4.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
</dependencies>
(2) YAML file
Contains the Redis, RabbitMQ, and database connection settings, plus the exchange, queue, and routing key names.
server:
  port: 8089
  shutdown: graceful
spring:
  application:
    name: rabbitMq
  redis:
    port: 31091
    host: 112.74.54.29
    password:
    timeout: 600000
  datasource:
    password: root
    username: root
    url: jdbc:mysql://x.x.x.x:31090/rabbit_mq?useUnicode=true&characterEncoding=UTF-8&nullCatalogMeansCurrent=true
    driver-class-name: com.mysql.cj.jdbc.Driver
    # type: com.alibaba.druid.pool.DruidDataSource
    # name: druidDataSource
  rabbitmq:
    virtual-host: /
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    publisher-confirm-type: correlated
    listener:
      simple:
        acknowledge-mode: manual
    template:
      mandatory: true
rabbitmq:
  seckill:
    queue:
      name: rabbitmq.seckill.queue
    exchange:
      name: rabbitmq.seckill.exchange
    routing:
      key:
        name: rabbitmq.seckill.routing.key
    dead:
      queue:
        name: rabbitmq.seckill.dead.queue
      exchange:
        name: rabbitmq.seckill.dead.exchange
      routing:
        key:
          name: rabbitmq.seckill.dead.routing.key
logging:
  level:
    org:
      springframework:
        jdbc:
          core:
            JdbcTemplate: DEBUG
        web:
          servlet:
            DispatcherServlet: DEBUG
mybatis-plus:
  mapper-locations: mapper/*.xml
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
    map-underscore-to-camel-case: true
  type-aliases-package: gdut.entity
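(3) RabbitMQ configuration
Defines the configuration for sending and consuming RabbitMQ messages. It creates the ordinary exchange, the ordinary queue, and the binding between them, as well as the dead-letter exchange, the dead-letter queue, and the binding between those.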
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import java.util.HashMap;
@Configuration
public class RabbitMQConfiguration {
private static final Logger log = LoggerFactory.getLogger(RabbitMQConfiguration.class);
@Autowired
private Environment environment;
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory factory) {
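log.info("rabbitTemplate channel cache size is:{}", factory.getChannelCacheSize());
RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
// A hand-built RabbitTemplate does not pick up spring.rabbitmq.template.mandatory, so set it here for the ReturnCallback to fire
rabbitTemplate.setMandatory(true);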
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(ack) {
log.info("rabbitTemplate success correlationData is:{}, b is:{}, s is:{}", correlationData, ack, cause);
} else {
log.info("rabbitTemplate failed correlationData is:{}, b is:{}, s is:{}", correlationData, ack, cause);
}
}
});
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
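public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("rabbitTemplate returned message is:{}, replyCode is:{}, replyText is:{}, exchange is:{}, routingKey is:{}", message, replyCode, replyText, exchange, routingKey);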
}
});
return rabbitTemplate;
}
@Bean("simpleRabbitListenerContainerFactory")
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(CachingConnectionFactory factory) {
SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
containerFactory.setMaxConcurrentConsumers(1);
containerFactory.setConcurrentConsumers(1);
containerFactory.setPrefetchCount(1);
containerFactory.setConnectionFactory(factory);
return containerFactory;
}
@Bean("seckillQueue")
public Queue seckillQueue() {
HashMap<String, Object> args = new HashMap<>();
// Dead-letter exchange that this queue forwards expired or rejected messages to
args.put("x-dead-letter-exchange", environment.getProperty("rabbitmq.seckill.dead.exchange.name"));
// Routing key used when a message is dead-lettered
args.put("x-dead-letter-routing-key", environment.getProperty("rabbitmq.seckill.dead.routing.key.name"));
// args.put("x-message-ttl", 60000);
return QueueBuilder.durable(environment.getProperty("rabbitmq.seckill.queue.name")).withArguments(args).build();
}
@Bean("seckillExchange")
public DirectExchange seckillExchange() {
return new DirectExchange(environment.getProperty("rabbitmq.seckill.exchange.name"), true, false);
}
@Bean("seckillRoutingKey")
public Binding seckillRoutingKey() {
return BindingBuilder.bind(seckillQueue()).to(seckillExchange()).with(environment.getProperty("rabbitmq.seckill.routing.key.name"));
}
// Dead-letter exchange
@Bean("seckillDeadExchange")
public DirectExchange seckillDeadExchange() {
return new DirectExchange(environment.getProperty("rabbitmq.seckill.dead.exchange.name"));
}
// Dead-letter queue
@Bean("seckillDeadQueue")
public Queue deadLetterQueue() {
return new Queue(environment.getProperty("rabbitmq.seckill.dead.queue.name"));
}
// Binding between the dead-letter exchange and the dead-letter queue
@Bean("seckillDeadRoutingKey")
public Binding seckillDeadRoutingKey() {
return BindingBuilder.bind(deadLetterQueue()).to(seckillDeadExchange()).with(environment.getProperty("rabbitmq.seckill.dead.routing.key.name"));
}
}
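The topology above (direct exchange, queue with dead-letter arguments, dead-letter exchange and queue) can be pictured with a small in-memory model. This is only a sketch of the routing idea under simplified assumptions: MiniBroker and all of its methods are hypothetical, time is passed in explicitly, and a message that has been dead-lettered no longer carries an expiration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of a direct exchange, a TTL queue, and a dead-letter queue.
class MiniBroker {
    static final class Msg {
        final String body;
        final long expiresAt; // absolute time in ms; Long.MAX_VALUE means "never expires"
        Msg(String body, long expiresAt) {
            this.body = body;
            this.expiresAt = expiresAt;
        }
    }

    static final class Q {
        final Deque<Msg> messages = new ArrayDeque<>();
        final String deadExchange;   // models x-dead-letter-exchange (null = none)
        final String deadRoutingKey; // models x-dead-letter-routing-key
        Q(String deadExchange, String deadRoutingKey) {
            this.deadExchange = deadExchange;
            this.deadRoutingKey = deadRoutingKey;
        }
    }

    private final Map<String, Q> queues = new HashMap<>();
    // exchange name -> (routing key -> queue name): direct-exchange bindings
    private final Map<String, Map<String, String>> bindings = new HashMap<>();

    void declareQueue(String name, String deadExchange, String deadRoutingKey) {
        queues.put(name, new Q(deadExchange, deadRoutingKey));
    }

    void bind(String queue, String exchange, String routingKey) {
        bindings.computeIfAbsent(exchange, e -> new HashMap<>()).put(routingKey, queue);
    }

    /** Routes by exact key match; returns false when no binding matches. A negative ttlMs means no expiry. */
    boolean publish(String exchange, String routingKey, String body, long now, long ttlMs) {
        Map<String, String> byKey = bindings.get(exchange);
        String queue = byKey == null ? null : byKey.get(routingKey);
        if (queue == null) {
            return false;
        }
        long expiresAt = ttlMs < 0 ? Long.MAX_VALUE : now + ttlMs;
        queues.get(queue).messages.addLast(new Msg(body, expiresAt));
        return true;
    }

    /** Dead-letters every expired message sitting at the head of a queue. */
    void expire(long now) {
        for (Q q : queues.values()) {
            while (!q.messages.isEmpty() && q.messages.peekFirst().expiresAt <= now) {
                Msg m = q.messages.pollFirst();
                if (q.deadExchange != null) {
                    publish(q.deadExchange, q.deadRoutingKey, m.body, now, -1);
                }
            }
        }
    }

    int size(String queue) {
        return queues.get(queue).messages.size();
    }
}
```

In this model, as in the configuration above, nothing consumes the ordinary queue: a message only leaves it by expiring, at which point it shows up in the dead-letter queue for the listener to process.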
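The RabbitMQ producer specifies the exchange, the routing key, and the expiration time after which an unconsumed message expires. Expired messages are forwarded to the dead-letter queue.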
import cn.hutool.json.JSONUtil;
import gdut.entity.Customer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;
@Component("seckillProducer")
@Slf4j
public class SeckillProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public String sendSeckillMsg(String exchange, String routingKey, Customer customer) {
String rabbitId = UUID.randomUUID().toString();
customer.setRabbitId(rabbitId);
log.info("sendSeckillMsg customer is:{}", JSONUtil.toJsonStr(customer));
rabbitTemplate.convertAndSend(exchange, routingKey, customer, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
MessageProperties messageProperties = message.getMessageProperties();
messageProperties.setCorrelationId(rabbitId);
messageProperties.setExpiration("60000");
return message;
}
});
log.info("sendSeckillMsg success:{}", rabbitId);
return rabbitId;
}
}
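The RabbitMQ consumer listens on the dead-letter queue. It has to be idempotent: only when the order is unpaid or was cancelled by the user does it restore the Redis stock and mark the database record as expired. If these operations succeed, it acknowledges the message to RabbitMQ; if an exception occurs, it rejects the message without requeueing, so the message is discarded.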
import cn.hutool.json.JSONUtil;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.rabbitmq.client.Channel;
import gdut.entity.Customer;
import gdut.entity.Product;
import gdut.service.CustomerService;
import gdut.service.ProductService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.io.IOException;
@Component("seckillConsumer")
@Slf4j
@Transactional
public class SeckillConsumer {
@Autowired
private CustomerService customerService;
@Autowired
private ProductService productService;
@RabbitListener(queues = {"rabbitmq.seckill.dead.queue"}, containerFactory = "simpleRabbitListenerContainerFactory")
public void consumeMessage(Message message, Channel channel, @Payload Customer customer) {
log.info("consumeMessage success:{}", customer.getRabbitId());
MessageProperties messageProperties = message.getMessageProperties();
try {
// If the order has already been paid, just acknowledge the message.
// If it is unpaid or was cancelled by the user, expire the order.
Customer newCustomer = customerService.getOne(new QueryWrapper<Customer>().lambda()
.eq(Customer::getRabbitId, customer.getRabbitId()));
if(2 == newCustomer.getIsPayed() || 0 == newCustomer.getIsPayed()) {
// Restore the stock in Redis
customerService.incrby(customer.getProductUuid(), 1);
// Mark the order as expired (3) in the database
customerService.update(new UpdateWrapper<Customer>().lambda()
.eq(Customer::getRabbitId, customer.getRabbitId())
.set(Customer::getIsPayed, 3));
}
// multiple = false: acknowledge only this delivery
channel.basicAck(messageProperties.getDeliveryTag(), false);
} catch (Exception e) {
log.error("consumeMessage error", e);
try {
channel.basicReject(messageProperties.getDeliveryTag(), false);
} catch (IOException ioException) {
log.error("ioException error", ioException);
}
}
}
}
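The idempotency rule used by the consumer, together with the payment rule, boils down to a small state machine over the is_payed values (0 unpaid, 1 paid, 2 cancelled by user, 3 expired). The sketch below models it in memory; OrderLifecycle is a hypothetical class, and an AtomicInteger stands in for the Redis stock counter.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical in-memory model of one order's status transitions.
class OrderLifecycle {
    static final int UNPAID = 0;
    static final int PAID = 1;
    static final int USER_CANCELLED = 2;
    static final int EXPIRED = 3;

    private int status = UNPAID;
    private final AtomicInteger stock; // stands in for the Redis stock counter

    OrderLifecycle(AtomicInteger stock) {
        this.stock = stock;
    }

    /** Payment succeeds only from UNPAID; repeated or late calls change nothing. */
    synchronized boolean pay() {
        if (status != UNPAID) {
            return false;
        }
        status = PAID;
        return true;
    }

    /** Models the dead-letter consumer: restock once, only for unpaid or user-cancelled orders. */
    synchronized boolean expire() {
        if (status != UNPAID && status != USER_CANCELLED) {
            return false;
        }
        stock.incrementAndGet();
        status = EXPIRED;
        return true;
    }

    synchronized int status() {
        return status;
    }
}
```

Because expire() moves the order to EXPIRED, a redelivered dead-letter message finds a status that no longer qualifies and leaves the stock alone.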
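(4) Redis configuration
Redis template configuration. It mainly sets the key and value serializers; if they are not configured, reading values back from Redis is affected.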
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
public class RedisAutoConfiguration {
@Bean
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<Object, Object>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
return redisTemplate;
}
}
(5) Controller
The HTTP endpoints cover adding a product, querying the product quantity, buying a product, and paying.
import gdut.basic.SeckillProducer;
import gdut.entity.Customer;
import gdut.entity.SeckillParams;
import gdut.entity.SeckillPayParams;
import gdut.entity.SeckillProductParams;
import gdut.service.CustomerService;
import gdut.service.ProductService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/seckill")
@Slf4j
public class SeckillController {
@Autowired
private CustomerService customerService;
@Autowired
private ProductService productService;
@PostMapping("/insertProduct")
public String insertProduct(@RequestBody SeckillProductParams seckillProductParams) {
return productService.insertProduct(seckillProductParams);
}
@PostMapping("/getProductTotal")
public int getProductTotal(@RequestBody SeckillProductParams seckillProductParams) {
return productService.getProductTotal(seckillProductParams);
}
@PostMapping("/buyProduct")
public String sendRedPacket(@RequestBody SeckillParams seckillParams) {
return customerService.buyProduct(seckillParams);
}
@PostMapping("/payProduct")
public String payProduct(@RequestBody SeckillPayParams seckillPayParams) {
return customerService.payProduct(seckillPayParams);
}
}
(6) Entity classes
The customer and product entities, followed by the request parameter classes.
import com.baomidou.mybatisplus.annotation.*;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.time.LocalDateTime;
@Data
@AllArgsConstructor
@NoArgsConstructor
@TableName("customer")
public class Customer implements Serializable {
@TableId(value = "id", type = IdType.AUTO)
private int id;
@TableField("user_id")
private int userId;
@TableField("product_uuid")
private String productUuid;
@TableField("product_count")
private int productCount;
@TableField("rabbit_id")
private String rabbitId;
@TableField("create_time")
private LocalDateTime createTime;
@TableField("enable_flag")
@TableLogic(delval = "0", value = "1")
private String enableFlag;
@TableField("is_payed")
private int isPayed;
}
import com.baomidou.mybatisplus.annotation.*;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.time.LocalDateTime;
@Data
@AllArgsConstructor
@NoArgsConstructor
@TableName("product")
public class Product implements Serializable {
@TableId(value = "id", type = IdType.AUTO)
private int id;
@TableField("product_uuid")
private String productUuid;
@TableField("product_name")
private String productName;
@TableField("product_total")
private int productTotal;
@TableField("create_time")
private LocalDateTime createTime;
@TableField("enable_flag")
@TableLogic(delval = "0", value = "1")
private String enableFlag;
}
import lombok.Data;
import java.io.Serializable;
@Data
public class SeckillParams implements Serializable {
private int userId;
private String productUuid;
private int count;
}
import lombok.Data;
import java.io.Serializable;
@Data
public class SeckillPayParams implements Serializable {
private String rabbitId;
}
import lombok.Data;
@Data
public class SeckillProductParams {
private String productName;
private int productTotal;
private String productUuid;
}
(7) Database tables
CREATE TABLE `customer` (
`id` int(0) NOT NULL AUTO_INCREMENT COMMENT 'primary key',
`user_id` int(0) NULL DEFAULT NULL COMMENT 'unique user identifier',
`product_uuid` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT 'unique product identifier',
`product_count` int(0) NULL DEFAULT NULL COMMENT 'product quantity',
`rabbit_id` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT 'unique identifier sent to RabbitMQ',
`create_time` timestamp(0) NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'creation time',
`enable_flag` char(1) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT '1' COMMENT 'whether the row is active',
`is_payed` int(0) NULL DEFAULT 0 COMMENT 'payment status: 0 unpaid, 1 paid, 2 cancelled by user, 3 expired',
PRIMARY KEY (`id`) USING BTREE
);
CREATE TABLE `product` (
`id` int(0) NOT NULL AUTO_INCREMENT,
`product_uuid` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT 'unique product identifier',
`product_name` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT 'product name',
`product_total` int(0) NULL DEFAULT NULL COMMENT 'stock quantity',
`create_time` timestamp(0) NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'creation time',
`enable_flag` char(1) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT '1' COMMENT 'whether the row is active',
`update_time` timestamp(0) NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP(0) COMMENT 'update time',
PRIMARY KEY (`id`) USING BTREE
);
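(8) Product service
Add product: the request carries the product quantity and name, and the backend generates the product's unique identifier. The product is written to the database, and the identifier and quantity are stored in Redis.
Get product quantity: reads the quantity from Redis by the product's unique identifier.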
import com.baomidou.mybatisplus.extension.service.IService;
import gdut.entity.Product;
import gdut.entity.SeckillProductParams;
public interface ProductService extends IService<Product> {
int getProductTotal(SeckillProductParams seckillProductParams);
String insertProduct(SeckillProductParams seckillProductParams);
}
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import gdut.entity.Product;
import gdut.entity.SeckillProductParams;
import gdut.mapper.ProductMapper;
import gdut.service.ProductService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.util.Random;
import java.util.UUID;
@Service
public class ProductServiceImpl extends ServiceImpl<ProductMapper, Product> implements ProductService {
@Autowired
private RedisTemplate redisTemplate;
public int getProductTotal(SeckillProductParams seckillProductParams) {
String productTotalKey = seckillProductParams.getProductUuid() + ":total";
if(!redisTemplate.hasKey(productTotalKey)) {
return 0;
}
int total = (int) redisTemplate.opsForValue().get(productTotalKey);
return total;
}
public String insertProduct(SeckillProductParams seckillProductParams) {
Product product = new Product();
String uuid = UUID.randomUUID().toString();
product.setProductUuid(uuid);
product.setProductName(seckillProductParams.getProductName());
product.setProductTotal(seckillProductParams.getProductTotal());
this.save(product);
String productTotalKey = uuid + ":total";
redisTemplate.opsForValue().set(productTotalKey, seckillProductParams.getProductTotal());
return uuid;
}
}
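(9) Customer service
Atomic increment/decrement of the product quantity: a Lua script makes several Redis commands execute atomically.
Buy product: checks whether stock remains; if so, sends a message to RabbitMQ and writes the order to the database on an asynchronous thread.
Pay for product: payment must be idempotent. An order can be paid only while it is unpaid; if it has expired or the user cancelled it, it cannot be paid.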
import com.baomidou.mybatisplus.extension.service.IService;
import gdut.entity.Customer;
import gdut.entity.SeckillParams;
import gdut.entity.SeckillPayParams;
import gdut.entity.SeckillProductParams;
public interface CustomerService extends IService<Customer> {
String buyProduct(SeckillParams seckillParams);
boolean incrby(String productUuid, int productCount);
boolean decrby(String productUuid, int productCount);
String payProduct(SeckillPayParams seckillPayParams);
}
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import gdut.basic.SeckillProducer;
import gdut.entity.*;
import gdut.mapper.CustomerMapper;
import gdut.service.CustomerService;
import gdut.service.ProductService;
import io.netty.util.concurrent.CompleteFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.Collections;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@Service
@Slf4j
public class CustomerServiceImpl extends ServiceImpl<CustomerMapper, Customer> implements CustomerService {
@Autowired
private ProductService productService;
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private Environment environment;
@Autowired
private SeckillProducer seckillProducer;
public String buyProduct(SeckillParams seckillParams) {
// TODO: check that the product exists
// Assign a random user ID, for JMeter testing
seckillParams.setUserId(new Random().nextInt(1000));
// Deduct stock
boolean buy = decrby(seckillParams.getProductUuid(), 1);
if(!buy) {
return "Sold out!";
}
return sendMsg(seckillParams);
}
public String sendMsg(SeckillParams seckillParams) {
String exchangeName = environment.getProperty("rabbitmq.seckill.exchange.name");
String routingKeyName = environment.getProperty("rabbitmq.seckill.routing.key.name");
Customer customer = new Customer();
customer.setUserId(seckillParams.getUserId());
customer.setProductUuid(seckillParams.getProductUuid());
customer.setProductCount(1);
String result = seckillProducer.sendSeckillMsg(exchangeName, routingKeyName, customer);
CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
// Write the order to the database with status "unpaid"
save(customer);
}
});
return result;
}
public boolean incrby(String productUuid, int productCount) {
String productTotalKey = productUuid + ":total";
// In Lua, 0 is truthy, so the result of EXISTS must be compared with 1 explicitly
String redisScript = "if redis.call('exists', KEYS[1]) == 1" +
" then " +
" redis.call('incrby', KEYS[1], ARGV[1])" +
" return 1 " +
" else " +
" return 0 " +
" end ";
DefaultRedisScript defaultRedisScript = new DefaultRedisScript(redisScript, Boolean.class);
boolean result = (boolean) redisTemplate.execute(defaultRedisScript, Collections.singletonList(productTotalKey), productCount);
return result;
}
public boolean decrby(String productUuid, int productCount) {
String productTotalKey = productUuid + ":total";
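// In Lua, 0 is truthy, so compare EXISTS with 1; also require enough stock for the requested count
String redisScript = "if redis.call('exists', KEYS[1]) == 1 and tonumber(redis.call('get', KEYS[1])) >= tonumber(ARGV[1])" +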
" then " +
" redis.call('decrby', KEYS[1], ARGV[1])" +
" return 1 " +
" else " +
" return 0 " +
" end ";
DefaultRedisScript defaultRedisScript = new DefaultRedisScript(redisScript, Boolean.class);
boolean result = (boolean) redisTemplate.execute(defaultRedisScript, Collections.singletonList(productTotalKey), productCount);
return result;
}
@Override
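public String payProduct(SeckillPayParams seckillPayParams) {
// The update matches only unpaid orders (is_payed = 0), so it returns false for expired, cancelled, or already paid orders
boolean paid = update(new UpdateWrapper<Customer>().lambda()
.eq(Customer::getRabbitId, seckillPayParams.getRabbitId())
.eq(Customer::getIsPayed, 0)
.set(Customer::getIsPayed, 1));
return paid ? "success" : "failed";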
}
}
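The "check, then decrement" step has to be atomic, which is what the Lua script provides on the Redis side. As a rough analogy only, the same guarantee can be sketched in plain Java with a compare-and-set loop. This swaps Redis for an in-process AtomicInteger; StockCounter and simulate are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical in-process stand-in for the Redis stock key and its Lua scripts.
class StockCounter {
    private final AtomicInteger total;

    StockCounter(int initial) {
        total = new AtomicInteger(initial);
    }

    /** Decrements by count only if enough stock remains; the counter never goes negative. */
    boolean tryDecrement(int count) {
        while (true) {
            int current = total.get();
            if (current < count) {
                return false;
            }
            // compareAndSet fails if another thread changed the value in between; retry then.
            if (total.compareAndSet(current, current - count)) {
                return true;
            }
        }
    }

    void increment(int count) {
        total.addAndGet(count);
    }

    int get() {
        return total.get();
    }

    /** Starts one thread per buyer, each trying to buy one item; returns how many succeeded. */
    static int simulate(int stock, int buyers) {
        StockCounter counter = new StockCounter(stock);
        AtomicInteger sold = new AtomicInteger();
        Thread[] threads = new Thread[buyers];
        for (int i = 0; i < buyers; i++) {
            threads[i] = new Thread(() -> {
                if (counter.tryDecrement(1)) {
                    sold.incrementAndGet();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return sold.get();
    }
}
```

With 10 items and 100 concurrent buyers, simulate(10, 100) reports exactly 10 successful purchases, which is the "no overselling" property the service relies on.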
(10) Database access
The database operations used here are all simple, so the built-in MyBatis-Plus operations are enough.
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import gdut.entity.Customer;
import org.apache.ibatis.annotations.Mapper;
@Mapper
public interface CustomerMapper extends BaseMapper<Customer> {
}
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import gdut.entity.Product;
import org.apache.ibatis.annotations.Mapper;
@Mapper
public interface ProductMapper extends BaseMapper<Product> {
}
(11) Application class
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
@SpringBootApplication(proxyBeanMethods = false)
@ComponentScan("gdut")
public class RabbitMQApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitMQApplication.class, args);
}
}
(12) Results
Add a product: http://localhost:8089/seckill/insertProduct
{
"productTotal": 10,
"productName": "ballpoint pen"
}
Get the product quantity: http://localhost:8089/seckill/getProductTotal
{
"productUuid": "3c0e787d-7fcd-44cf-842a-a2014aef1f16"
}
Buy a product: http://localhost:8089/seckill/buyProduct
{
"productUuid": "3c0e787d-7fcd-44cf-842a-a2014aef1f16"
}
Pay for an order: http://localhost:8089/seckill/payProduct
{
"rabbitId":"5c1cb90b-6e2e-47e3-a6e7-e5ab936c19b4"
}
JMeter test: firing concurrent buy requests shows that the stock is never oversold, and that the stock is restored when an order goes unpaid.