Redis消息队列与thinkphp/queue操作

news2024/12/28 19:49:20

业务场景

场景一

用户完成注册后需要发送欢迎注册的问候邮件、同时后台要发送实时消息给用户对应的业务员有新的客户注册、最后将用户的注册数据通过接口推送到一个营销用的第三方平台。

遇到两个问题:
  1. 由于代码是串行方式,流程大致为:开启数据库事务回滚->数据入库准备->发邮件->发实时消息->推送第三方平台->提交写入数据库。但是后续的3个步骤任意一个流程出了问题都会影响用户的注册结果。
  2. 发送邮件使用的不是成熟的第三方产品,而是利用phpmaile自写代码实现的,然而这个过程耗时相对较长且偶尔有失败的情况;另外通过接口推送注册用户数据到的第三方平台是一个国外的产品接口通讯时间很长且一样有失败的情况。
以上两个问题就会导致用户的注册交互流程时间很长产品体验感非常差;且发送邮件、发送消息、推送数据任意一个步骤由于特殊情况导致执行失败都不能终止用户注册这样就只能通过日志捕获相应的失败情况。

场景二

用户在Shopify平台(一个跨境电商平台)付款下单后,商家会将订单同步到我的系统中,在我的系统中完成询价、报价、付款后我需要再将订单数据推送到第三方配货发货的平台。平台发货完成后通过设置好的回调地址通知我的系统发货的物流信息数据,我需要将物流信息数据存入到我的数据库后再将物流信息同步给Shopify平台用以展示给真实下单用户查看物流轨迹。

遇到一个问题:
  1. 正常情况下的回调代码逻辑是将物流信息写入数据库,再同步物流数据给Shopify。但是由于各种原因后者(同步物流数据给Shopify)有一定概率会失败。
这样就出现了我系统内成功展示了物流信息而Shopify反馈没有成功同步物流轨迹的订单出现。而回调又是一次性的我只能自查数据库进行回补。

英雄登场(消息队列Redis)

官方介绍:消息队列中间件是大型系统中的重要组件,已经逐渐成为企业系统内部通信的核心手段。它具有松耦合、异步消息、流量削峰、可靠投递、广播、流量控制、最终一致性等一系列功能,已经成为异步RPC的主要手段之一。

Redis安装

我用的宝塔安装的方便快捷,软件商品搜索Redis然后点击系统对应php版本的立即前往
在这里插入图片描述
再后续弹窗中安装redis扩展即可
在这里插入图片描述
后续Redis中的队列数据也可以通过宝塔进行查看:
在这里插入图片描述

thinkphp/queue

扩展这个内置了 Redis、Database、Topthink、Sync四种驱动,这里我用的Redis。think-queue 队列消息可以进行任务的发布、获取、执行、删除、重新发布、延迟发布、超时控制等操作。

thinkphp/queue引入扩展

composer require topthink/think-queue

thinkphp/queue配置文件

我使用的是TP5要再application/extra目录下新增queue.php文件,文件内容如下(视各自情况调整哈):

return [
    'connector'  => 'Redis', // 驱动类型
    'expire'     => 60, // 任务的过期时间,默认为60秒; 如果任务执行时间超过此时间将会被认为是过期,将不会被执行
    'default'    => 'default', // 默认的队列名称
    'host'       => '127.0.0.1', // Redis 主机地址
    'port'       => 6379, // Redis 端口
    'password'   => '', // Redis 密码
    'select'     => 0, // 使用哪一个 Redis 数据库
    'timeout'    => 0, // 连接超时时间
    'persistent' => false, // 是否长连接
];

解决方案(注册部分)

引入消息队列后就是将原来串行方式改为并行,用户注册逻辑代码中关于后三个步骤只要单纯的推送队列即可。而后三者采用并行方式(也就是异步)执行对应的逻辑。这样既提高了注册的速度又可以通过队列将出错的数据多次执行提高成功率

注册逻辑代码

	public static function doSaveRegister($postParam)
    {
        db()->startTrans();
        try {
            $first_name = trim(outputstr($postParam, "first_name"));
            $last_name = trim(outputstr($postParam, "last_name"));
            $email = trim(outputstr($postParam, "email"));
            $password = trim(outputstr($postParam, "password"));

           	//注册部分就展示部分代码了
           	
            $info = new UserModel();
            $info->id = Uuid::uuid4();
            $info->number = createUserNumber();
            $info->short_name = substr($email, 0, strripos($email, "@"));
            $info->email = $email;
            if ($info->save() === false) {
                throw new Exception('Operation error!');
            }
            
            //发送用户注册成功的问候邮件,将要发送邮件的邮箱推送到消息队列
            $result = RedisUtils::redisQueueSendForSendRegisterWelcomeEmail('common\job\UserRegisterJob@sendRegisterSuccessWelcomeEmail', $email);
            if ($result['success'] === false) {
                throw new Exception('redis queue error!');
            }

			//这里只展示发送邮件的代码示例其他都是一样的道理

            db()->commit();
            $result = result_success('Register successful!');
        } catch (Exception $e) {
            db()->rollback();
            $result = result_error($e->getMessage());
        }
        return $result;
    }

推送发送邮件消息队列(生产者)

	/**
     * 用户注册成功需要发送问候邮件的用户数据加入队列
     * @param string $job 处理该任务的任务名
     * @param string $data 加入队列的数据-邮箱号
     * @param string $queue_name 队列名,可以不写
     */
    public static function redisQueueSendForSendRegisterWelcomeEmail($job, $data, $queue_name = 'user_register_email')
    {
        //此处做了延时推送,原因是邮件服务是自己写程序实现的避免高并发导致发送失败,所以延时推送一下
        $isPushed = Queue::later(5, $job, $data, $queue_name);
        if ($isPushed !== false) {
            $result = result_success('队列加入成功');
        } else {
            $result = result_error('队列加入失败');
        }
        return $result;
    }

消息队列处理逻辑(消费者)

<?php

namespace common\job;

use common\utils\email\EmailUtils;
use common\utils\gateway\GatewaysUtils;
use common\utils\log\LoggerUtils;
use common\utils\systemMessage\SystemMessageUtils;
use GuzzleHttp\Client;
use GuzzleHttp\Exception\ClientException;
use think\queue\Job;

/**
 * 处理所有用户注册方面的队列数据,代码逻辑写在这里,运行方式是命令行执行的
 * Class UserRegister
 * @package common\job
 */
class UserRegisterJob
{
    /**
     * 处理发送用户注册成功邮件的队列
     * @param Job $job
     * @param string $data 要发送邮件的邮箱
     */
    public function sendRegisterSuccessWelcomeEmail(Job $job, $data)
    {
        $result = EmailUtils::sendUserRegisterSuccessEmail($data);
        if ($result['success'] === false) {
            //判断一下发送失败的次数,超过3次剔除队列
            $attempts = $job->attempts();
            if ($attempts > 3) {
                //发送失败,写进日志,邮件通知开发者
                $message = '新用户注册发送问候邮件失败,程序错误内容:' . $result['msg'] . ',数据源:' . $data;
                LoggerUtils::systemErrorLog()->info($message);
                EmailUtils::sendSystemErrorEmailToDeveloper($message);
                $job->delete();
            }
        } else {
            //发送成功,剔除队列
            $job->delete();
        }
    }
}

启动队列监听

进入项目根目录执行

php think queue:work --queue 队列名1,队列名2

多个队列可以用逗号拼接一次性监听

这个进行一般都要后台运行且开机自启动,自己写的脚本如下:

#!/bin/bash
#启动Redis队列监听
cd /www/wwwroot/english-e-commerce/ && php think queue:listen --queue user_register_email,user_register_workman_message,sync_user_to_tidio,order_sync_mabang_track,fulfillment_shopify_order &

开机启动方法根据不同linux系统有很多种此处不做记录

不喜勿喷,也是初学。记录一下方便后面查找

参考链接

  • ThinkPHP 使用 think-queue 实现 redis 消息队列(超详细)
  • 消息队列使用的四种场景介绍

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1544005.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

关于网格数据导出指定格式的测试(以Gmsh导出nas格式为例)

本文主要讲述Gmsh如何导出nas格式的网格数据&#xff0c;众所周知&#xff0c;Gmsh可以导出多种网格数据格式&#xff0c;比如大家熟悉的msh、stl、inp、cgns&#xff08;似乎不完善&#xff09;等等&#xff0c;但是gmsh不支持nas格式的导出&#xff0c;只支持nas格式的导入&a…

基于java+springboot+vue实现的图书借阅系统(文末源码+Lw+ppt)23-328

摘 要 伴随着我国社会的发展&#xff0c;人民生活质量日益提高。于是对系统进行规范而严格是十分有必要的&#xff0c;所以许许多多的信息管理系统应运而生。此时单靠人力应对这些事务就显得有些力不从心了。所以本论文将设计一套“期待相遇”图书借阅系统&#xff0c;帮助商…

Harmony(鸿蒙)Stage模型综述

设计思想 ​Stage模型的设计&#xff0c;是为了提供给开发者一个更好的开发方式&#xff0c;更好的适用于多设备、分布式场景。 ​Stage模型的设计思想如下图所示。 ​Stage模型的设计基于如下三个出发点&#xff1a; 应用进程的有序管理 随着设备的内存越来越大&#xff0…

【docker】查看并拷贝容器内文件

一、查询容器 查询所有容器 docker ps查询名为os11的容器 docker ps | grep os11查询名为os11的容器&#xff08;包含不运行的&#xff09; docker ps -a| grep os11 docker ps [option] 显示结果介绍如下&#xff1a; 参考&#xff1a;[https://blog.51cto.com/u_15009374/31…

详解华为软件研发管理IPD

IPD,即集成产品开发(Integrated Product Development),是一种综合多种管理模型和理论、企业最佳实践的管理体系。旨在帮助企业快速适应市场变化,缩短产品上市时间,减少资源浪费,并提高生产力,以实现商业成功。 IPD的核心是跨部门团队的合作,涉及市场、研发、制造、服…

java项目将静态资源中的文件转为浏览器可访问的http地址

新增一个类叫啥无所谓&#xff0c;主要是实现 WebMvcConfigurer 加上注解 Configuration项目启动时加入bean中 只操作addResourceHandlers这一个方法 其他都没用 文章下方附带一个简易的上传图片代码 package cn.exam.config;import org.springframework.context.annotati…

Java代码基础算法练习-字符串反转-2024.03.25

任务描述&#xff1a; 输入一个字符串&#xff0c;然后将此字符串反转&#xff08;字符串最长不超过25个字符&#xff09; 任务要求&#xff1a; 代码示例&#xff1a; package M0317_0331;import java.util.Scanner;public class m240325_1 {public static void main(String…

Mysql锁及适用场景

一、mysql中的锁有哪些&#xff1f; 1.1 锁的类型 &#xff08;1&#xff09;共享锁&#xff08;Shared Lock&#xff09;&#xff1a; 共享锁允许事务读取数据&#xff0c;但不允许其他事务修改数据。多个事务可以同时持有共享锁。 -- 事务A获取共享锁 START TRANSACTION; …

Matlab|基于模型预测控制(MPC)的微电网调度优化的研究

目录 1 主要内容 2 程序难点及问题说明 3 部分程序 4 下载链接 1 主要内容 该程序分为两部分&#xff0c;日前优化部分——该程序首先根据《电力系统云储能研究框架与基础模型》上面方法&#xff0c;根据每个居民的实际需要得到响应储能充放电功率&#xff0c;优化得到整…

网络通信VLAN学习篇

拓扑图 如上图&#xff0c;pc3&#xff0c;pc5同一网络&#xff0c;pc4&#xff0c;pc6同一网络&#xff0c;vlan的划分就是虚拟局域网&#xff0c;局域网的理解就是同一vlan下的设备可以相互通信&#xff0c;不同vlan不可以通信&#xff08;通过三层交换机可以实现通信的&…

一分钟学习Markdown语法

title: 一分钟学习Markdown语法 date: 2024/3/24 19:33:29 updated: 2024/3/24 19:33:29 tags: MD语法文本样式列表结构链接插入图片展示练习实践链接问题 欢迎来到Markdown语法的世界&#xff01;Markdown是一种简单而直观的标记语言&#xff0c;让文本排版变得轻松有趣。接下…

RK3568驱动指南|第十三篇 输入子系统-第154章 固定usb设备的设备节点实验

瑞芯微RK3568芯片是一款定位中高端的通用型SOC&#xff0c;采用22nm制程工艺&#xff0c;搭载一颗四核Cortex-A55处理器和Mali G52 2EE 图形处理器。RK3568 支持4K 解码和 1080P 编码&#xff0c;支持SATA/PCIE/USB3.0 外围接口。RK3568内置独立NPU&#xff0c;可用于轻量级人工…

Linux调试器-gdb

一、背景 程序的发布方式有两种&#xff0c;debug模式和release模式 debug模式&#xff1a;编译器形成可执行程序的时候会给可执行程序添加调试信息 程序员调试时使用debug模式&#xff0c;而release模式用于测试 而gcc/g默认编译&#xff0c;采用release模式 用gcc/g使用…

windows11 openssh服务开启;第三方ping不通局域网windows电脑;ssh连接内部ubuntu系统

参考&#xff1a;https://blog.csdn.net/2301_77554343/article/details/134328867 1、windows11 openssh开启 1&#xff09;我这边可选功能在设置-系统里面&#xff1b;其他网上看在应用下&#xff1b;添加可选openssh服务器安装 2&#xff09;安装后打开&#xff0c;管理员…

Python对文件的常见操作用法

在 Python 中&#xff0c;操作文件通常涉及打开文件、读取内容、写入内容以及关闭文件等步骤。下面是一些常见的文件操作及其示例&#xff1a; 1. 打开文件 使用内置的 open() 函数来打开文件。这个函数接受两个主要参数&#xff1a;文件名和模式。模式指定了文件应如何打开&a…

前端-html-01

1.HTML的标签分类 1.1常用排版标签 标签名语义和功能属性单标签还是双标签h1 ~ h6一级标题~六级标题无双标签p段落无双标签hr分隔线无单标签br换行无单标签pre原格式显示无双标签div无语义&#xff0c;用于页面布局无双标签 1.1.1h标题标签 <!DOCTYPE html> <htm…

nav仿真(2)

开启仿真和建图 打开第一个窗口启动仿真&#xff1a; source devel/setup.bash export TURTLEBOT3_MODELburger roslaunch turtlebot3_gazebo turtlebot3_world.launch # 启动仿真打开第二个窗口&#xff0c;开始建图&#xff1a; source devel/setup.bash export TURTLEBOT3_…

Nginx 内存池

目录 零、基本框架 一、基础结构 二、对外接口 三、函数实现 1、ngx_create_pool 2、ngx_destroy_pool 3、ngx_reset_pool 4、ngx_palloc 5、ngx_pnalloc 6、ngx_pmemalign 7、ngx_pfree 8、ngx_pcalloc 9、ngx_pool_cleanup_add 10、ngx_pool_run_cleanup_file…

【spring】@Component注解学习

Component介绍 Component 是 Spring 框架中的一个注解&#xff0c;用于将一个类标记为 Spring 上下文中的一个组件。当一个类被标记为 Component 时&#xff0c;Spring 容器会在启动时自动扫描并实例化这个类&#xff0c;并将其注册到 Spring 上下文中。 Component 注解可以用…

Gorm连接Mysql数据库及其语法

Gorm连接Mysql数据库及其语法 文章目录 Gorm连接Mysql数据库及其语法前期工作找到Gorm的github项目简单了解相关MySQL语法 启动数据库定义数据库模型注意点Gorm Model定义结构体标签(tag)支持的结构体标记&#xff08;Struct tags&#xff09;关联相关标记&#xff08;tags&…