消息队列rabbitmq的使用

news2025/1/18 3:18:29

前提条件:环境安装amqp和安装rabbitmq

sudo apt-get update
sudo apt-get install rabbitmq-amqp-dev

1、创建CMakeLists.txt文件

# Copyright (c) Huawei Technologies Co., Ltd. 2019. All rights reserved.

# CMake lowest version requirement
cmake_minimum_required(VERSION 3.5.1)

# project information
project(ACL_RESNET50)

# Compile options
add_compile_options(-std=c++11)

# set(CMAKE_RUNTIME_OUTPUT_DIRECTORY  "../out")
set(CMAKE_CXX_FLAGS_DEBUG "-fPIC -O0 -g -Wall")
set(CMAKE_CXX_FLAGS_RELEASE "-fPIC -O2 -Wall")

set(LIB_PATH $ENV{NPU_HOST_LIB})

# Header path
include_directories(
    ${INC_PATH}/acllib/include/
)

if(target STREQUAL "Simulator_Function")
    add_compile_options(-DFUNC_SIM)
endif()

# add host lib path
link_directories(
    ${LIB_PATH}
)
# 添加你的可执行文件
add_executable(my_amqp_app_server my_amqp_app_server.cpp)
add_executable(my_amqp_app_client rabbitmq_client.cpp)

# 链接 amqpcpp 库
target_link_libraries(my_amqp_app_server rabbitmq)
target_link_libraries(my_amqp_app_client rabbitmq)

2、创建rabbitmq生产者文件rabbitmq_client.cpp

#include <string.h>
#include <iostream>

#include <amqp.h>
#include <amqp_tcp_socket.h>

int main(int argc, char const *const *argv)
{
  //创建一个新的AMQP连接,他将被用于RabbitMQ服务器进行通信
  amqp_connection_state_t conn = amqp_new_connection();
   //这行代码创建了一个新的TCP套接字,并将其与之前的AMQP关联起来
  amqp_socket_t *socket = amqp_tcp_socket_new(conn);
  //这行代码打开了一个到"localhost"的套接字连接,端口号是AMQP协议的默认端口(通常是5672)
  amqp_socket_open(socket, "192.168.140.2", AMQP_PROTOCOL_PORT);
  //这行代码是使用用户名和密码(在这里是"guest",第一个是用户名,第二个密码)在默认的虚拟主机""
  amqp_login(conn, "/", 0, AMQP_DEFAULT_FRAME_SIZE, 0, AMQP_SASL_METHOD_PLAIN, "center", "123qwe");
 //打开一个通道,编号为1,所有的消息都是通过通道传输的。
  const amqp_channel_t KChannel = 1;
  amqp_channel_open(conn, KChannel);

  //这两行代码都声明了与1个"hello"的队列。在rabbitmq中,所有消息都是存储在队列中的
  amqp_bytes_t queueName(amqp_cstring_bytes("hello"));
  amqp_queue_declare(conn, KChannel, queueName, false, false, false, false, amqp_empty_table);
  amqp_basic_properties_t props;
  props._flags =  AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_REPLY_TO_FLAG | AMQP_BASIC_CORRELATION_ID_FLAG;
  props.content_type = amqp_cstring_bytes("text/plain");
  props.reply_to = amqp_cstring_bytes("response_queue");
  props.correlation_id = amqp_cstring_bytes("12345");
  //这行代码发布了一条消息到"hello"队列。消息的内容是字符串"Hello World!"
  amqp_basic_publish(conn, KChannel, amqp_empty_bytes, /* routing key*/ queueName, false, false, &props, amqp_cstring_bytes("Hello World!"));
  std::cout << " [x] Sent 'Hello World!'" << std::endl;
  //首先关闭通道,然后关闭连接,最后销毁连接。这是一种良好的资源管理实践,可以确保不会有资源泄露
  amqp_channel_close(conn, KChannel, AMQP_REPLY_SUCCESS);
  amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
  amqp_destroy_connection(conn);
  return 0;
}

3、创建rabbitmq消费者my_amqp_app_server.cpp

#include <iostream>
#include <string.h>

#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <iostream>
#include <fstream>
#include <cstring>
#include <map>
#include <math.h>

using namespace std;


 int main(int argc, char const *const *argv)
 {
   //创建一个新的RabbitMQ连接,将其状态存储在 conn 中。
   amqp_connection_state_t conn = amqp_new_connection();
   //为新建的RabbitMQ连接创建一个TCP socket。
   amqp_socket_t *socket = amqp_tcp_socket_new(conn);
   //打开刚刚创建的TCP socket,连接到本地的RabbitMQ服务器。
   amqp_socket_open(socket, "192.168.140.2", AMQP_PROTOCOL_PORT);
 //使用"guest"作为用户名和密码在服务器上登陆。
   amqp_login(conn, "/", 0, AMQP_DEFAULT_FRAME_SIZE, 0, AMQP_SASL_METHOD_PLAIN, "center", "123qwe");
   const amqp_channel_t KChannel = 1;//设置通道号为1。
   amqp_channel_open(conn, KChannel);//打开编号为1的通道。

   amqp_bytes_t queueName(amqp_cstring_bytes("hello"));//设置要声明的队列的名字为"hello"。
   amqp_queue_declare(conn, KChannel, queueName, false, false, false, false, amqp_empty_table);//在通道1上声明名为"hello"的队列。
   //开始在"hello"队列上消费消息,这里设定为自动发送acknowledgement。
   amqp_basic_consume(conn, KChannel, queueName, amqp_empty_bytes, false, /* auto ack*/true, false, amqp_empty_table);

   for (;;)
   {//无限循环,持续接收和处理消息
     amqp_maybe_release_buffers(conn);
     amqp_envelope_t envelope;//创建一个新的信封来接收消息。
     amqp_consume_message(conn, &envelope, nullptr, 0);//从连接中接收一条消息,并将其放入信封中。
     //打印出接收到的消息内容。
     std::cout << " [x] Received " << std::string((char *)envelope.message.body.bytes,(int)envelope.message.body.len) << std::endl;
     //获取消费者传送过来的属性信息
     std::string reply_to(reinterpret_cast<char*>(envelope.message.properties.reply_to.bytes), envelope.message.properties.reply_to.len);
     std::string correlation_id(reinterpret_cast<char*>(envelope.message.properties.correlation_id.bytes), envelope.message.properties.correlation_id.len);
     std::cout << "Received message: " << std::string(reinterpret_cast<char*>(envelope.message.body.bytes), envelope.message.body.len) 
                << " with reply_to: " << reply_to  << " correlation_id: " << correlation_id << std::endl;
     //响应,把响应信息发送到reply_to队列中。消费者通过correlation_id的唯一关联的id区分该响应的信息。
     amqp_basic_properties_t props;
     props._flags = AMQP_BASIC_CORRELATION_ID_FLAG;
     props.correlation_id = amqp_cstring_bytes(correlation_id.c_str()); 
     amqp_basic_publish(conn, KChannel, amqp_empty_bytes, amqp_cstring_bytes(reply_to.c_str()), false, false, &props, amqp_cstring_bytes(std::string("success").c_str()));  
     amqp_destroy_envelope(&envelope);//销毁信封,释放内存。
   }

   amqp_channel_close(conn, KChannel, AMQP_REPLY_SUCCESS);//关闭编号为1的通道。
   amqp_connection_close(conn, AMQP_REPLY_SUCCESS); //关闭RabbitMQ连接
   amqp_destroy_connection(conn); //销毁连接,释放内存。

   return 0;
 }

4、创建build文件夹,并且编译,生产可执行文件。

mkdir build 
cd build 
cmake .. 
make 

5、创建fastapi访问resfull接口,入口文件为main_fast_client.py

from fastapi.responses import JSONResponse
from fastapi import FastAPI, Request, UploadFile, File
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel

from RetrievalRpcClient import RetrievalRpcClient
app = FastAPI()

retrievalRpcClient = RetrievalRpcClient(input_queue=None, output_queue=None)

class TestRetrievalData(BaseModel):
    input_text: str = None

async def send_message(testRetrievaldata):
    response = retrievalRpcClient.produce(input_text=testRetrievaldata.input_text)
    return response

@app.post("/get_message")
async def get_message(testRetrievaldata: TestRetrievalData):
    response  = await send_message(testRetrievaldata)
    return response

if __name__ == '__main__':
    import uvicorn
    uvicorn.run(app="main_fast_client:app", port=int(5600), host="0.0.0.0",workers=10)

6、链接的rabbitmq的类RetrievalRpcClient.py

import functools
import json
import queue
import threading
import time
import traceback
from datetime import datetime

import  pika
import uuid

class RetrievalRpcClient(object):
    def __init__(self,input_queue,output_queue):
        # 创建链接
        self.response_queue = input_queue
        self.output_queue = "hello"
        self.url = "amqp://center:123qwe@192.168.140.2:5672/"
        self.exchange = "retrival.mind"
        self.parameters = pika.URLParameters(self.url)
        self.connection_params = pika.ConnectionParameters(
            host='192.168.140.2',
            credentials=pika.PlainCredentials('center', '123qwe')
            # blocked_connection_timeout=600  # 设置超时时间为600秒(10分钟)
        )
        #或者使用pika.URLParameters链接器
        self.connection = pika.BlockingConnection(self.parameters)
        # self.connection = pika.BlockingConnection(self.parameters)
        self.connection = pika.BlockingConnection(self.connection_params)

    def on_response(self, ch, method, props, body):  # 回调
        # 每条指令执行的速度可能不一样,指令1比指令2先发送,但可能指令2的执行结果比指令1先返回到客户端,
        # 此时如果没有下面的判断,客户端就会把指令2的结果误认为指令1执行的结果,所以通过correlation_id进一步区分
        print(props)
        if self.corr_id == props.correlation_id:
            self.response = body
            # logger.info("self.corr_id = %s" % self.corr_id)
            # logger.info("收到response的数据 %s" %json.loads(self.response))
            print("收到response的数据 %s" %self.response)

        return self.response

    def produce(self, **kwargs):
        input_text = kwargs.get("input_text")
        #规范消息队列中数据传达格式
        data_dic = {}
        data = {"input_text": input_text}
       
        data_dic["Data"]=data
        data_dic["StartTime"]= datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        # logger.info("data=%s" % data)
        print("data_dic=%s" % data_dic)
        self.response = None  ##指令执行后返回的消息
        self.corr_id = str(uuid.uuid4())  ##可用来标识指令(顺序)
        print("self.corr_id=",self.corr_id)
        channel = self.connection.channel()
        # 指定队列,没有就创建,
        channel.queue_declare(queue=self.output_queue)
        # 声明交换机
        channel.exchange_declare(exchange=self.exchange, exchange_type='topic')
        channel.queue_bind(exchange=self.exchange, queue=self.output_queue, routing_key=self.output_queue)
        #也可以写成一个固定的队列名称作为接收返回的信息,每个请求通过correlation_id区分响应
        callback_queue = channel.queue_declare('', exclusive=False, auto_delete= True).method.queue #随机生成一个监听队列

        channel.confirm_delivery()
        #如果有上传模型文件,则需要经过推理和faiss两个服务,如果仅仅是文本则不需要。
        try:
            channel.basic_publish(exchange=self.exchange,
                                       routing_key=self.output_queue,
                                       properties=pika.BasicProperties(
                                           reply_to=callback_queue,  # 将指令执行结果返回到reply_to队列,该队列名称随机生成
                                           correlation_id=self.corr_id,  # 通过该唯一id,可以区分哪个response对应该请求。
                                       ),
                                       mandatory= True,
                                       body=json.dumps(data_dic))
            # logger.info("前端请求信息已经发送到队列 [%s]中" %self.output_queue)
        except Exception as e:
            #打印完整的异常信息
            traceback.print_exc()
            print("进程处理异常", e)
        # logger.info("监听返回的队列[%s]" %callback_queue)
        #启动一个线程监听
        print("监听返回的队列[%s]" %callback_queue)
        q = queue.Queue() #通过队列获取线程里面的放回的结果
        t = threading.Thread(target=self.do_consum,args=(channel,callback_queue,q))
        t.start()
        result = q.get()
        return result

    def do_consum(self, channel, callback_queue, q):
        channel.basic_consume(queue=callback_queue,  # 监听一接收到服务器端返回的数据就调用回调函数on_response
                              auto_ack=True,
                              on_message_callback=self.on_response)
        while self.response is None:  # 如果服务端没有返回,这一直循环。
            self.connection.process_data_events()  # 去queue接收数据(不阻塞)
        q.put(self.response)

7、通过restful接口访问

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1958524.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

tof系统标定流程之lens标定

1、lens标定详解 为什么在标定tof时需要进行lens的标定,可以说lens标定是一个必不可少的步骤,tof模组也是有镜头的,镜头的畸变会导致进入的光线出现偏差,最终照射到tof芯片表面导致深度图的分布出现畸变,通常是枕形畸变。例外一个用途在于,在计算fppn误差环节需要知道镜头…

机器学习算法与Python实战 | 两行代码即可应用 40 个机器学习模型--lazypredict 库!

本文来源公众号“机器学习算法与Python实战”&#xff0c;仅用于学术分享&#xff0c;侵权删&#xff0c;干货满满。 原文链接&#xff1a;两行代码即可应用 40 个机器学习模型 今天和大家一起学习使用 lazypredict 库&#xff0c;我们可以用一行代码在我们的数据集上实现许多…

【数据结构】队列(链表实现 + 力扣 + 详解 + 数组实现循环队列 )

Hi~&#xff01;这里是奋斗的明志&#xff0c;很荣幸您能阅读我的文章&#xff0c;诚请评论指点&#xff0c;欢迎欢迎 ~~ &#x1f331;&#x1f331;个人主页&#xff1a;奋斗的明志 &#x1f331;&#x1f331;所属专栏&#xff1a;数据结构 &#x1f4da;本系列文章为个人学…

流行巨星布兰妮·斯皮尔斯发生了什么事?她现在在哪里过得怎么样

流行音乐公主布兰妮斯皮尔斯是 21 世纪初的经典偶像。她从 15 岁起就开始唱歌和表演&#xff0c;并创作了《Oops I Did it Again》和《Baby One More Time》等热门歌曲。她的歌曲非常出色&#xff0c;在 2000 年荣登榜首。她接下来的几张专辑变得更加畅销&#xff0c;她毫不畏惧…

学习008-02-04-04 Enable Split Layout in a List View(在列表视图中启用拆分布局 )

Enable Split Layout in a List View&#xff08;在列表视图中启用拆分布局 &#xff09; This lesson explains how to enable a Split Layout in a List View. 本课介绍如何在列表视图中启用拆分布局。 The Detail View opens when you select an object from the List Vie…

G120 EPos配置方案及应用场景

EPos功能就是基本定位器功能,它可计算出轴的运行特性,使轴以时间最佳的方式移动到目标位置。EPos功能主要包括:设定值 直接给定(MDI)功能、 选择程序段功能、回参考点功能、点动功能、运行到固定挡块功能。 EPos功能通过处理给定的加速度、速度和位置值生成运行特性曲线,…

node+mysql+layui+ejs实现左侧导航栏菜单动态显示

nodemysqllayuiejs实现左侧导航菜单动态显示 实现思路效果图数据库技术栈代码实现main.html&#xff08;前端首页页面&#xff09;查询资源菜单方法 jsapp.js配置ejs模板 node入门到入土项目实战开始&#xff0c;前端篇项目适合node小白入门&#xff0c;因为我也是小白来学习no…

机器人笛卡尔空间阻抗控制

机器人笛卡尔空间阻抗控制是一种重要的机器人控制策略,它关注于机器人末端执行器在笛卡尔空间(即任务空间)内的动态特性,以实现与环境的柔顺交互。以下是对机器人笛卡尔空间阻抗控制的详细解释: 一、基本概念 笛卡尔空间:指机器人末端执行器(如手爪、工具等)所处的三维…

Hive之扩展函数(UDF)

Hive之扩展函数(UDF) 1、概念讲解 当所提供的函数无法解决遇到的问题时&#xff0c;我们通常会进行自定义函数&#xff0c;即&#xff1a;扩展函数。Hive的扩展函数可分为三种&#xff1a;UDF,UDTF,UDAF。 UDF&#xff1a;一进一出 UDTF&#xff1a;一进多出 UDAF&#xff1a…

YOLO v8目标检测(三)模型训练与正负样本匹配

YOLO v8目标检测 损失函数理论 在YOLO v5模型中&#xff0c;cls, reg, obj代表的是三个不同的预测组成部分&#xff0c;对应的损失函数如下&#xff1a; cls: 这代表类别预测&#xff08;classification&#xff09;。对应的损失是类别预测损失&#xff08;loss_cls&#xff…

Win10出现错误代码0x80004005 一键修复指南

对于 Windows 10 用户来说&#xff0c;错误代码 0x80004005 就是这样一种迷雾&#xff0c;它可能在不经意间出现&#xff0c;阻碍我们顺畅地使用电脑。这个错误通常与组件或元素的缺失有关&#xff0c;它可能源自注册表的错误、系统文件的损坏&#xff0c;或者是软件的不兼容。…

listener监听

背景: 过滤器代码也可实现接口请求次数统计,但会影响过滤器本意;故在dispatcher servlet层进行监听统计 价值: 所有接口的次数统计可适用于系统全天访问量; 单个请求接口的次数统计可在企业中根据接口次数的高低,可分析出接口对应的功能受用户的喜好程度 请求通过过滤器到了s…

common-intellisense:助力TinyVue 组件书写体验更丝滑

本文由体验技术团队Kagol原创~ 前两天&#xff0c;common-intellisense 开源项目的作者 Simon-He95 在 VueConf 2024 群里发了一个重磅消息&#xff1a; common-intellisense 支持 TinyVue 组件库啦&#xff01; common-intellisense 插件能够提供超级强大的智能提示功能&…

c生万物系列(职责链模式与if_else)

从处理器的角度来说&#xff0c;条件分支会导致指令流水线的中断&#xff0c;所以控制语句需要严格保存状态&#xff0c;因为处理器是很难直接进行逻辑判断的&#xff0c;有可能它会执行一段时间&#xff0c;发现出错后再返回&#xff0c;也有可能通过延时等手段完成控制流的正…

skynet 实操篇

文章目录 概述demo启动文件skynet_start配置文件main.luastart函数thread_workerskynet_context_message_dispatchskynet_mq_popdispatch_message 小结 概述 上一篇写完skynet入门篇&#xff0c;这一篇写点实操性质的。 demo 对于一个开源框架&#xff0c;大部分都有他们自己…

《Linux运维总结:基于x86_64架构CPU使用docker-compose一键离线部署zookeeper 3.8.4容器版分布式集群》

总结&#xff1a;整理不易&#xff0c;如果对你有帮助&#xff0c;可否点赞关注一下&#xff1f; 更多详细内容请参考&#xff1a;《Linux运维篇&#xff1a;Linux系统运维指南》 一、部署背景 由于业务系统的特殊性&#xff0c;我们需要面对不同的客户部署业务系统&#xff0…

C++客户端Qt开发——界面优化(美化登录界面)

美化登录界面 在.ui中拖入一个QFream&#xff0c;顶层窗口的QWidget无法设置背景图片&#xff0c;套上一层QFrame将背景图片设置到QFrame上即可 用布局管理器管理元素&#xff1a;用户名LineEdit&#xff0c;密码LineEdit&#xff0c;记住密码ComboBox&#xff0c;登录Button…

ubuntu2204安装elasticsearch7.17.22

下载安装 wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.17.22-amd64.deb wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.17.22-amd64.deb.sha512 shasum -a 512 -c elasticsearch-7.17.22-amd64.deb.sha512 su…

web、http协议、apache服务、nginx服务

web基本概念和常识 概念 web&#xff1a;为用户提供的一种在互联网上浏览信息的服务&#xff0c;是动态的、可交互的、跨平台的和图形化的&#xff1b; 为用户提供各种互联网服务&#xff0c;这些服务包括浏览服务以及各种交互式服务&#xff0c;包括聊天、购物等&#xff1…

windows下,pyrouge安装教程

1.安装perl 1.1 在命令行&#xff0c;检查perl是否安装 perl-v 1.2 安装perl 下载地址 Strawberry Perl for Windows - Releases 1&#xff09;下载msi版本 2&#xff09;双击安装包&#xff0c;傻瓜式安装&#xff0c;一路next&#xff0c;&#xff08;可修改安装路径&am…