前提条件:环境安装amqp和安装rabbitmq
sudo apt-get update
sudo apt-get install rabbitmq-amqp-dev
1、创建CMakeLists.txt文件
# Copyright (c) Huawei Technologies Co., Ltd. 2019. All rights reserved.
# CMake lowest version requirement
cmake_minimum_required(VERSION 3.5.1)
# project information
project(ACL_RESNET50)
# Compile options
add_compile_options(-std=c++11)
# set(CMAKE_RUNTIME_OUTPUT_DIRECTORY "../out")
set(CMAKE_CXX_FLAGS_DEBUG "-fPIC -O0 -g -Wall")
set(CMAKE_CXX_FLAGS_RELEASE "-fPIC -O2 -Wall")
set(LIB_PATH $ENV{NPU_HOST_LIB})
# Header path
include_directories(
${INC_PATH}/acllib/include/
)
if(target STREQUAL "Simulator_Function")
add_compile_options(-DFUNC_SIM)
endif()
# add host lib path
link_directories(
${LIB_PATH}
)
# 添加你的可执行文件
add_executable(my_amqp_app_server my_amqp_app_server.cpp)
add_executable(my_amqp_app_client rabbitmq_client.cpp)
# 链接 amqpcpp 库
target_link_libraries(my_amqp_app_server rabbitmq)
target_link_libraries(my_amqp_app_client rabbitmq)
2、创建rabbitmq生产者文件rabbitmq_client.cpp
#include <string.h>
#include <iostream>
#include <amqp.h>
#include <amqp_tcp_socket.h>
int main(int argc, char const *const *argv)
{
//创建一个新的AMQP连接,他将被用于RabbitMQ服务器进行通信
amqp_connection_state_t conn = amqp_new_connection();
//这行代码创建了一个新的TCP套接字,并将其与之前的AMQP关联起来
amqp_socket_t *socket = amqp_tcp_socket_new(conn);
//这行代码打开了一个到"localhost"的套接字连接,端口号是AMQP协议的默认端口(通常是5672)
amqp_socket_open(socket, "192.168.140.2", AMQP_PROTOCOL_PORT);
//这行代码是使用用户名和密码(在这里是"guest",第一个是用户名,第二个密码)在默认的虚拟主机""
amqp_login(conn, "/", 0, AMQP_DEFAULT_FRAME_SIZE, 0, AMQP_SASL_METHOD_PLAIN, "center", "123qwe");
//打开一个通道,编号为1,所有的消息都是通过通道传输的。
const amqp_channel_t KChannel = 1;
amqp_channel_open(conn, KChannel);
//这两行代码都声明了与1个"hello"的队列。在rabbitmq中,所有消息都是存储在队列中的
amqp_bytes_t queueName(amqp_cstring_bytes("hello"));
amqp_queue_declare(conn, KChannel, queueName, false, false, false, false, amqp_empty_table);
amqp_basic_properties_t props;
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_REPLY_TO_FLAG | AMQP_BASIC_CORRELATION_ID_FLAG;
props.content_type = amqp_cstring_bytes("text/plain");
props.reply_to = amqp_cstring_bytes("response_queue");
props.correlation_id = amqp_cstring_bytes("12345");
//这行代码发布了一条消息到"hello"队列。消息的内容是字符串"Hello World!"
amqp_basic_publish(conn, KChannel, amqp_empty_bytes, /* routing key*/ queueName, false, false, &props, amqp_cstring_bytes("Hello World!"));
std::cout << " [x] Sent 'Hello World!'" << std::endl;
//首先关闭通道,然后关闭连接,最后销毁连接。这是一种良好的资源管理实践,可以确保不会有资源泄露
amqp_channel_close(conn, KChannel, AMQP_REPLY_SUCCESS);
amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(conn);
return 0;
}
3、创建rabbitmq消费者my_amqp_app_server.cpp
#include <iostream>
#include <string.h>
#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <iostream>
#include <fstream>
#include <cstring>
#include <map>
#include <math.h>
using namespace std;
int main(int argc, char const *const *argv)
{
//创建一个新的RabbitMQ连接,将其状态存储在 conn 中。
amqp_connection_state_t conn = amqp_new_connection();
//为新建的RabbitMQ连接创建一个TCP socket。
amqp_socket_t *socket = amqp_tcp_socket_new(conn);
//打开刚刚创建的TCP socket,连接到本地的RabbitMQ服务器。
amqp_socket_open(socket, "192.168.140.2", AMQP_PROTOCOL_PORT);
//使用"guest"作为用户名和密码在服务器上登陆。
amqp_login(conn, "/", 0, AMQP_DEFAULT_FRAME_SIZE, 0, AMQP_SASL_METHOD_PLAIN, "center", "123qwe");
const amqp_channel_t KChannel = 1;//设置通道号为1。
amqp_channel_open(conn, KChannel);//打开编号为1的通道。
amqp_bytes_t queueName(amqp_cstring_bytes("hello"));//设置要声明的队列的名字为"hello"。
amqp_queue_declare(conn, KChannel, queueName, false, false, false, false, amqp_empty_table);//在通道1上声明名为"hello"的队列。
//开始在"hello"队列上消费消息,这里设定为自动发送acknowledgement。
amqp_basic_consume(conn, KChannel, queueName, amqp_empty_bytes, false, /* auto ack*/true, false, amqp_empty_table);
for (;;)
{//无限循环,持续接收和处理消息
amqp_maybe_release_buffers(conn);
amqp_envelope_t envelope;//创建一个新的信封来接收消息。
amqp_consume_message(conn, &envelope, nullptr, 0);//从连接中接收一条消息,并将其放入信封中。
//打印出接收到的消息内容。
std::cout << " [x] Received " << std::string((char *)envelope.message.body.bytes,(int)envelope.message.body.len) << std::endl;
//获取消费者传送过来的属性信息
std::string reply_to(reinterpret_cast<char*>(envelope.message.properties.reply_to.bytes), envelope.message.properties.reply_to.len);
std::string correlation_id(reinterpret_cast<char*>(envelope.message.properties.correlation_id.bytes), envelope.message.properties.correlation_id.len);
std::cout << "Received message: " << std::string(reinterpret_cast<char*>(envelope.message.body.bytes), envelope.message.body.len)
<< " with reply_to: " << reply_to << " correlation_id: " << correlation_id << std::endl;
//响应,把响应信息发送到reply_to队列中。消费者通过correlation_id的唯一关联的id区分该响应的信息。
amqp_basic_properties_t props;
props._flags = AMQP_BASIC_CORRELATION_ID_FLAG;
props.correlation_id = amqp_cstring_bytes(correlation_id.c_str());
amqp_basic_publish(conn, KChannel, amqp_empty_bytes, amqp_cstring_bytes(reply_to.c_str()), false, false, &props, amqp_cstring_bytes(std::string("success").c_str()));
amqp_destroy_envelope(&envelope);//销毁信封,释放内存。
}
amqp_channel_close(conn, KChannel, AMQP_REPLY_SUCCESS);//关闭编号为1的通道。
amqp_connection_close(conn, AMQP_REPLY_SUCCESS); //关闭RabbitMQ连接
amqp_destroy_connection(conn); //销毁连接,释放内存。
return 0;
}
4、创建build文件夹,并且编译,生产可执行文件。
mkdir build
cd build
cmake ..
make
5、创建fastapi访问resfull接口,入口文件为main_fast_client.py
from fastapi.responses import JSONResponse
from fastapi import FastAPI, Request, UploadFile, File
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from RetrievalRpcClient import RetrievalRpcClient
app = FastAPI()
retrievalRpcClient = RetrievalRpcClient(input_queue=None, output_queue=None)
class TestRetrievalData(BaseModel):
input_text: str = None
async def send_message(testRetrievaldata):
response = retrievalRpcClient.produce(input_text=testRetrievaldata.input_text)
return response
@app.post("/get_message")
async def get_message(testRetrievaldata: TestRetrievalData):
response = await send_message(testRetrievaldata)
return response
if __name__ == '__main__':
import uvicorn
uvicorn.run(app="main_fast_client:app", port=int(5600), host="0.0.0.0",workers=10)
6、链接的rabbitmq的类RetrievalRpcClient.py
import functools
import json
import queue
import threading
import time
import traceback
from datetime import datetime
import pika
import uuid
class RetrievalRpcClient(object):
def __init__(self,input_queue,output_queue):
# 创建链接
self.response_queue = input_queue
self.output_queue = "hello"
self.url = "amqp://center:123qwe@192.168.140.2:5672/"
self.exchange = "retrival.mind"
self.parameters = pika.URLParameters(self.url)
self.connection_params = pika.ConnectionParameters(
host='192.168.140.2',
credentials=pika.PlainCredentials('center', '123qwe')
# blocked_connection_timeout=600 # 设置超时时间为600秒(10分钟)
)
#或者使用pika.URLParameters链接器
self.connection = pika.BlockingConnection(self.parameters)
# self.connection = pika.BlockingConnection(self.parameters)
self.connection = pika.BlockingConnection(self.connection_params)
def on_response(self, ch, method, props, body): # 回调
# 每条指令执行的速度可能不一样,指令1比指令2先发送,但可能指令2的执行结果比指令1先返回到客户端,
# 此时如果没有下面的判断,客户端就会把指令2的结果误认为指令1执行的结果,所以通过correlation_id进一步区分
print(props)
if self.corr_id == props.correlation_id:
self.response = body
# logger.info("self.corr_id = %s" % self.corr_id)
# logger.info("收到response的数据 %s" %json.loads(self.response))
print("收到response的数据 %s" %self.response)
return self.response
def produce(self, **kwargs):
input_text = kwargs.get("input_text")
#规范消息队列中数据传达格式
data_dic = {}
data = {"input_text": input_text}
data_dic["Data"]=data
data_dic["StartTime"]= datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# logger.info("data=%s" % data)
print("data_dic=%s" % data_dic)
self.response = None ##指令执行后返回的消息
self.corr_id = str(uuid.uuid4()) ##可用来标识指令(顺序)
print("self.corr_id=",self.corr_id)
channel = self.connection.channel()
# 指定队列,没有就创建,
channel.queue_declare(queue=self.output_queue)
# 声明交换机
channel.exchange_declare(exchange=self.exchange, exchange_type='topic')
channel.queue_bind(exchange=self.exchange, queue=self.output_queue, routing_key=self.output_queue)
#也可以写成一个固定的队列名称作为接收返回的信息,每个请求通过correlation_id区分响应
callback_queue = channel.queue_declare('', exclusive=False, auto_delete= True).method.queue #随机生成一个监听队列
channel.confirm_delivery()
#如果有上传模型文件,则需要经过推理和faiss两个服务,如果仅仅是文本则不需要。
try:
channel.basic_publish(exchange=self.exchange,
routing_key=self.output_queue,
properties=pika.BasicProperties(
reply_to=callback_queue, # 将指令执行结果返回到reply_to队列,该队列名称随机生成
correlation_id=self.corr_id, # 通过该唯一id,可以区分哪个response对应该请求。
),
mandatory= True,
body=json.dumps(data_dic))
# logger.info("前端请求信息已经发送到队列 [%s]中" %self.output_queue)
except Exception as e:
#打印完整的异常信息
traceback.print_exc()
print("进程处理异常", e)
# logger.info("监听返回的队列[%s]" %callback_queue)
#启动一个线程监听
print("监听返回的队列[%s]" %callback_queue)
q = queue.Queue() #通过队列获取线程里面的放回的结果
t = threading.Thread(target=self.do_consum,args=(channel,callback_queue,q))
t.start()
result = q.get()
return result
def do_consum(self, channel, callback_queue, q):
channel.basic_consume(queue=callback_queue, # 监听一接收到服务器端返回的数据就调用回调函数on_response
auto_ack=True,
on_message_callback=self.on_response)
while self.response is None: # 如果服务端没有返回,这一直循环。
self.connection.process_data_events() # 去queue接收数据(不阻塞)
q.put(self.response)
7、通过restful接口访问