智能售货机
- 概述
- 项目
- 使用springcloudalibaba中提供的短信服务
- 图形验证码生成
- 多端登录/网关统一鉴权
- 对象存储服务
- 代码的自动填充
- 微服务集成emq,发送emq
- 工单业务流
- 接收工单
- 拒绝工单
- 运维工单
- 补货工单
- 使用xxl-job进行任务调度
- lkd集成xxl-job
- 自动创建维修工单
- 自动创建补货工单
- 分片广播任务
- c端用户商品列表
- 商品详情
- 公众号/微信号的唯一标识
- 对接微信支付
- 售货机出货并发控制
- 分布式锁
- Lbs基于位置服务
- 新增es数据/新增
- 查询附近的售货机/查询
- 实现订单数据同步 logstash 实现数据同步
- 查询当前用户的历史订单/多条件查询
- 选品智能推荐/分组查询----复杂mysql查询
- 合作商分账查询
- 合作商后台分账查询
- 合作商分账结算
- 订单数据定时汇总
- 对账数据导出
- 折线图
- 商品的导入
- 并发异步编程
- 实现人效统计
- 工单状态按日统计
- 人小排名月度统计
- 销售额趋势图
- 售货机端功能描述
- EMQ安全解决方案
- 嵌入式数据库H2
- 售货机服务搭建
- 商品与货道数据同步
- 商品价格同步
- 出货通知
- 出货上报
- 补偿处理
- 补货协议
MQTT
MQTT(消息队列遥测传输协议),是一种基于发布/订阅模式的“轻量级”通讯协议,构建与tcp/ip协议上
MQTT协议的实现方式
发布者、代理(broker、服务器)、订阅者
MQTT协议传输消息分为:主题、负载
主题可以理解为消息的类型,订阅者订阅消息后,就会搜到主题的消费内容
- 负载可以理解为消费内容,是指订阅者具体要使用的内容
MQTT的消息服务质量
- QoS0:消息最多传递一次,当客户端不可用,消息会丢失--------------------------------->发布者发布消息,发生至broker,发布者删除本地消息,broker发生至订阅者时
- QoS1:消息传递至少一次------------>发布者发布消息,发生至broker,broker保存消息,发送给订阅者并发送给发布者,让发布者删除消息,订阅者接收消息后,发送消息到broker,broker删除消息,否则broker继续发布消息
- QoS2:消息仅传递一次(确保消息到达一次)----------------------->发布者发送消息先存储在发送,发送到Broker,Broker保存消息,broker向发布者发送消息已被接收,发布者向broker发送我已收到消息,broker发送消息到订阅者,broker发送发布者消息我已发送,发布者删除消息,订阅者收到消息进行存储,订阅者告诉broker我收到消息,broker发送我已收到你收到消息到订阅者,订阅者通知上层应用通知broker我已完成消息处理,broker删除存储的消息,订阅者删除消息
emqx
emqx是MQTT消息服务器
目录结构:
发布
订阅消息
发布 / 接收
延迟发布消息
共享订阅
负载均衡,当订阅者相同时,发送相同的订阅者,订阅者负载均衡的收到消息
不带群主的共享订阅 每次发送消息,随机发送给任意一位订阅者
修改策略
带群主订阅
代理订阅
开启
定制规则 自动接收发布者发送的消息
订阅范围 符合规则的可以接收到消息 c·
保留消息
发布者发布消息,将消息保留,订阅者订阅后可以获取到发布者发送的最后一条消息
认证方式 客户端连接时,判断是否正确
HTTP认证
uername / password 是否通过校验
判断当前是否是admin用户,是的话不受acl限制
不是admin用户,判断当前用户是否看一眼发布,订阅
<div id = ‘2’ / >项目介绍
使用cloudalibaba集成的短信服务
图形验证码生成
<dependency>
<groupId>com.github.penggle</groupId>
<artifactId>kaptcha</artifactId>
<version>2.3.2</version>
</dependency>
多端登录 统一鉴权
登录统一判断
管理员登录
- 校验当前用户是否错误
运营/运维人员校验
合作商进行登录
网关统一鉴权 gateway
管理员/运维/运营人员/合作商登录时,都要发返jwt封装的token给前端,如果访问其他信息,前端请求头携带token进行访问,前端访问地址不带token时,进行拦截,前端访问请求带令牌时,进行放行
- 首先根据当前请求,是否存在放行列表,存在则放行,不存在进行拦截,进行后续验证,判断token是否为空,根据手机号+生成策略校验token是否正确,正确放行,错误拦截
MinLo对象存储服务
- 替代阿里云储存服务
- bucket / 目录
- Object / 文件
- Keys / 文件名
特点 :
- 1.依赖
<dependency>
<groupId>io.minio</groupId>
<artifactId>minio</artifactId>
<version>7.1.0</version>
</dependency>
- 2.配置中心
- 3.匿名访问策略
代码编写
- 注入实体类对象
- 编写上传文件到MinIo
- controller调用
代码的自动填充
- 公共字段自动填充
2. 定义公共字段
- 实体类继承公共类,完成代码的自动填充
C:\Program Files\Java\jdk-11\
微服务集成emq,发送emq
1. 生产者进行发送
- 配置类 -> 封装了EMQ的连接配置和客户端对象
- 声明发送者,注入mqttclient对象,进行信息的发送
- 发送信息
EMQ消息订阅 接收者
- 消息到达之后制动触发消息
- 订阅主题 ->将文件中订阅的主题进行订阅
3. 将方法存入MqttClient中
消息自动分发处理架构
原理:项目启动时,加载所有的协议,
- 发布
2. 发布到对应的协议进行消息的接收
工单业务流
创建工单
- 对工单进行校验,验证通过则进行对工单表进行插入数据
- 校验工单是否符合规则
3. 判断同一台设下是否有未完成的订单
创建工单
-
是否符合条件
-
创建工单
接收工单
拒绝工单
- controller
- service
投放工单
投放工单完成,发送emq消息到售货机,更改售货机状态
售货机服务接收工单
<div id = ‘11’ / >补货工单
补货工单改变当前售货机的库存数,将设备的商品信息,货道配置信息,补货的容量下发到售货机端。
- 工单判断是否是补货工单,将货道和数量打包成补货协议,指定主题,发送emq
- 售货机微服务订阅主题,接收发送的消息,更新售货机的库存,再次发送补货协议到emq,售货机端接收到消息进行本地的跟更新
判断是否为补货工单
**将数据发送emq **
根据内容中的协议,受到目标信息处理
使用xxl-job进行任务调度
每一个微服务就可以把她看作一个执行器,执行器通过注册线程注册到服务中。
调度器-核心组件:
调度器触发执行器进行操作,(设置某个时间节点执行任务),由调度器执行远程调用 。
调度器和执行器服务进行调用,请求进入调度请求队列,执行任务,调用日志服务,检查实时日志,可以进行回调
使用:
阻塞处理状态
处理器执行不过来时,执行的策略
lkd集成xxl-job
- 加入依赖
- 配置中心加入配置
- 声明配置类,完成配置的自动注入
import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* xxl-job config
*
* @author xuxueli 2017-04-28
*/
@Configuration
@Slf4j
public class XxlJobConfig {
@Value("${xxl.job.admin.addresses}")
private String adminAddresses;
@Value("${xxl.job.accessToken}")
private String accessToken;
@Value("${xxl.job.executor.appname}")
private String appname;
@Value("${xxl.job.executor.address}")
private String address;
@Value("${xxl.job.executor.ip}")
private String ip;
@Value("${xxl.job.executor.port}")
private int port;
//@Value("${xxl.job.executor.logpath}")
//private String logPath;
@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
log.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
//xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
}
- 注册任务,完成调度
自动创建维修工单
- 售货机发生故障故障,将当前信息上报emq,微服务订阅主题接收信息
- 分配给区域内当日分配最少的人
- 每日2点执行定时任务初始化明天的订单信息时,昨天的redis存储的订单信息就会过期
- 封装方法 新增工单 + 1 取消工单 -1
3. 取出当前区域中,工单最少的一位
售货机发生故障后,会主动上报自己的错误信息
设备出现故障,多个微服务都可能要获取发送到emq的错误信息,所以采用带群主的方式进行共享订阅 接状态信息创建维修工单
获取上报的订单信息,校验信息是否正确,根据编号查询售货机信息,调用订单
自动创建补货工单
根据缺货状态,进行自动补货工单的创建
每天定时执行一个定时任务,当低于警戒值时,自动补货工单的下发
系统通过每天的定时任务扫描所有在运营的售货机,只要某售货机任何一个货道商品数量小于等于这个比例,就会为这个售货机自动创建补货工单,把所有的货道全部补满。
售货机扫描任务的编写–根据定时的时间,查询每一个售货机是否缺货
- 添加配置信息
2. 添加依赖
3. 创建定时任务,每日发送缺货订单数据
查询售货机的缺货状态
- 在任务中发送补货工单
发送道工单服务
工单微服务接收服务请求
xxl-job发布补货工单 ,工单服务接收补货数据
创建工单
-
是否符合条件
-
创建工单
分片广播
自动补货工单要扫描所有运营状态的售货机,判断每一个售货机是否缺货,由一个节点对所有的售货机都轮询一遍的话,需要大量的时间,叫做串行,一个接着一个执行
采取并行的方式解决,部署多分售货机微服务,每一个节点扫描一部分,加起来就是全部。
xxl-job:将多个节点注册到xxl-job中,选着分片广播,一次任务调度将会广播触发集群中的所有执行器进行任务的执行,根据分片参数执行分片让任务
修改xxl-job
进行数据的取模运算:
c端用户商品列表
购买商品商品详情
公众号/微信号的唯一标识
** 用于识别用户**
1. 填写与app相关的信息,编写config读取配置信息,完成属性的赋值
2. 编写controller , 获取前端传递的jscode,调用service,完成openid的获取
对接微信支付
- 使用集成的工具包,填写配置类
2. 读取微信支付配置信息
3. 添加微信支付sdk需要实现的配置类WxPaySdkConfig…
内网穿透工具
售货机出货并发控制
判断库存
添加对库存的判断
发送出库通知
处理出货流程 售货机发货,上报当前发货结果,订单/售货机微服务以群主的方式订阅该服务
更新订单状态
减少货道库存
售货机在同一个时间,只能处理一个请求 ,分布式锁
使用consul完成分布式锁的使用
- 封装的实体类
import com.ecwid.consul.v1.ConsulClient;
import com.ecwid.consul.v1.kv.model.PutParams;
import com.ecwid.consul.v1.session.model.NewSession;
import com.ecwid.consul.v1.session.model.Session;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;
import java.util.concurrent.*;
@Slf4j
public class DistributedLock{
private ConsulClient consulClient;
private String sessionId;
private final CountDownLatch countDownLatch = new CountDownLatch(1);
/**
*
* @param consulHost consul的Agent主机名或IP
* @param consulPort 端口
*/
public DistributedLock(String consulHost,int consulPort){
consulClient = new ConsulClient(consulHost,consulPort);
}
/**
* 获取锁
* @param lockName 锁的名称(key)
* @param ttlSeconds 锁的超时时间
* @return
*/
public LockContext getLock(String lockName,int ttlSeconds){
LockContext lockContext = new LockContext();
if(ttlSeconds<10 || ttlSeconds > 86400) ttlSeconds = 60;
String sessionId = createSession(lockName,ttlSeconds);
boolean success = lock(lockName,sessionId);
if(!success){
countDownLatch.countDown();
consulClient.sessionDestroy(sessionId,null);
lockContext.setGetLock(false);
return lockContext;
}
lockContext.setSession(sessionId);
lockContext.setGetLock(true);
return lockContext;
}
/**
* 释放锁
*/
private void releaseLock(){
releaseLock(sessionId);
}
public void releaseLock(String sessionId){
countDownLatch.countDown();
consulClient.sessionDestroy(sessionId,null);
}
private String createSession(String lockName,int ttlSeconds){
autoDeregister(ttlSeconds);
NewSession session = new NewSession();
session.setBehavior(Session.Behavior.DELETE);
session.setName("session-"+lockName);
session.setLockDelay(1);
session.setTtl((ttlSeconds+5) + "s"); //锁时长
sessionId = consulClient.sessionCreate(session,null).getValue();
return sessionId;
}
private void autoDeregister(int timeoutSeconds){
ExecutorService executor = Executors.newFixedThreadPool(50);
CompletableFuture<Void> future = CompletableFuture.runAsync(()->{
try{
countDownLatch.await(timeoutSeconds, TimeUnit.SECONDS);
}catch (Exception e){
log.error("释放锁失败",e);
}finally{
this.releaseLock();
}
},executor);
}
private boolean lock(String lockName,String sessionId){
PutParams putParams = new PutParams();
putParams.setAcquireSession(sessionId);
return consulClient.setKVValue(lockName,"lock:"+ LocalDateTime.now(),putParams).getValue();
}
/**
* 锁上下文对象
*/
@Data
public class LockContext{
private String session;
private boolean isGetLock;
}
}
- 调用方法进行加锁
超时订单处理
实现思路:
购买商品时,创建了订单,但是尚未付款乱,当前订单状态为创建中,将当前订单的信息发送到emq中,设置延迟处理时间为10分钟,订单服务订阅该主题,待10分钟后,查询该订单,如果订单的状态还是创建中,则将订单设置为无效状态
emq中的延迟发送
1. 创建订单时,设置延迟发送消息
2. 订单接收延迟发送的消息,并且校验消息是否正确,完成不正确完成订单的更新
Lbs 基于位置服务
投放工单完成后,将坐标保存到es中
- 配置文件设置es的地址
新增es数据##
@Override
public Boolean setVMDistance(VMDistance vmDistance) {
// 根据软编号,查询售货机
var byInnerCode = this.findByInnerCode(vmDistance.getInnerCode()); // 查询售货机
if (byInnerCode == null){
throw new LogicException("该设备编号不存在"+vmDistance.getInnerCode());
}
// 向es存入
IndexRequest request = new IndexRequest("vm");
// 将id存入 (唯一标识)
request.id(vmDistance.getInnerCode());
request.source( // 构造存入es的参数
"addr",byInnerCode.getNode().getAddr(), // 位置信息
"innerCode",byInnerCode.getInnerCode(), // 售货机软编号
"nodeName",byInnerCode.getNode().getName(),//点位名称
"location",vmDistance.getLat()+","+vmDistance.getLon(),// 拼接经/纬度
"typeName",byInnerCode.getType().getName()); // 类型名
try {
client.index(request, RequestOptions.DEFAULT);
} catch (IOException e) {
log.error("保存售货机位置信息失败",e);
return false ;
}
//前端传递的经纬度,赋值给售货机对象/向数据库存入
byInnerCode.setLatitude(vmDistance.getLat());
byInnerCode.setLongitudes(vmDistance.getLon());
this.updateById(byInnerCode);
return true;
}
搜索附近的售货机
- 定义前端传递的视图模型
- 定义后端响应数据
3. 定义service方法,根据条件查询售货机
4. 编写controller,调用service
5. 实体服务加入fegin进行,小程序服务进行远程调用,使用fegin进行远程调用
- 小程序微服务进行远程调用
实现订单数据同步 logstash 实现数据同步
对订单数据进行和统计
搭建
代码实现
- 编写ElasticSearch对应的订单索引库
PUT order
{
"mappings": {
"properties": {
"sku_name": {
"type": "keyword"
},
"region_name": {
"type": "keyword"
},
"business_name": {
"type": "keyword"
},
"inner_code": {
"type": "keyword"
},
"node_name": {
"type": "keyword"
},
"order_no": {
"type": "keyword"
},
"third_no": {
"type": "keyword"
},
"addr": {
"type": "text"
},
"open_id": {
"type": "keyword"
},
"pay_type": {
"type": "keyword"
}
}
}
}
- 编写文件
input {
stdin { }
jdbc {
#设置jdbc的数据库连接字符串
jdbc_connection_string => "jdbc:mysql://192.168.200.128:3306/lkd_order?serverTimezone=Asia/Shanghai"
#设置数据库的用户名
jdbc_user => "root"
#设置数据库的密码
jdbc_password => "root123"
#设置数据程序的驱动jar包路径
jdbc_driver_library => "/resource/mysql-connector-java-8.0.18.jar"
#设置驱动类
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
#设置数据库的日期时间格式
jdbc_default_timezone => "Asia/Shanghai"
#设置记录上次的值
record_last_run => "true"
#是否指定追踪的字段
use_column_value => true
#追踪的字段
tracking_column => "update_time"
#追踪列的类型
tracking_column_type => "timestamp"
last_run_metadata_path => "/usr/share/logstash/last_values/order_info.txt"
clean_run => "false"
#开启分页查询
jdbc_paging_enabled => true
#分页大小
jdbc_page_size => "5000"
statement => "SELECT * FROM tb_order where update_time > :sql_last_value order by update_time asc"
#设置运行周期
schedule => "*/2 * * * * *"
}
}
output {
stdout {
#设置以json的格式输出
codec => json_lines
}
#设置输出到elasticsearch
elasticsearch {
hosts => "192.168.200.128:9200"
#输出到的索引
index => "order"
#索引文档的id取采集源中的id列的值
document_id => "%{id}"
}
}
#过滤器配置
filter {
ruby{
#转换update_time,这样符合北京时间。通过event.get取出字段update_time中的值加8小时,然后再通过event.set将得到的值设置回update_time字段
code => "event.set('update_time',event.get('update_time').time.localtime + 8*60*60)"
}
ruby{
#转换create_time,这样符合北京时间
code => "event.set('create_time',event.get('create_time').time.localtime + 8*60*60)"
}
}
查询当前用户的历史订单
- 订单微服务加入依赖
<!--es相关依赖-->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.7.1</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.7.1</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>7.7.1</version>
</dependency>
<!--es相关依赖结束-->
-
订单微服务加入配置信息
-
多条件查询
4.编写controller,调用service
5. 编写feight,进行远程调用
6. app微服务实现远程调用
选品智能推荐
加入时间范围进行选品
考虑商圈
当用户要往某商圈下投放售货机时,系统能够通过点位的商圈属性给出同商圈下近3个月内销量最好的前10个商品
当前结果解析
合作商分账结算
lt ge
订单数据定时汇总
根据定时任务完成昨天对今天的任务汇总 。
lt ge
合作商分账查询
lt ge
合作商后台分账查询
对账数据导出 <id =‘26’ />
- 查询导出的数据
2. 定义导出数据的对象
3. 编写controller,编写导出方法 9
折线图<div id = ''27 " />1. 定义实体类
定义xy轴的实体对象
2. 根据前端传递的值进行查询,查询出对应的数据,赋值修改为xy轴的实体对象
商品的导入
- 创建导入对象的实体类
2. controller进行读取,监听器监听消息,完成方法的调用
通用插入类的使用
注意:通用注入也需要实体类 / controller 调用方法
并发异步编程
实现人效统计
在数据进行统计时,需要查询多个表,如果定义多个接口的话,接口数量太多,不利于维护,如果定义一个接口,一个接口要执行多个方法,整体响应时间较长,影响客户体验,使用CompletableFuture,后端同时执行8个结果的查询
@Test
public void Funture() throws ExecutionException, InterruptedException {
var aFuture = CompletableFuture.supplyAsync(()->{
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
});
var bFuture = CompletableFuture.supplyAsync(()->{
return 2;
});
var cFuture = CompletableFuture.supplyAsync(()->{
return 3;
});
//并行处理
CompletableFuture
.allOf(aFuture,
bFuture,
cFuture)
.join();
//取值
var a= aFuture.get();
var b= bFuture.get();
var c= cFuture.get();
System.out.println(a);
System.out.println(b);
System.out.println(c);
}
- 构建多个多并发方法
2. 执行定义的方法 , 完成并发编程
3. 获取方法值,完成对象的赋值
用户工作量详情
这个要写在user服务中
- 使用fegin进行远程调用
- controller调用service方法
调用的page方法
工单状态按日统
实现步骤
- 在订单微服务,引入依赖,配置中心加入配置
- 编写配置类,定义方法
方法的实现
工单当前状态统计
人小排名月度统计<div= ‘32’ />
根据userid进行总数据查询
销售额趋势图
销售数据
1是按照天份进行显示 2是按照月份进行显示
销售额分布 es
售货机端功能分析
售货机端口同样也是微服务系统,部署在每一个售货机上。
售货机本地存储了商品、货道、缺货数量等数据。
售货机端的技术方案
商品表 tb_sku
字段名 | 字段类型 | 字段长度 | 字段描述 |
---|---|---|---|
sku_id | VARCHAR | 64 | 商品ID |
sku_name | VARCHAR | 64 | 商品名称 |
image | VARCHAR | 64 | 图片地址 |
price | int | 11 | 商品原价 |
real_price | int | 11 | 商品真实售价 |
class_id | int | 11 | 商品类别Id |
class_name | VARCHAR | 64 | 商品类别名称 |
discount | int | 1 | 是否打折 |
unit | VARCHAR | 32 | 商品净含量 |
index | int | 11 | 商品排序索引 |
货道表 tb_channel
字段名 | 字段类型 | 字段长度 | 字段描述 |
---|---|---|---|
channel_id | VARCHAR | 64 | 货道id |
sku_id | VARCHAR | 64 | 商品id |
capacity | int | 11 | 库存 |
版本信息表 tb_version
字段名 | 字段类型 | 字段长度 | 字段描述 |
---|---|---|---|
version_id | int | 11 | 版本ID |
channel_version | int | 11 | 货道版本 |
sku_version | int | 11 | 商品版本 |
sku_price_version | int | 11 | 商品价格版本 |
出货记录表 tb_vendout_order
order_no | VARCHAR | 64 | 出货订单号 |
---|---|---|---|
pay_type | int | 11 | 支付方式 |
channel_id | VARCHAR | 64 | 货道id |
sku_id | VARCHAR | 64 | 商品id |
pay_price | int | 11 | 价格 |
out_time | TIMESTAMP | 售货机出货时间 | |
result_code | int | 11 | 出货结果编号 |
出货结果编号 0-成功,1-货道售空,2-设备故障,3-机器出货中,4-连续支付,5-服务器超时
EMQ安全解决方案
在真实的生产环境中,EMQ是可以公网访问连接的。如果不进行访问控制,任何设备或终端都可以连接EMQ进行通讯,那必然会有很大的安全隐患。
采用ACL,放问控制列表,为EMQ提供安全的解决方案。通过开启ACL插件,可以实现连接的认证、超级用户的判断以及对发布订阅控制。
使用缓存提高emq的性能
代码实现 通过方法校验,是否能连接emq
- 连接ACL
先判断当前是否具备连接条件,通过连接验证的,都是符合规则的
3. 判断是不是超级用户 对符合的条件进行再次验证,mqtt/moitor 是超级管理员,跳过认证,每个售货机还需要继续认证
-
客户端可拥有“超级用户”身份,超级用户拥有最高权限不受 ACL 限制。
-
认证鉴权插件启用超级用户功能后,发布订阅时 EMQ X 将优先检查客户端超级用户身份
-
客户端为超级用户时,通过授权并跳过后续 ACL 检查
- 发布订阅控制
控制售货机的订阅消息与发布消息
嵌入式数据库H2
JDBC方式操作 h2 快速入门
Mybatis的方法操作h2数据库
- springboot的启动类–包扫描
- service 和 实体类
- Test执行新增操作
售货机服务搭建
- 工程中加入h2的数据表信息
- 集成emqx
- (1)在lkd_client工程pom.xml 加入
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<!--emq 客户端包-->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.11</version>
</dependency>
(2)在lkd_client 的application.yml中添加
(3)com.lkd.client.config包下新建EmqConfig 类
(4)com.lkd.client.emq下新建EmqClient
(5)新建回调处理类EmqMsgCallback
- 消息协议封装
(1)封装了各种协议的枚举对象
(2)各种协议的数据封装对象,从emq/msg包下拷贝所有类
商品与货道数据同步
** 售货机启动时候根据当前售货机记录的版本信息,请求服务器端,服务器会最新货道,商品信息信息发送给售货机**
初始化------向服务器发送版本信息
(1)当工程启动连接到emq服务器成功后,读取售货机本机tb_version信息发送信息给 服务器
## 向服务器发送版本信息
(2)售货机在EmqMsgCallback的messageArrived方法中监听服务器返回信息
(3)售货机同步货道信息时候直接删除原有tb_channel信息,再插入最新的货道信息,然后修改tb_version中货道版本信息
(4)售货机同步商品信息直接删除原有tb_sku信息,插入最新的商品信息,然后修改tb_version中商品版本信息
商品价格同步
EmqMsgCallback 中添加msgType=skuPrice 类型监控,收到价格变动通知后,修改tb_sku 中价格信息,同时修改tb_version中skuPriceVersion 版本信息
出货通知]<div id = ‘40’ / >
(1)整个流程对于售货机,接受服务器出货通知,调用硬件出货(省略),然后记录出货信息,上报服务器
(2)记录出货信息目的在于防止售货机宕机还没来及上报,这样售货机和服务器数据就不一致了
(3)出货时候,一个商品可能存在多个货道,可以随机获取一个货道或者从最多商品货道出货即可,出货需要对 tb_channel的capacity字段-1
- 封装emq出货对象
2. 接收协议内容,并修改
- 协议中传输的内容不为空时,赋值进行,保存出货信息,硬件出完货 本地数据库修改库存记录
@Override
public void vendoutReq(VendoutReq vendoutResp) {
if(vendoutResp.getVendoutData()!=null){
VendoutOrder vendoutOrder=new VendoutOrder();
String skuId= vendoutResp.getVendoutData().getSkuId();
vendoutOrder.setSkuId(skuId);
BeanUtils.copyProperties(vendoutResp.getVendoutData(),vendoutOrder);
vendoutOrder.setOutTime(LocalDateTime.now());
try {
//硬件出完货 本地数据库修改库存记录
QueryWrapper<Channel> channelQueryWrapper=new QueryWrapper<Channel>();
//查询还存在该商品货道信息,并且按照容量降序
channelQueryWrapper.lambda().eq(Channel::getSkuId,skuId).ge(Channel::getCapacity,1).
orderByDesc(Channel::getCapacity);
List<Channel> channelList= channelMapper.selectList(channelQueryWrapper);
if(channelList==null||channelList.isEmpty()){
//货道没有商品,出货失败
vendoutOrder.setResultCode(1); //货道为空
vendoutOrder.setSuccess(false);
vendoutOrderMapper.insert(vendoutOrder);
return;
}
//出货前先把货道容量-1
Channel channel= channelList.get(0);
channel.setCapacity(channel.getCapacity()-1);
channelMapper.updateById(channel);
//todo :调用串口进行出货。
log.info("vendoutOrder {} ",vendoutOrder);
//出货成功
vendoutOrder.setResultCode(0);
vendoutOrder.setChannelId(channel.getChannelId());
vendoutOrder.setSuccess(true);
} catch (Exception e) {
log.info("出货失败",e);
vendoutOrder.setResultCode(2); //硬件错误
vendoutOrder.setSuccess(false);
}
vendoutOrderMapper.insert(vendoutOrder);
}
}
出货上报
服务端正确收到了消息,删除售货机本地的记录
补偿处理
上报过程中如果突然宕机或者网络抖动,上报会出现各种异常,这样服务器端和售货机端数据会不一致,可以在售货机启动时候扫描tb_vendout_order,从新上报,做补偿处理
补货协议<div id = ‘43’ / >