PHP使用RabbitMQ(正常连接与开启SSL验证后的连接)

news2025/1/23 4:57:22

代码中包含了PHP在一般情况下使用方法和RabbitMQ开启了SSL验证后的使用方法(我这边消费队列是使用接口请求的方式,每次只从中取出一条)

安装amqp扩展

PHP使用RabbitMQ前,需要安装amqp扩展,之前文章中介绍了Windows环境PHP安装amqp扩展的方法:windows环境PHP使用RabbitMq安装amqp扩展_windows mq扩展安装-CSDN博客

Linux中安装amqp扩展:

### 先进入/usr/local目录下,下载两个文件到此目录(我的PHP版本是7.2):

wget -c https://github.com/alanxz/rabbitmq-c/releases/download/v0.8.0/rabbitmq-c-0.8.0.tar.gz

wget -c http://pecl.php.net/get/amqp-1.9.3.tgz

### 若使用的docker,将上面下载的两个包 拷贝到容器内【 docker cp ./文件 dockerID:/usr/local】,然后执行下面命令即可

### 解压rabbitmq-c-0.8.0.tar.gz

tar zxf rabbitmq-c-0.8.0.tar.gz

cd /usr/local/rabbitmq-c-0.8.0

./configure --prefix=/usr/local/rabbitmq-c-0.8.0

make && make install

### 然后解压 amqp-1.9.3.tgz 解压后amqp-1.9.3文件下内还有个amqp-1.9.3文件夹,将内部的amqp-1.9.3目录拷贝到/usr/local/下,执行下列命令:

cd /usr/local/amqp-1.9.3

/usr/local/bin/phpize

./configure --with-php-config=/usr/local/bin/php-config --with-amqp --with-librabbitmq-dir=/usr/local/rabbitmq-c-0.8.0

cp /usr/local/rabbitmq-c-0.8.0/librabbitmq/amqp_ssl_socket.h /usr/local/amqp-1.9.3/

make && make install

### 最后修改php.ini    加上配置:

extension = amqp.so 

安装后,执行php -m 显示amqp  即表明扩展安装成功!

加载PHP代码的扩展包

然后需要加载代码的扩展包,比较方便快捷的方法是使用composer 加载扩展包

composer require php-amqplib/php-amqplib

若想指定版本:composer require php-amqplib/php-amqplib:版本

具体使用哪个版本可以在此链接内查询:https://packagist.org/packages/php-amqplib/php-amqplib

示例代码(包含开启了SSL的连接方式)

<?php

namespace common\helpers;

use models\setting\Log;
use PhpAmqpLib\Connection\AMQPSSLConnection;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class AmqpHelper
{
    /**
     * rabbitMq 未开启ssl验证 消费者
     * @return false|string|void
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     * @throws \AMQPQueueException
     * @time 2024/12/2 13:43
     * @author zsh
     */
    public static function consumerResult()
    {
        //队列配置信息
        $configParams = array(
            'host' => \Yii::$app->params['cotaTct']['queueHost'],
            'port' => \Yii::$app->params['cotaTct']['queuePort'],
            'login' => \Yii::$app->params['cotaTct']['queueLogin'],
            'password' => \Yii::$app->params['cotaTct']['queuePassword'],
            'vhost' => \Yii::$app->params['cotaTct']['queueVhost']
        );

        $conn = new \AMQPConnection($configParams);
        if (!$conn->connect()) {
            die("连接rabbitmq失败!\n");
        }

        //建立信道
        $channel = new \AMQPChannel($conn);

        // 创建队列
        $q = new \AMQPQueue($channel);

        $queueName = \Yii::$app->params['cotaTct']['queueName']; //队列名
        $q->setName($queueName);
        $q->setFlags(AMQP_DURABLE);             // 持久化


        // 绑定交换机与队列,并指定路由键
        $q->bind(\Yii::$app->params['cotaTct']['exchange'], \Yii::$app->params['cotaTct']['routingKey']);

        // 消息获取
        $ret = $q->get(AMQP_AUTOACK);

        if ($ret) {
//            echo "\nget data:\n";
//            var_dump($ret->getBody());
//            var_dump(json_decode($ret->getBody(), true));
            $conn->disconnect();
            return $ret->getBody();

        }else{
            $conn->disconnect();
            return false;
        }
    }

    /**
     * rabbitMq 开启ssl了验证 消费者
     * @return mixed|string|void
     * @throws \ErrorException
     * @time 2024/12/2 13:44
     * @author zsh
     */
    public static function sslConsumerResult()
    {
        $configParams = array(
            'host' => \Yii::$app->params['cotaTct']['prodQueueHost'],
            'port' => \Yii::$app->params['cotaTct']['prodQueuePort'],
            'login' => \Yii::$app->params['cotaTct']['prodQueueLogin'],
            'password' => \Yii::$app->params['cotaTct']['prodQueuePassword'],
            'vhost' => \Yii::$app->params['cotaTct']['queueVhost']
        );

        // 创建SSL连接时忽略证书验证
        $ssl_options = array(
            'verify_peer' => false,
            'verify_peer_name' => false,
        );

        $connection = new AMQPSSLConnection(
            $configParams['host'],
            $configParams['port'],
            $configParams['login'],
            $configParams['password'],
            $configParams['vhost'],
            $ssl_options);

        if (!$connection->isConnected()) {
            die("连接rabbitmq失败!\n");
        }
//        echo '链接成功...';

        $queueName = \Yii::$app->params['cotaTct']['queueName']; //队列名
        $exchange = \Yii::$app->params['cotaTct']['exchange'];
        $routingKey = \Yii::$app->params['cotaTct']['routingKey'];

        $channel = $connection->channel();

        // 声明交换器
        $channel->exchange_declare($exchange, 'topic', false, true, false);

        // 获取系统生成的消息队列名称,这里也可以指定一个队列名称
        $channel->queue_declare($queueName, false, true, false, false);

        // 将队列名与交换器名进行绑定,并指定routing_key(路由键值)
        $channel->queue_bind($queueName,$exchange,$routingKey);

        $message = '';
        // 定义收到消息回调函数
        $callback = function ($msg) use (&$message) {
//            echo 'Message:'.$msg->body;
            $message = $msg->body;
            // 手动确认消息是否正常消费
            $msg->delivery_info['channel']->basic_Ack($msg->delivery_info['delivery_tag']);
        };

        // 设置消费成功后才能继续进行下一个消费
        $channel->basic_qos(null, 1, null);

        // 开启消费no_ack=false,设置为手动应答
        $channel->basic_consume($queueName, '', false, false, false, false, $callback);

        // 循环进行消费
//        while ($channel->is_consuming()) {
//            try {
//                $channel->wait(null, false, $timeout = 10);
//            }catch (AMQPTimeoutException $ex){
//                // 没有消息可处理,退出循环
//                echo $ex->getMessage();
//                break;
//            }
//        }

        if ($channel->is_consuming()) {
            try {
                $channel->wait(null, false, $timeout = 5);
            }catch (AMQPTimeoutException $ex){
                // 没有消息可处理,退出循环
                echo $ex->getMessage();
            }
        }

        //关闭连接
        $channel->close();
        $connection->close();

        $return = $message;
        unset($message);
        $message = null;

        return $return;
    }

    /**
     * rabbitMq 未开启ssl验证 生产者
     * @return mixed|string|void
     * @throws \ErrorException
     * @time 2024/12/2 13:44
     * @author zsh
     */
    public static function producer($message)
    {
        $configParams = array(
            'host' => \Yii::$app->params['cotaTct']['queueHost'],
            'port' => \Yii::$app->params['cotaTct']['queuePort'],
            'login' => \Yii::$app->params['cotaTct']['queueLogin'],
            'password' => \Yii::$app->params['cotaTct']['queuePassword'],
            'vhost' => \Yii::$app->params['cotaTct']['queueVhost']
        );
        $exchangeName = \Yii::$app->params['cotaTct']['producerExchange'];

        try {
            $conn = new AMQPStreamConnection($configParams['host'], $configParams['port'], $configParams['login'], $configParams['password']);

            //创建channel
            $channel = $conn->channel();
            $channel->exchange_declare($exchangeName,'fanout',false,true,false);

            $messageData = new AMQPMessage($message);
            $channel->basic_publish($messageData, $exchangeName);

            $channel->close();
            $conn->close();

            return true;
        }catch (\Exception $e){
            Log::error('AMQP队列错误:'.$e,'AMQP');
            return false;
        }
    }

    /**
     * rabbitMq 开启了ssl验证 生产者
     * @return mixed|string|void
     * @throws \ErrorException
     * @time 2024/12/2 13:44
     * @author zsh
     */
    public static function sslProducer($message)
    {
        $configParams = array(
            'host' => \Yii::$app->params['cotaTct']['prodQueueHost'],
            'port' => \Yii::$app->params['cotaTct']['prodQueuePort'],
            'login' => \Yii::$app->params['cotaTct']['prodQueueLogin'],
            'password' => \Yii::$app->params['cotaTct']['prodQueuePassword'],
            'vhost' => \Yii::$app->params['cotaTct']['queueVhost']
        );
        $exchangeName = \Yii::$app->params['cotaTct']['producerExchange'];

        // 创建SSL连接时忽略证书验证
        $ssl_options = array(
            'verify_peer' => false,
            'verify_peer_name' => false,
        );

        try {
            $conn = new AMQPSSLConnection(
                $configParams['host'],
                $configParams['port'],
                $configParams['login'],
                $configParams['password'],
                $configParams['vhost'],
                $ssl_options);

            if (!$conn->isConnected()) {
                die("连接rabbitmq失败!\n");
            }

            //创建channel
            $channel = $conn->channel();
            $channel->exchange_declare($exchangeName,'fanout',false,true,false);

            $messageData = new AMQPMessage($message);
            $channel->basic_publish($messageData, $exchangeName);

            $channel->close();
            $conn->close();

            return true;
        }catch (\Exception $e){
            Log::error('AMQP队列错误:'.$e,'AMQP');
            return false;
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2253710.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

uniapp h5 vue3 m3u8 和 mp4 外链视频播放

m3u8视频播放 使用mui-player 和hls.js。 安装npm install mui-player hls.js我的版本是"hls.js": "^1.5.17"和"mui-player": "^1.8.1"使用 页面标签&#xff1a; 引用&#xff1a; 点击目录播放视频&#xff1a; m3u8视频播放&a…

给el-table表头添加icon图标,以及鼠标移入icon时显示el-tooltip提示内容

在你的代码中&#xff0c;你已经正确地使用了 el-tooltip 组件来实现鼠标划过加号时显示提示信息。el-tooltip 组件的 content 属性设置了提示信息的内容&#xff0c;placement 属性设置了提示信息的位置。 你需要确保 el-tooltip 组件的 content 属性和 placement 属性设置正…

node.js实现分页,jwt鉴权机制,token,cookie和session的区别

文章目录 1. 分⻚功能2. jwt鉴权机制1.jwt是什么2.jwt的应用3.优缺点 3. cookie&#xff0c;token&#xff0c;session的对比 1. 分⻚功能 为什么要分页 如果数据量很⼤&#xff0c;⽐如⼏万条数据&#xff0c;放在⼀个⻚⾯显⽰的话显然不友好&#xff0c;这时候就需要采⽤分⻚…

大数据新视界 -- Hive 元数据管理:核心元数据的深度解析(上)(27 / 30)

&#x1f496;&#x1f496;&#x1f496;亲爱的朋友们&#xff0c;热烈欢迎你们来到 青云交的博客&#xff01;能与你们在此邂逅&#xff0c;我满心欢喜&#xff0c;深感无比荣幸。在这个瞬息万变的时代&#xff0c;我们每个人都在苦苦追寻一处能让心灵安然栖息的港湾。而 我的…

SpringBoot如何使用EasyExcel实现表格导出(简洁快速入门版本)

前言 前面给大家介绍了动态表头的导入&#xff0c;这篇文章给大家介绍如何实现导出 前面给大家介绍了动态表头的导入&#xff0c;我们了解了如何通过EasyExcel灵活地读取结构不固定的Excel文件。这次&#xff0c;我们将目光转向数据导出——即如何将数据以Excel文件的形式输出…

快速上手 RabbitMQ:使用 Docker 轻松搭建消息队列系统

在现代的分布式系统中&#xff0c;消息队列&#xff08;Message Queue&#xff09;是实现异步通信、解耦系统组件、提高系统可扩展性和可靠性的重要工具。RabbitMQ 是一个广泛使用的开源消息代理软件&#xff0c;它实现了高级消息队列协议&#xff08;AMQP&#xff09;&#xf…

ELK的Filebeat

目录 传送门前言一、概念1. 主要功能2. 架构3. 使用场景4. 模块5. 监控与管理 二、下载地址三、Linux下7.6.2版本安装filebeat.yml配置文件参考&#xff08;不要直接拷贝用&#xff09;多行匹配配置过滤配置最终配置&#xff08;一、多行匹配、直接读取日志文件、EFK方案&#…

UE5 像素流进行内网https证书创建

确定证书需求 内网 HTTPS 通信通常需要以下内容&#xff1a; 自签名证书&#xff08;适用于内网环境&#xff0c;不需要通过公开的证书颁发机构 CA&#xff09; 或者通过内部的企业 CA 签发的证书&#xff08;更安全&#xff09;。 生成自签名证书 使用工具&#xff08;如 Ope…

44页PDF | 信息化战略规划标准框架方法论与实施方法(限免下载)

一、前言 这份报告详细介绍了企业信息化战略规划的标准框架、方法论以及实施方法&#xff0c;强调了信息化规划应以业务战略和IT战略为驱动力&#xff0c;通过构筑企业架构&#xff08;EA&#xff09;来连接长期战略和信息化建设。报告提出了信息化规划原则&#xff0c;探讨了…

RNACOS:用Rust实现的Nacos服务

RNACOS是一个使用Rust语言开发的Nacos服务实现&#xff0c;它继承了Nacos的所有核心功能&#xff0c;并在此基础上进行了优化和改进。作为一个轻量级、快速、稳定且高性能的服务&#xff0c;RNACOS不仅包含了注册中心、配置中心和Web管理控制台的功能&#xff0c;还支持单机和集…

任务管理法宝:甘特图详解

在项目管理中&#xff0c;如何清晰、直观地展示项目的进度和任务分配&#xff1f; 甘特图作为一种经典的项目管理工具&#xff0c;提供了有效的解决方案。无论是团队合作还是个人项目管理&#xff0c;甘特图都能帮助你轻松追踪各项任务的进展。今天&#xff0c;我们将详细介绍…

RabbitMQ 客户端 连接、发送、接收处理消息

RabbitMQ 客户端 连接、发送、接收处理消息 一. RabbitMQ 的机制跟 Tcp、Udp、Http 这种还不太一样 RabbitMQ 服务&#xff0c;不是像其他服务器一样&#xff0c;负责逻辑处理&#xff0c;然后转发给客户端 而是所有客户端想要向 RabbitMQ服务发送消息&#xff0c; 第一步&a…

PyQt 中的无限循环后台任务

在 PyQt 中实现一个后台无限循环任务&#xff0c;需要确保不会阻塞主线程&#xff0c;否则会导致 GUI 无响应。常用的方法是利用 线程&#xff08;QThread&#xff09; 或 任务&#xff08;QRunnable 和 QThreadPool&#xff09; 来运行后台任务。以下是一些实现方式和关键点&a…

云计算vsphere 服务器上添加主机配置

这里是esxi 主机 先把主机打开 然后 先开启dns 再开启 vcenter 把每台设备桌面再vmware workstation 上显示 同上也是一样 &#xff0c;因为在esxi 主机的界面可能有些东西不好操作 我们选择主机和集群 左边显示172.16.100.200

Python酷库之旅-第三方库Pandas(255)

目录 一、用法精讲 1206、pandas.tseries.offsets.SemiMonthEnd.is_on_offset方法 1206-1、语法 1206-2、参数 1206-3、功能 1206-4、返回值 1206-5、说明 1206-6、用法 1206-6-1、数据准备 1206-6-2、代码示例 1206-6-3、结果输出 1207、pandas.tseries.offsets.S…

Envoy-istio

最近研究envoy-istio&#xff0c;发现这个博客&#xff0c;觉得很不错&#xff0c;这里记录一下 envoy-istio介绍 envoy-istio - 随笔分类 - yaowx - 博客园 envoy部分七&#xff1a;envoy的http流量管理基础 envoy部分六&#xff1a;envoy的集群管理 envoy部分五&#xff…

甘特图的绘制步骤:教你如何绘制甘特图

甘特图是项目管理中一种极为重要的可视化工具&#xff0c;它以直观的方式展示项目进度&#xff0c;包括任务的开始时间、结束时间、持续时长以及任务之间的先后顺序。在当今的项目管理领域&#xff0c;Excel 和专业的项目管理软件是制作甘特图的两大常用途径&#xff0c;它们各…

C++模拟堆

模板题目 图片来源Acwing 堆的基础知识 代码实现 #include<iostream> #include<algorithm>using namespace std;const int N 1e5 10; int a[N]; int n, m;void down(int u) {int t u;if (2 * u < n && a[2 * u] < a[u]){t 2 * u;}if (2 * u …

牛客linux

1、 统计文件的行数 # 方法 1 wc -l ./nowcoder.txt | awk {print $1} # 方法 2 &#xff0c;awk 可以打印所有行的行号, 或者只打印最后一行 awk {print NR} ./nowcoder.txt |tail -n 1 awk END{print NR} ./nowcoder.txt # 方法 3 grep -c 、-n等等 grep -c "" ./…

【unity小技巧】在 Unity 中,Application获取各种文件路径或访问不同类型的存储路径

文章目录 前言1. **Application.persistentDataPath**2. **Application.dataPath**3. **Application.streamingAssetsPath**4. **Application.temporaryCachePath**5. **Application.consoleLogPath**6. **Application.userDataPath**7. **Application.streamingAssetsPath 与 …