rust操作rabbitmq

news2024/11/14 18:01:10

Rust 操作 Rabbitmq

使用docker快速部署rabbitmq

docker pull rabbitmq:management
# 15672为rabbitmq 管理员端口,默认账号密码为guest(账号密码相同)
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management

rust 添加amqp库lapin

cargo add lapin

1. 连接到rabbitmq

let conn=lapin::Connection::connect(
      "amqp://localhost:5672",
      lapin::ConnectionProperties::default(),
  )
  .await?;
let chan=conn.create_channel().await?;

2. 交换机创建和队列创建

//创建一个名为itest的交换机,模式为话题模式
chan.exchange_declare(
    "itest",
    lapin::ExchangeKind::Topic,
    lapin::options::ExchangeDeclareOptions::default(),
    lapin::types::FieldTable::default(),
)
.await?;
//创建一个名为queue1的队列
chan.queue_declare(
    "queue1",
    lapin::options::QueueDeclareOptions::default(),
    lapin::types::FieldTable::default(),
)
.await?;
//绑定队列到交换机,将名为队列queue1绑到交换机itest,并设置路由名为/queue1
chan.queue_bind(
    "queue1",
    "itest",
    "/queue1",
    lapin::options::QueueBindOptions::default(),
    lapin::types::FieldTable::default(),
).await?;

3. 生产者发布消息

// 发送给itest交换机,交换机会把消息交给路由/queue1
chan.basic_publish(
    "itest",
    "/queue1",
    lapin::options::BasicPublishOptions::default(),
    "hello".as_bytes(),
    lapin::BasicProperties::default(),
).await.expect("publish message failed");

4. 消费者订阅消息

let consumer = chan
   .basic_consume(
        "queue1",
        "",
        lapin::options::BasicConsumeOptions::default(),
        lapin::types::FieldTable::default(),
    )
    .await?;
consumer.set_delegate(|d: lapin::message::DeliveryResult| async move {
    match d {
        Err(err) => eprintln!("subscribe message error {err}"),
        Ok(data) => {
            if let Some(data) = data {
                let raw = data.data.clone();
                let f = data.ack(lapin::options::BasicAckOptions::default());
                println!(
                    "accept msg {}",
                    String::from_utf8(raw).expect("parse msg failed")
                );
                if let Err(err) = f.await {
                    eprintln!("ack failed {err}");
                }
            }
        }
    }
});

最终demo

#[cfg(test)]
mod mq{
	#[tokio::test]
    async fn rabbitmq() -> Result<(), Box<dyn std::error::Error>> {
    //连接到rabbitmq
        let conn = lapin::Connection::connect(
            "amqp://localhost:5672",
            lapin::ConnectionProperties::default(),
        )
        .await?;
        let chan = conn.create_channel().await?;
        //初始化queue和exchange
        chan.queue_declare(
            "queue1",
            lapin::options::QueueDeclareOptions::default(),
            lapin::types::FieldTable::default(),
        )
        .await?;
        chan.exchange_declare(
            "itest",
            lapin::ExchangeKind::Topic,
            lapin::options::ExchangeDeclareOptions::default(),
            lapin::types::FieldTable::default(),
        )
        .await?;
        chan.queue_bind(
            "queue1",
            "itest",
            "/queue1",
            lapin::options::QueueBindOptions::default(),
            lapin::types::FieldTable::default(),
        )
        .await?;
        //发送消息
        tokio::spawn(async move {
            chan.basic_publish(
                "itest",
                "/queue1",
                lapin::options::BasicPublishOptions::default(),
                "hello".as_bytes(),
                lapin::BasicProperties::default(),
            )
            .await
            .expect("publish message failed");
        });
        let chan = conn.create_channel().await?;
        let consumer = chan
            .basic_consume(
                "queue1",
                "",
                lapin::options::BasicConsumeOptions::default(),
                lapin::types::FieldTable::default(),
            )
            .await?;
            //使用回调来触发接受到新消息时的操作,使用futures_lite 中StreamExt 可以不使用回调
        consumer.set_delegate(|d: lapin::message::DeliveryResult| async move {
            match d {
                Err(err) => eprintln!("subscribe message error {err}"),
                Ok(data) => {
                    if let Some(data) = data {
                        let raw = data.data.clone();
                        let f = data.ack(lapin::options::BasicAckOptions::default());
                        println!(
                            "accept msg {}",
                            String::from_utf8(raw).expect("parse msg failed")
                        );
                        if let Err(err) = f.await {
                            eprintln!("ack failed {err}");
                        }
                    }
                }
            }
        });
        tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
        Ok(())
    }
}

结果展示

请添加图片描述
rabbitmq 管理后台页面可以看到我们创建的itest交换机和queue1队列向绑定,queue1的路由地址为/queue1
在这里插入图片描述

简言

amqp 包其实无论是rust 的lapin还是golang的streadway/amqp,操作手法整体都是一样的,rabbitmq其它几种模式可以参考我goalng 的rabbitmq几种模式下操作方式来类推

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2048501.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

影院订票系统/电影院售票系统/电影院购票系统的设计与实现/影院管理系统

摘 要 “互联网”的战略实施后&#xff0c;很多行业的信息化水平都有了很大的提升。但是目前很多电影院日常业务仍是通过人工管理的方式进行&#xff0c;需要在影院订票投入大量的人力进行很多重复性工作&#xff0c;这样就浪费了许多的人力物力&#xff0c;工作效率较低&…

【Godot4自学手册】第四十五节用着色器(shader)制作水中效果

本节内容&#xff0c;主要学习利用着色器制作水波纹效果&#xff0c;效果如下&#xff1a; 一、搭建新的场景 首先我们新建场景&#xff0c;根节点选择Node2D&#xff0c;命名为Water&#xff0c;给根节点添加两个Tilemap节点&#xff0c;一个命名为Background主要用于绘制地…

JUC介绍

一、并发与并行 1.并发 早期计算机CPU是单核的&#xff0c;为了提高CPU的利用率&#xff0c;减少等待时间&#xff0c;使用到了并发工作的理论 并发就是将CPU资源合理分配给多个任务&#xff0c;当一个任务执行I/O操作时&#xff0c;转去执行其他任务 2.并行 针对多核CPU&…

25届科大讯飞飞星计划 AI研究算法工程师 面经

目录 一面/技术面 2024/08/15 &#x1f4cb; 总结&#xff1a; 本来应该是在7月底面试的&#xff0c;但因为有事就拖到了现在&#xff0c;或许是飞星计划里最晚面试的一批&#xff1f;面试官很和蔼&#xff0c;问的问题不算难&#xff0c;总体体验还算不错。 一面/技术面 2024/…

MySQL基础--逻辑存储结构,架构

逻辑存储结构 表空间&#xff08;ibd 文件&#xff09;&#xff1a;一个 mysql 实例可以对应多个表空间&#xff0c;用于存储记录&#xff0c;索引等数据。 段&#xff1a;分为数据段&#xff0c;索引段&#xff0c;回滚段&#xff0c;InnoDB 是索引组织表&#xff0c;数据段就…

Unity引擎基础知识

目录 Unity基础知识概要 1. 创建工程 2. 工程目录介绍 3. Unity界面和五大面板 4. 游戏物体创建与操作 5. 场景和层管理 6. 组件系统 7. 脚本语言C# 8. 物理引擎和UI系统 学习资源推荐 Unity引擎中如何优化大型游戏项目的性能&#xff1f; Unity C#脚本语言的高级编…

修复 iPad 卡在准备更新或正在进行更新的问题

为什么iPad 更新卡住了&#xff1f;原因很难确定&#xff0c;因为 iPad 的许多故障和状况都可能导致 iPad 无法更新 iOS 和应用程序。此外&#xff0c;很难弄清楚这种情况持续了多长时间。但是&#xff0c;您不必太担心&#xff0c;因为这只是一个小案例&#xff0c;您可以阅读…

Java入门(上)

day01 - Java基础语法 1. 人机交互 1.1 什么是cmd&#xff1f; 就是在windows操作系统中&#xff0c;利用命令行的方式去操作计算机。 我们可以利用cmd命令去操作计算机&#xff0c;比如&#xff1a;打开文件&#xff0c;打开文件夹&#xff0c;创建文件夹等。 1.2 如何打…

单元训练13:串行接口的进阶应用

蓝桥杯&#xff0c;小蜜蜂&#xff0c;单元训练13&#xff1a;串行接口的进阶应用 /** Description:* Author: fdzhang* Email: zfdcqq.com* Date: 2024-08-17 15:41:34* LastEditTime: 2024-08-17 19:48:35* LastEditors: fdzhang*/ #include "stc15f2k60s2.h"#defi…

算法工程师第四十天(647. 回文子串 516.最长回文子序列 动态规划总结篇 )

参考文献 代码随想录 一、回文子串 给你一个字符串 s &#xff0c;请你统计并返回这个字符串中 回文子串 的数目。 回文字符串 是正着读和倒过来读一样的字符串。 子字符串 是字符串中的由连续字符组成的一个序列。 示例 1&#xff1a; 输入&#xff1a;s "abc"…

MySQL 异步主从复制流程解析

前言&#xff1a; 首先MySQL主从复制方式有多种&#xff0c;包括 binlog、GTID等&#xff0c;这里基于 binlog 的形式&#xff0c;解析异步主从复制流程 首先通过下面命令查看全部 binlog 日志文件 show binary logs; binlog 日志文件如下&#xff1a; 然后查看其中一个文件…

ECMAScript6语法:默认参数和rest参数

1、默认参数 默认参数即在定义函数的参数列表中指定了默认值的参数。在 ES5 中&#xff0c;并没有提供在参数列表中指定参数默认值的语法&#xff0c;要想为函数的参数指定默认值&#xff0c;只能在函数体中实现&#xff0c;示例代码如下&#xff1a; function table(width, …

MBR10200FCT-ASEMI智能AI专用MBR10200FCT

编辑&#xff1a;ll MBR10200FCT-ASEMI智能AI专用MBR10200FCT 型号&#xff1a;MBR10200FCT 品牌&#xff1a;ASEMI 封装&#xff1a;TO-220F 批号&#xff1a;最新 最大平均正向电流&#xff08;IF&#xff09;&#xff1a;10A 最大循环峰值反向电压&#xff08;VRRM&a…

西安旅游系统--论文pf

TOC springboot383西安旅游系统--论文pf 第1章 绪论 1.1 课题背景 二十一世纪互联网的出现&#xff0c;改变了几千年以来人们的生活&#xff0c;不仅仅是生活物资的丰富&#xff0c;还有精神层次的丰富。在互联网诞生之前&#xff0c;地域位置往往是人们思想上不可跨域的鸿…

YOLOv8侦测任务更换主干网络成MobileNetV3

目录 1. 添加主干网络模块 ​编辑1.1 在init.py中添加模块名 1.2 主体代码中添加调用语句块 2. 配置yaml文件 3. 修改成功 1. 添加主干网络模块 1.1 在init.py中添加模块名 1.2 主体代码中添加调用语句块 2. 配置yaml文件 3. 修改成功 自己随便找一个程序跑一跑验证…

微电网控制器是什么?微电网中央控制器|微电网协调控制器|微电网控制系统图|Micon2505微网中央控制器方案介绍

微电网控制器是什么&#xff1f;微电网中央控制器|微电网协调控制器|微电网控制系统图|Micon2505微网中央控制器方案介绍及其在油田采油机场景中的应用。微电网控制器广泛应用于具备光伏&#xff0c;储能&#xff0c;V2G&#xff0c;充电桩&#xff0c;风电&#xff0c;柴油发电…

图解内存分配算法 -- 小内存分配算法

图解内存分配算法 – 小内存分配算法 文章目录 图解内存分配算法 -- 小内存分配算法1. 算法介绍2. 算法图解2.1 约定2.2 数据结构介绍2.3 初始化2.4 第一次 malloc 40字节2.5 第二次 malloc 18 字节2.6 第三次 malloc 20字节2.7 第四次 malloc 40字节2.8 第一次 free2.9 第二次…

LINUX服务器部署准备

文章目录 配置环境变量NODE下载解压安装 NGINX下载NGINX下载GCC并安装解压安装启动NGINX MAVEN下载解压安装配置环境 TOMCAT下载兼容版本解压安装开启远程访问 REDIS下载解压安装配置远程使用关闭保护模式 配置环境变量 vi /etc/profile source /etc/profile解压成功之后、运行…

安装IDEA2021.2.1(含安装包)及其扩展设置

一、下载 通过百度网盘分享的文件&#xff1a;ideaIU-2021.2.1.exe 链接&#xff1a;https://pan.baidu.com/s/1cCUHNm0dpWlfkxf5RCEgfw 提取码&#xff1a;v62e 二、安装 安装视频网址&#xff1a;Java基础概念-12-idea的概述和下载安装_哔哩哔哩_bilibili 三、idea中的第一…

Postman内置动态参数和自定义动态参数

业务场景 现在有两个接口&#xff0c;接口1&#xff1a;获取接口统一鉴权码token接口&#xff0c;接口2&#xff1a;创建标签接口&#xff0c;标签接口的创建依赖接口1返回的鉴权码&#xff0c;即需要获取access_token的值&#xff0c;替换ACCESS_TOKEN。且接口2中标签名不能和…