代码中包含了PHP在一般情况下使用方法和RabbitMQ开启了SSL验证后的使用方法(我这边消费队列是使用接口请求的方式,每次只从中取出一条)
安装amqp扩展
PHP使用RabbitMQ前,需要安装amqp扩展,之前文章中介绍了Windows环境PHP安装amqp扩展的方法:windows环境PHP使用RabbitMq安装amqp扩展_windows mq扩展安装-CSDN博客
Linux中安装amqp扩展:
### 先进入/usr/local目录下,下载两个文件到此目录(我的PHP版本是7.2):
wget -c https://github.com/alanxz/rabbitmq-c/releases/download/v0.8.0/rabbitmq-c-0.8.0.tar.gz
wget -c http://pecl.php.net/get/amqp-1.9.3.tgz
### 若使用的docker,将上面下载的两个包 拷贝到容器内【 docker cp ./文件 dockerID:/usr/local】,然后执行下面命令即可
### 解压rabbitmq-c-0.8.0.tar.gz
tar zxf rabbitmq-c-0.8.0.tar.gz
cd /usr/local/rabbitmq-c-0.8.0
./configure --prefix=/usr/local/rabbitmq-c-0.8.0
make && make install
### 然后解压 amqp-1.9.3.tgz 解压后amqp-1.9.3文件下内还有个amqp-1.9.3文件夹,将内部的amqp-1.9.3目录拷贝到/usr/local/下,执行下列命令:
cd /usr/local/amqp-1.9.3
/usr/local/bin/phpize
./configure --with-php-config=/usr/local/bin/php-config --with-amqp --with-librabbitmq-dir=/usr/local/rabbitmq-c-0.8.0
cp /usr/local/rabbitmq-c-0.8.0/librabbitmq/amqp_ssl_socket.h /usr/local/amqp-1.9.3/
make && make install
### 最后修改php.ini 加上配置:
extension = amqp.so
安装后,执行php -m 显示amqp 即表明扩展安装成功!
加载PHP代码的扩展包
然后需要加载代码的扩展包,比较方便快捷的方法是使用composer 加载扩展包
composer require php-amqplib/php-amqplib
若想指定版本:composer require php-amqplib/php-amqplib:版本
具体使用哪个版本可以在此链接内查询:https://packagist.org/packages/php-amqplib/php-amqplib
示例代码(包含开启了SSL的连接方式)
<?php
namespace common\helpers;
use models\setting\Log;
use PhpAmqpLib\Connection\AMQPSSLConnection;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class AmqpHelper
{
/**
* rabbitMq 未开启ssl验证 消费者
* @return false|string|void
* @throws \AMQPChannelException
* @throws \AMQPConnectionException
* @throws \AMQPQueueException
* @time 2024/12/2 13:43
* @author zsh
*/
public static function consumerResult()
{
//队列配置信息
$configParams = array(
'host' => \Yii::$app->params['cotaTct']['queueHost'],
'port' => \Yii::$app->params['cotaTct']['queuePort'],
'login' => \Yii::$app->params['cotaTct']['queueLogin'],
'password' => \Yii::$app->params['cotaTct']['queuePassword'],
'vhost' => \Yii::$app->params['cotaTct']['queueVhost']
);
$conn = new \AMQPConnection($configParams);
if (!$conn->connect()) {
die("连接rabbitmq失败!\n");
}
//建立信道
$channel = new \AMQPChannel($conn);
// 创建队列
$q = new \AMQPQueue($channel);
$queueName = \Yii::$app->params['cotaTct']['queueName']; //队列名
$q->setName($queueName);
$q->setFlags(AMQP_DURABLE); // 持久化
// 绑定交换机与队列,并指定路由键
$q->bind(\Yii::$app->params['cotaTct']['exchange'], \Yii::$app->params['cotaTct']['routingKey']);
// 消息获取
$ret = $q->get(AMQP_AUTOACK);
if ($ret) {
// echo "\nget data:\n";
// var_dump($ret->getBody());
// var_dump(json_decode($ret->getBody(), true));
$conn->disconnect();
return $ret->getBody();
}else{
$conn->disconnect();
return false;
}
}
/**
* rabbitMq 开启ssl了验证 消费者
* @return mixed|string|void
* @throws \ErrorException
* @time 2024/12/2 13:44
* @author zsh
*/
public static function sslConsumerResult()
{
$configParams = array(
'host' => \Yii::$app->params['cotaTct']['prodQueueHost'],
'port' => \Yii::$app->params['cotaTct']['prodQueuePort'],
'login' => \Yii::$app->params['cotaTct']['prodQueueLogin'],
'password' => \Yii::$app->params['cotaTct']['prodQueuePassword'],
'vhost' => \Yii::$app->params['cotaTct']['queueVhost']
);
// 创建SSL连接时忽略证书验证
$ssl_options = array(
'verify_peer' => false,
'verify_peer_name' => false,
);
$connection = new AMQPSSLConnection(
$configParams['host'],
$configParams['port'],
$configParams['login'],
$configParams['password'],
$configParams['vhost'],
$ssl_options);
if (!$connection->isConnected()) {
die("连接rabbitmq失败!\n");
}
// echo '链接成功...';
$queueName = \Yii::$app->params['cotaTct']['queueName']; //队列名
$exchange = \Yii::$app->params['cotaTct']['exchange'];
$routingKey = \Yii::$app->params['cotaTct']['routingKey'];
$channel = $connection->channel();
// 声明交换器
$channel->exchange_declare($exchange, 'topic', false, true, false);
// 获取系统生成的消息队列名称,这里也可以指定一个队列名称
$channel->queue_declare($queueName, false, true, false, false);
// 将队列名与交换器名进行绑定,并指定routing_key(路由键值)
$channel->queue_bind($queueName,$exchange,$routingKey);
$message = '';
// 定义收到消息回调函数
$callback = function ($msg) use (&$message) {
// echo 'Message:'.$msg->body;
$message = $msg->body;
// 手动确认消息是否正常消费
$msg->delivery_info['channel']->basic_Ack($msg->delivery_info['delivery_tag']);
};
// 设置消费成功后才能继续进行下一个消费
$channel->basic_qos(null, 1, null);
// 开启消费no_ack=false,设置为手动应答
$channel->basic_consume($queueName, '', false, false, false, false, $callback);
// 循环进行消费
// while ($channel->is_consuming()) {
// try {
// $channel->wait(null, false, $timeout = 10);
// }catch (AMQPTimeoutException $ex){
// // 没有消息可处理,退出循环
// echo $ex->getMessage();
// break;
// }
// }
if ($channel->is_consuming()) {
try {
$channel->wait(null, false, $timeout = 5);
}catch (AMQPTimeoutException $ex){
// 没有消息可处理,退出循环
echo $ex->getMessage();
}
}
//关闭连接
$channel->close();
$connection->close();
$return = $message;
unset($message);
$message = null;
return $return;
}
/**
* rabbitMq 未开启ssl验证 生产者
* @return mixed|string|void
* @throws \ErrorException
* @time 2024/12/2 13:44
* @author zsh
*/
public static function producer($message)
{
$configParams = array(
'host' => \Yii::$app->params['cotaTct']['queueHost'],
'port' => \Yii::$app->params['cotaTct']['queuePort'],
'login' => \Yii::$app->params['cotaTct']['queueLogin'],
'password' => \Yii::$app->params['cotaTct']['queuePassword'],
'vhost' => \Yii::$app->params['cotaTct']['queueVhost']
);
$exchangeName = \Yii::$app->params['cotaTct']['producerExchange'];
try {
$conn = new AMQPStreamConnection($configParams['host'], $configParams['port'], $configParams['login'], $configParams['password']);
//创建channel
$channel = $conn->channel();
$channel->exchange_declare($exchangeName,'fanout',false,true,false);
$messageData = new AMQPMessage($message);
$channel->basic_publish($messageData, $exchangeName);
$channel->close();
$conn->close();
return true;
}catch (\Exception $e){
Log::error('AMQP队列错误:'.$e,'AMQP');
return false;
}
}
/**
* rabbitMq 开启了ssl验证 生产者
* @return mixed|string|void
* @throws \ErrorException
* @time 2024/12/2 13:44
* @author zsh
*/
public static function sslProducer($message)
{
$configParams = array(
'host' => \Yii::$app->params['cotaTct']['prodQueueHost'],
'port' => \Yii::$app->params['cotaTct']['prodQueuePort'],
'login' => \Yii::$app->params['cotaTct']['prodQueueLogin'],
'password' => \Yii::$app->params['cotaTct']['prodQueuePassword'],
'vhost' => \Yii::$app->params['cotaTct']['queueVhost']
);
$exchangeName = \Yii::$app->params['cotaTct']['producerExchange'];
// 创建SSL连接时忽略证书验证
$ssl_options = array(
'verify_peer' => false,
'verify_peer_name' => false,
);
try {
$conn = new AMQPSSLConnection(
$configParams['host'],
$configParams['port'],
$configParams['login'],
$configParams['password'],
$configParams['vhost'],
$ssl_options);
if (!$conn->isConnected()) {
die("连接rabbitmq失败!\n");
}
//创建channel
$channel = $conn->channel();
$channel->exchange_declare($exchangeName,'fanout',false,true,false);
$messageData = new AMQPMessage($message);
$channel->basic_publish($messageData, $exchangeName);
$channel->close();
$conn->close();
return true;
}catch (\Exception $e){
Log::error('AMQP队列错误:'.$e,'AMQP');
return false;
}
}
}