Thinkphp6使用RabbitMQ消息队列

news2025/1/23 6:17:25

Thinkphp6连接使用RabbitMQ(不止tp6,其他框架对应改下也一样),如何使用Docker部署RabbitMQ,在上一篇已经讲了->传送门<-。

部署环境

开始前先进入RabbitMQ的web管理界面,选择Queues菜单,点击底部的Add a new queue,新建一个test的队列。

安装thinkphp6框架

composer create-project topthink/think tp 

安装workerman扩展

composer require topthink/think-worker

安装rabbitmq扩展

composer require workerman/rabbitmq

代码编写

生产者

  • 在app目录下新建workerman目录,并在其下创建Send.php文件,$options数组中的host地址改成你的rabbitmq地址。
<?php
 
namespace app\workerman;
use Bunny\Channel;
use Workerman\RabbitMQ\Client;
use think\worker\Server;
class Send extends Server
{
    //websocket地址,一会用于测试。
    protected $socket = 'websocket://127.0.0.1:2345';
 
    /**
     * 收到信息
     * @param $connection
     * @param $data
     */
    public function onMessage($connection, $data)
    {
        //websocket发送过来的消息
        $connection->send('我收到你的信息了:'.$data);
        //rabbitMQ配置
        $options = [
            'host'=>'127.0.0.1',//rabbitMQ IP
            'port'=>5672,//rabbitMQ 通讯端口
            'user'=>'admin',//rabbitMQ 账号
            'password'=>'123456'//rabbitMQ 密码
        ];
        (new Client($options))->connect()->then(function (Client $client) {
            return $client->channel();
        })->then(function (Channel $channel) {
            /**
             * 创建队列(Queue)
             * name: ceshi         // 队列名称
             * passive: false      // 如果设置true存在则返回OK,否则就报错。设置false存在返回OK,不存在则自动创建
             * durable: true       // 是否持久化,设置false是存放到内存中RabbitMQ重启后会丢失,
             *                        设置true则代表是一个持久的队列,服务重启之后也会存在,因为服务会把持久化的Queue存放在硬盘上,当服务重启的时候,会重新加载之前被持久化的Queue
             * exclusive: false    // 是否排他,指定该选项为true则队列只对当前连接有效,连接断开后自动删除
             *  auto_delete: false // 是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除
             */
            return $channel->queueDeclare('ceshi', false, true, false, false)->then(function () use ($channel) {
                return $channel;
            });
        })->then(function (Channel $channel) use($data){
            echo "发送消息内容:".$data."\n";
 
            /**
             * 发送消息
             * body 发送的数据
             * headers 数据头,建议 ['content_type' => 'text/plain'],这样消费端是springboot注解接收直接是字符串类型
             * exchange 交换器名称
             * routingKey 路由key
             * mandatory
             * immediate
             * @return bool|PromiseInterface|int
             */
 
            return $channel->publish($data, ['content_type' => 'text/plain'], '', 'ceshi')->then(function () use ($channel) {
                return $channel;
            });
        })->then(function (Channel $channel) {
            //echo " [x] Sent 'Hello World!'\n";
            $client = $channel->getClient();
            return $channel->close()->then(function () use ($client) {
                return $client;
            });
        })->then(function (Client $client) {
            $client->disconnect();
        });
    }
 
    /**
     * 当连接建立时触发的回调函数
     * @param $connection
     */
    public function onConnect($connection)
    {
 
    }
 
    /**
     * 当连接断开时触发的回调函数
     * @param $connection
     */
    public function onClose($connection)
    {
 
    }
    /**
     * 当客户端的连接上发生错误时触发
     * @param $connection
     * @param $code
     * @param $msg
     */
    public function onError($connection, $code, $msg)
    {
        echo "error $code $msg\n";
    }
 
    /**
     * 每个进程启动
     * @param $worker
     */
    public function onWorkerStart($worker)
    {
 
 
    }
}
  • 在config/worker_server.php中设置worker_class值为'app\workerman\Send'

  • 启动这个生产者

php think worker:server
方式1:通过tcp发送数据
  • 发送数据
    通过在线网址发送数据(websocket方式),->传送门<-
    输入【ws://127.0.0.1:2345】后点击发送数据!
    在这里插入图片描述
  • 前往rabbitMQ控制台查看
    在这里插入图片描述
    至此,生产这一步就走完了,那么如果我不想通过websocket方式,想用tcp方式生产数据怎么办?
方式2:通过tcp发送数据

接口给内置服务器发消息->内置服务去发消息给rabbitMQ

  • 将Send.php中websocket:127.0.0.1改成tcp:127.0.0.1
  • 重启服务
  • 把controller目录中Index.php修改为以下内容
<?php
namespace app\controller;
 
use app\BaseController;
 
class Index extends BaseController
{
    public function index(string $msg)
    {
        //连接本地tcp服务
        $client = stream_socket_client('tcp://127.0.0.1:2345', $errno, $errmsg, 1);
        //发送字符串
        fwrite($client, $msg."\n");
        //断开服务
        fclose($client);
        return 'OK';
    }
 
}
  • 用Postman访问对应接口就好,也会有数据进入队列

消费者

同生产者一样新创建一个thinkphp6项目,注意端口别和生产者冲突!这里我设置的是2346端口

  • 在app目录下新建workerman目录,并在其下创建Receive.php文件,$options数组中的host地址改成你的rabbitmq地址。
<?php
 
namespace app\workerman;
use Bunny\Channel;
use Bunny\Message;
use Workerman\RabbitMQ\Client;
use think\worker\Server;
class Receive extends Server
{
    protected $socket = 'tcp://127.0.0.1:2346';
 
    /**
     * 收到信息
     * @param $connection
     * @param $data
     */
    public function onMessage($connection, $data)
    {
 
    }
 
    /**
     * 当连接建立时触发的回调函数
     * @param $connection
     */
    public function onConnect($connection)
    {
 
    }
 
    /**
     * 当连接断开时触发的回调函数
     * @param $connection
     */
    public function onClose($connection)
    {
 
    }
    /**
     * 当客户端的连接上发生错误时触发
     * @param $connection
     * @param $code
     * @param $msg
     */
    public function onError($connection, $code, $msg)
    {
        echo "error $code $msg\n";
    }
 
    /**
     * 每个进程启动
     * @param $worker
     */
    public function onWorkerStart($worker)
    {
        //rabbitMQ配置
        $options = [
            'host'=>'127.0.0.1',//rabbitMQ IP
            'port'=>5672,//rabbitMQ 通讯端口
            'user'=>'admin',//rabbitMQ 账号
            'password'=>'123456'//rabbitMQ 密码
        ];
        (new Client($options))->connect()->then(function (Client $client) {
            return $client->channel();
        })->then(function (Channel $channel) {
            /**
             * 创建队列(Queue)
             * name: ceshi         // 队列名称
             * passive: false      // 如果设置true存在则返回OK,否则就报错。设置false存在返回OK,不存在则自动创建
             * durable: true       // 是否持久化,设置false是存放到内存中RabbitMQ重启后会丢失,
             *                        设置true则代表是一个持久的队列,服务重启之后也会存在,因为服务会把持久化的Queue存放在硬盘上,当服务重启的时候,会重新加载之前被持久化的Queue
             * exclusive: false    // 是否排他,指定该选项为true则队列只对当前连接有效,连接断开后自动删除
             *  auto_delete: false // 是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除
             */
            return $channel->queueDeclare('ceshi', false, true, false, false)->then(function () use ($channel) {
                return $channel;
            });
        })->then(function (Channel $channel) {
            echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
            $channel->consume(
                function (Message $message, Channel $channel, Client $client) {
                    echo "接收消息内容:", $message->content, "\n";
                },
                'ceshi',
                '',
                false,
                true
            );
        });
 
    }
}
  • 在config/worker_server.php中设置worker_class值为'app\workerman\Receive',并将端口改为2346

  • 启动这个消费者

php think worker:server

到这里消费者也就结束啦!

使用

接下来我用cmd来启动两个服务,然后用接口发送消息和消费测试!
在这里插入图片描述




部分参考自:
https://www.workerman.net/doc/workerman/components/workerman-rabbitmq.html
https://blog.csdn.net/weixin_47723549/article/details/124493059

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/386628.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

深度学习实战20(进阶版)-文件智能搜索系统,可以根据文件内容进行关键词搜索,快速找到文件

大家好&#xff0c;我是微学AI&#xff0c;今天给大家带来深度学习实战项目-文件智能搜索系统&#xff0c;文件智能搜索系统是一种能够帮助用户通过文件的内容快速搜索和定位文件的软件系统。 随着互联网和数字化技术的普及&#xff0c;数据和信息呈现爆炸式增长的趋势&#xf…

ubuntu 将jupyter-lab保存为桌面快捷方式和favourites

ubuntu: 将jupyter-lab保存为桌面快捷方式和favourites desktop shortcut 建立一个新的desktop文件 cd ~/Desktop touch Jupyter-lab.desktop将文件修改成如下&#xff1a; [Desktop Entry] Version1.0 NameJupyterlab CommentBack up your data with one click Exec/home/cjb/…

SpringCloud学习笔记(一)

单体应用架构 在诞⽣之初&#xff0c;拉勾的⽤户量、数据量规模都⽐较⼩&#xff0c;项目所有的功能模块都放在一个工程中编码、编译、打包并且部署在一个Tomcat容器中的架构模式就是单体应用架构。 优点&#xff1a; 高效开发&#xff1a;项⽬前期开发节奏快&#xff0c;团…

02-Oracle数据库的启动与关闭

本文章主要讲解Oracle数据库的启动与关闭方法&#xff0c;详细讲解启动Oracle的命令&#xff0c;三种启动数据库的方法及区别&#xff1b;关闭数据库的4种方法及他们的区别。 启动和关闭数据库 •数据库没启动前&#xff0c;只有拥有DBA权限或者以sysoper或sysdba身份才能连接到…

设计跳表(动态设置节点高度)

最近学习redis的zset时候&#xff0c;又看到跳表的思想&#xff0c;突然对跳表的设置有了新的思考 这是19年设计的跳表&#xff0c;在leetcode的执行时间是200ms 现在我对跳表有了新的想法 1、跳表的设计&#xff0c;类似二分查找&#xff0c;但是不是二分查找&#xff0c;比较…

基于Canal的数据同步

基于Canal的数据同步 一、 系统结构 该数据同步系统由Spring Boot和Canal共同组成。 Spring Boot 是一个流行的 Java Web 框架&#xff0c;而 Canal 则是阿里巴巴开源的 MySQL 数据库的数据变更监听框架。结合 Spring Boot 和 Canal&#xff0c;可以实现 MySQL 数据库的实时数…

ASGCN之图卷积网络(GCN)

文章目录前言1. 理论部分1.1 为什么会出现图卷积网络&#xff1f;1.2 图卷积网络的推导过程1.3 图卷积网络的公式2. 代码实现参考资料前言 本文从使用图卷积网络的目的出发&#xff0c;先对图卷积网络的来源与公式做简要介绍&#xff0c;之后通过一个例子来代码实现图卷积网络…

Linux命令行安装Oracle19c教程和踩坑经验

安装 下载 从 Oracle官方下载地址 需要的版本&#xff0c;本次安装是在Linux上使用yum安装&#xff0c;因此下载的是RPM。另外&#xff0c;需要说明的是&#xff0c;Oracle加了锁的下载需要登录用户才能安装&#xff0c;而用户是可以免费注册的&#xff0c;这里不做过多说明。 …

最新使用nvm控制node版本步骤

一、完全卸载已经安装的node、和环境变量 ①、打开控制面板的应用与功能&#xff0c;搜索node&#xff0c;点击卸载 ②、打开环境变量&#xff0c;将node相关的所有配置清除 ③、打开命令行工具&#xff0c;输入node-v&#xff0c;没有版本号则卸载成功 二、下载nvm安装包 ①…

SBUS的协议详解

SBUS 1.串口配置&#xff1a; 100k波特率&#xff0c; 8位数据位&#xff08;在stm32中要选择9位&#xff09;&#xff0c; 偶校验&#xff08;EVEN), 2位停止位&#xff0c; 无控流&#xff0c;25个字节&#xff0c; 2.协议格式&#xff1a; [startbyte] [data1][data2]……

单月涨粉超600w,直播销售额破5亿,2月的黑马都是谁?

2月的抖音&#xff0c;黑马多多&#xff0c;处处有爆点。有直播间热度不减&#xff0c;在带货领域持续位列前茅&#xff1b;有达人通过“抓马式”连麦直播&#xff0c;涨粉657w&#xff1b;有0.01元的低价商品&#xff0c;一天热卖超1000w个。那么&#xff0c;2月有哪些主播表现…

微服务实战03-注册数据服务

EurekaServer &#xff0c;它扮演的角色是注册中心&#xff0c;用于注册各种微服务&#xff0c;以便于其他微服务找到和访问。有了EurekaServer&#xff0c;还需要一些微服务&#xff0c;注册到EurekaServer上去。 这一节&#xff0c;我们来写一个注册微服务。为了简单起见&am…

【同步工具类:Phaser】

同步工具类:Phaser介绍特性动态调整线程个数层次Phaser源码分析state 变量解析构造函数对state变量赋值阻塞方法arrive()awaitAdvance()业务场景实现CountDownLatch功能代码测试结果实现 CyclicBarrier功能代码展示测试结果总结介绍 一个可重复使用的同步屏障&#xff0c;功能…

26- AlexNet和VGG模型分析 (TensorFlow系列) (深度学习)

知识要点 AlexNet 是2012年ISLVRC 2012竞赛的冠军网络。 VGG 在2014年由牛津大学著名研究组 VGG 提出。 一 AlexNet详解 1.1 Alexnet简介 AlexNet 是2012年ISLVRC 2012&#xff08;ImageNet Large Scale Visual Recognition Challenge&#xff09;竞赛的冠军网络&#xff0…

paddle推理部署(cpu)

我没按照官方文档去做&#xff0c;吐槽一下&#xff0c;官方文档有点混乱。。一、概述总结起来&#xff0c;就是用c示例代码&#xff0c;用一个模型做推理。二、示例代码下载https://www.paddlepaddle.org.cn/paddle/paddleinferencehttps://github.com/PaddlePaddle/Paddle-In…

Clion连接Docker,使用HElib库

文章目录需求Clion连接服务器内的DockerDockerCLionDocker内配置HElib库参考需求 HElib库是用C编写的同态加密开源库&#xff0c;一般在Linux下使用为了不混淆生产环境&#xff0c;使用Docker搭建HElib运行环境本地在Windows下开发&#xff0c;使用的IDE为Clion&#xff0c;本…

动态规划:leetcode 121. 买卖股票的最佳时机、122. 买卖股票的最佳时机II

leetcode 121. 买卖股票的最佳时机leetcode 122.买卖股票的最佳时机IIleetcode 121. 买卖股票的最佳时机给定一个数组 prices &#xff0c;它的第 i 个元素 prices[i] 表示一支给定股票第 i 天的价格。你只能选择 某一天 买入这只股票&#xff0c;并选择在 未来的某一个不同的日…

node版本管理工具nvm

1.标题卸载nvm和node.js 系统变量中删除nvm添加变量&#xff1a;NVM_HOME和NVM_SYMLINK环境变量中 path&#xff1a;删除nvm自动添加的变量 Path %NVM_HOME%;%NVM_SYMLINK%删除自身安装node环境&#xff0c;参考图一图二 图一 图二 2.安装nvm nvm-window下载------https:/…

ES window 系统环境下连接问题

环境问题&#xff1a;&#xff08;我采用的版本是 elasticsearch-7.9.3&#xff09;注意 开始修正之前的配置&#xff1a;前提&#xff1a;elasticsearch.yml增加或者修正一下配置&#xff1a;xpack.security.enabled: truexpack.license.self_generated.type: basicxpack.secu…

对象实例化【JVM】

JVM对象实例化简介/背景一、创建对象的方式1. new2. Class对象的newInstance方法3. Construstor对象的newInstance(xx)方法4. 使用clone方法二、创建对象的步骤1. 判断对象是否已经加载、链接、初始化2. 为对象分配内存3. 处理并发安全问题4. 初始化分配到的空间5. 设置对象的对…