基于IPC-CFX的点对点通信C#

news2025/1/11 11:13:30

        IPC-CFX有两种主要的通信方式,可以通过RabbitMQ发布和订阅,也可以通过request和response进行点对点的通信,本文主要讲的是点对点的通信方式。

        在vscode里建立新的dotnet项目,可以通过终端输入dotnet new console来建立,文件目录为CFXDemo->machine1和CFXDemo->machine2。

        通过nuget插件分别为两个项目都安装CFX.CFXSDK、AMQPNetLite.Core和Newtonsoft.Json这几个metapackage。

         我们将machine1作为发送端(sendRequest),machine2作为接收端(Receive),则Machine1的代码如下所示:

using System.Threading;
using CFX;
using CFX.Transport;
using System;
using System.Security.Principal;

namespace machine1
{
    class Program
    {

        static void Main(string[] args)
        {

            OpenRequest();
            Console.ReadLine();
            for(int i = 0;i<5;i++){
                SendRequest();
                Thread.Sleep(2000);
            }
            
        }

        static string sendCFXHandle = "a.b.001";     
        static string receiveCFXHandle = "a.b.002";
        static string sendRequestUri = string.Format("amqp://127.0.0.1:1235");
        static string receivRequestUri = string.Format("amqp://127.0.0.1:1234");


        #region send request

        static AmqpCFXEndpoint endpointSendRequest;

        static void OpenRequest()
        {


            if (endpointSendRequest != null)
            {
                endpointSendRequest.Close();
                endpointSendRequest = null;

            }

            endpointSendRequest = new AmqpCFXEndpoint();

            if (!endpointSendRequest.IsOpen)
            {
                Console.WriteLine("endpointSendRequest.IsOpen:" + endpointSendRequest.IsOpen.ToString());
                endpointSendRequest.Open(sendCFXHandle);    //这一步会绑定endpointSendRequest里的CFXHandle,即sendCFXHandle的值
                Console.WriteLine("Request.Source is : {0}",endpointSendRequest.CFXHandle);
                Console.WriteLine("endpointSendRequest.IsOpen:" + endpointSendRequest.IsOpen.ToString());

            }

            // Set a timeout of 20 seconds.  If the target endpoint does not
            // respond in this time, the request will time out.
            AmqpCFXEndpoint.RequestTimeout = TimeSpan.FromSeconds(20);

        }
        static void SendRequest()
        {

            // Build a GetEndpointInfomation Request
            CFXEnvelope request = CFXEnvelope.FromCFXMessage(new GetEndpointInformationRequest()
            {
                CFXHandle = receiveCFXHandle
            });
            request.Source = endpointSendRequest.CFXHandle;
            request.Target = receiveCFXHandle;

            try
            {
                CFXEnvelope response = endpointSendRequest.ExecuteRequest(receivRequestUri, request);
                Console.WriteLine($"response:\n{response.ToJson(true)}");
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }

        #endregion send request


        #region receive request
        static AmqpCFXEndpoint endpointReceiveRequest;

        static void OpenReceive()
        {
            if (endpointReceiveRequest != null)
            {
                endpointReceiveRequest.Close();
                endpointReceiveRequest = null;
            }

            endpointReceiveRequest = new AmqpCFXEndpoint();

            endpointReceiveRequest.OnRequestReceived -= Endpoint_OnRequestReceived;
            endpointReceiveRequest.OnRequestReceived += Endpoint_OnRequestReceived;

            if (!endpointReceiveRequest.IsOpen)
            {
                Console.WriteLine("endpointSendRequest.IsOpen:" + endpointReceiveRequest.IsOpen.ToString());
                endpointReceiveRequest.Open(receiveCFXHandle, new Uri(receivRequestUri));
                Console.WriteLine("endpointSendRequest.IsOpen:" + endpointReceiveRequest.IsOpen.ToString());
            }
        }


        static CFXEnvelope Endpoint_OnRequestReceived(CFXEnvelope request)
        {
            Console.WriteLine($"Endpoint_OnRequestReceived:  { request.ToString()}");

            // Process request.  Return Result.
            if (request.MessageBody is WhoIsThereRequest)
            {
                CFXEnvelope result = CFXEnvelope.FromCFXMessage(new WhoIsThereResponse()
                { CFXHandle = receiveCFXHandle, RequestNetworkUri = receivRequestUri, RequestTargetAddress = "" });
                result.Source = receiveCFXHandle;
                result.Target = request.Source;
                result.TimeStamp = DateTime.Now;
                return result;
            }

            if (request.MessageBody is GetEndpointInformationRequest)
            {
                CFXEnvelope result = CFXEnvelope.FromCFXMessage(new WhoIsThereResponse()
                { CFXHandle = receiveCFXHandle, RequestNetworkUri = receivRequestUri, RequestTargetAddress = "...." });
                result.Source = receiveCFXHandle;
                result.Target = request.Source;
                result.TimeStamp = DateTime.Now;
                return result;
            }

            return null;
        }

        #endregion receive request




    }
}

        作为接收端,machine2的代码如下所示:

using CFX;
using CFX.Transport;
using System;

namespace machine2
{
    class Program
    {


        static void Main(string[] args)
        {

            Console.WriteLine("ReceivEndPoint is waiting Request......");
            OpenReceive();
            Console.WriteLine("Press Enter Key to end the App");
            Console.ReadKey();

        }

        static string sendCFXHandle = "a.b.001";
        static string receiveCFXHandle = "a.b.002";
        static string sendRequestUri = string.Format("amqp://127.0.0.1:1235");
        static string receivRequestUri = string.Format("amqp://127.0.0.1:1234");


        #region send request

        static AmqpCFXEndpoint endpointSendRequest;

        static void OpenRequest()
        {


            if (endpointSendRequest != null)
            {
                endpointSendRequest.Close();
                endpointSendRequest = null;

            }

            endpointSendRequest = new AmqpCFXEndpoint();

            if (!endpointSendRequest.IsOpen)
            {
                Console.WriteLine("endpointSendRequest.IsOpen:" + endpointSendRequest.IsOpen.ToString());
                endpointSendRequest.Open(receiveCFXHandle, new Uri(receivRequestUri));
                Console.WriteLine("endpointSendRequest.IsOpen:" + endpointSendRequest.IsOpen.ToString());

            }

            // Set a timeout of 20 seconds.  If the target endpoint does not
            // respond in this time, the request will time out.
            AmqpCFXEndpoint.RequestTimeout = TimeSpan.FromSeconds(20);

        }
        static void SendRequest()
        {

            // Build a GetEndpointInfomation Request
            CFXEnvelope request = CFXEnvelope.FromCFXMessage(new GetEndpointInformationRequest()
            {
                CFXHandle = receiveCFXHandle
            });
            request.Source = endpointSendRequest.CFXHandle;
            request.Target = receiveCFXHandle;

            try
            {
                CFXEnvelope response = endpointSendRequest.ExecuteRequest(receivRequestUri, request);
                Console.WriteLine($"response:\n{response.ToJson(true)}");
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }

        #endregion send request


        #region receive request
        static AmqpCFXEndpoint endpointReceiveRequest;

        static void OpenReceive()
        {
            if (endpointReceiveRequest != null)
            {
                endpointReceiveRequest.Close();
                endpointReceiveRequest = null;
            }

            endpointReceiveRequest = new AmqpCFXEndpoint();

            endpointReceiveRequest.OnRequestReceived -= Endpoint_OnRequestReceived;
            endpointReceiveRequest.OnRequestReceived += Endpoint_OnRequestReceived;

            if (!endpointReceiveRequest.IsOpen)
            {
                Console.WriteLine("endpointSendRequest.IsOpen:" + endpointReceiveRequest.IsOpen.ToString());
                endpointReceiveRequest.Open(receiveCFXHandle, new Uri(receivRequestUri));
                //endpointReceiveRequest.Open(receiveCFXHandle);
                Console.WriteLine("endpointSendRequest.IsOpen:" + endpointReceiveRequest.IsOpen.ToString());
            }
        }


        static CFXEnvelope Endpoint_OnRequestReceived(CFXEnvelope request)
        {
            Console.WriteLine($"Endpoint_OnRequestReceived:  { request.ToString()}");
            Console.WriteLine($"request:\n{request.ToJson(true)}");

            // Process request.  Return Result.
            if (request.MessageBody is WhoIsThereRequest)
            {
                CFXEnvelope result = CFXEnvelope.FromCFXMessage(new WhoIsThereResponse()
                { CFXHandle = receiveCFXHandle, RequestNetworkUri = receivRequestUri, RequestTargetAddress = "" });
                result.Source = receiveCFXHandle;
                result.Target = request.Source;
                result.TimeStamp = DateTime.Now;
                return result;
            }

            if (request.MessageBody is GetEndpointInformationRequest)
            {
                CFXEnvelope result = CFXEnvelope.FromCFXMessage(new WhoIsThereResponse()
                { CFXHandle = receiveCFXHandle, RequestNetworkUri = receivRequestUri, RequestTargetAddress = "..." });
                result.Source = receiveCFXHandle;
                result.Target = request.Source;
                result.TimeStamp = DateTime.Now;
                return result;
            }

            return null;
        }

        #endregion receive request




    }
}

         运行结果,可以用json格式对response和request的内容进行解析。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/771478.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Spring Cloud 2022 发布,这几个组件要移除了!

继SpringBoot 3.0和SpringFramework 6.0之后&#xff0c;Spring Cloud 终于也推出了新版本——2022.0.0&#xff0c;官网把这个版本命名为Kilburn。 目前在Maven仓库中已经可以下载使用了&#xff0c;通过POM文件即可依赖到项目中&#xff1a; <dependencyManagement>&l…

阿里云声音复刻

阿里云声音复刻 个性化人声定制 阿里云个性化人声定制是智能语音交互产品自学习平台下的一部分 使用方式&#xff1a;https://help.aliyun.com/document_detail/456006.html 方式一&#xff1a;控制台界面定制使用方式 方式二&#xff1a;通过OpenAPI定制&#xff1a;在该页…

微服务保护——Sentinel【实战篇】

一、限流规则&#x1f349; 1.簇点链路&#x1f95d; 簇点链路&#xff1a;就是项目内的调用链路&#xff0c;链路中被监控的每个接口就是一个资源。默认情况下sentinel会监控SpringMVC的每一个端点&#xff08;Endpoint&#xff09;&#xff0c;因此SpringMVC的每一个端点&a…

CS162 11-12 调度与死锁

调度 overview 1.FCFS 可以利用好cache缓存&#xff0c;减少上下文切换。 2.很直观&#xff0c;贪心&#xff0c;可以减少平均的响应时间 3 4. 5.等待调度的时间是平均的 6.优先级翻转&#xff0c;和优先级捐赠 解决 cfs中的调度 死锁 四个必要不充分条件 银行家算法&…

基于 ChatGPT 的 helm 入门

1. 写在最前面 公司最近在推业务上云&#xff08;底层为 k8s 管理&#xff09;&#xff0c;平台侧为了简化业务侧部署的复杂度&#xff0c;基于 helm 、chart 等提供了一个发布平台。 发布平台的使用使业务侧在不了解 helm 、chart 等工具的时候&#xff0c;「只要点点」就可…

初识protobuf

Protobuf 全称Protocol Buffers&#xff08;协议缓冲区&#xff09;&#xff0c;是一种轻量级、高效的数据序列化格式&#xff0c;由Google开发。它被设计用于结构化数据的序列化、反序列化以及数据交换&#xff0c;常用于网络通信和数据存储等领域。 Protobuf使用简洁的消息描…

【实战技能】基于硬件垂直消隐的多缓冲技术在LVGL, emWin,GUIX和TouchGFX应用,含视频教程

原贴地址&#xff1a;https://www.armbbs.cn/forum.php?modviewthread&tid120114 这两天研究了下LVGL的持单缓冲&#xff0c;双缓冲和配合硬件消隐的双缓冲的实现&#xff08;已经分享V5&#xff0c;V6和V7开发板的程序模板&#xff09;&#xff0c;特别是这个整屏缓冲方…

DB-Engines排名公布 GBASE南大通用入围国产数据库TOP 3

什么是DB-Engines排名&#xff1f; DB-Engines排名是数据库领域的流行度榜单&#xff0c;它对全球范围内的419款数据库&#xff08;截至2023年7月&#xff09;进行排名&#xff0c;每月更新一次&#xff0c;排名越靠前&#xff0c;则表示越流行。在很多技术选型的场合&#xf…

Kubernetes——CKA证书

拿到CKA证书啦&#xff0c;打算近期再准备一下备考的学习笔记以及备考经验&#xff0c;有需要的朋友可以点赞加关注&#xff0c;我会持续更新&#xff0c;您的一个赞就能给我一份整理笔记的动力&#xff01;&#x1f92d;

掘金量化—Python SDK文档—5.API 介绍(1)

​ Python SDK文档 5.API 介绍 5.1基本函数 init - 初始化策略 初始化策略, 策略启动时自动执行。可以在这里初始化策略配置参数。 函数原型&#xff1a; init(context)参数&#xff1a; 参数名类型说明contextcontext上下文&#xff0c;全局变量可存储在这里 示例&…

vue2watch监听遇到的问题

1 vue 父组件里引入子组件 显示与隐藏是v-if控制时 父传入子的参数通过watch 监听请求接口时 watch 时而监听不到 请求接口的参数就不对 如图 父组件这么引入子组件v-show 和v-if 是有区别的 2 子组件通过watch 监听后 清空页面要展示的列表数据 重新从第一页加载数据&#x…

程序员如何准备技术面试

程序员如何准备技术面试 &#x1f607;博主简介&#xff1a;我是一名正在攻读研究生学位的人工智能专业学生&#xff0c;我可以为计算机、人工智能相关本科生和研究生提供排忧解惑的服务。如果您有任何问题或困惑&#xff0c;欢迎随时来交流哦&#xff01;&#x1f604; ✨座右…

Redis常见须知

介绍一下redis数据库 Redis 是一种基于内存的数据库&#xff0c;对数据的读写操作都是在内存中完成&#xff0c;因此读写速度非常快&#xff0c;常用于缓存&#xff0c;消息队列、分布式锁等场景。 Redis 提供了多种数据类型来支持不同的业务场景&#xff0c;比如 String(字符…

【后端面经-Java】JVM垃圾回收机制

【后端面经-Java】JVM垃圾回收机制 1. Where&#xff1a;回收哪里的东西&#xff1f;——JVM内存分配2. Which&#xff1a;内存对象中谁会被回收&#xff1f;——GC分代思想2.1 年轻代/老年代/永久代2.2 内存细分 3. When&#xff1a;什么时候回收垃圾&#xff1f;——GC触发条…

【汉诺塔问题分析】

一、背景 汉诺塔问题是一种经典的递归问题&#xff0c;它由法国数学家Huygens在1665年发现&#xff0c;也是一道有趣的数学难题。这道问题的主要目的是将三根柱子上的一堆盘子移动到另一根柱子上&#xff0c;移动过程中每次只能移动一个盘子&#xff0c;并且大盘子不能放在小盘…

【LeetCode热题100】打卡第40天:翻转二叉树回文链表

文章目录 【LeetCode热题100】打卡第40天&#xff1a;翻转二叉树&回文链表⛅前言 翻转二叉树&#x1f512;题目&#x1f511;题解 回文链表&#x1f512;题目&#x1f511;题解 【LeetCode热题100】打卡第40天&#xff1a;翻转二叉树&回文链表 ⛅前言 大家好&#xff…

高数-第一章-函数-极限 连续

目录 第一章 函数 极限 连续第一节 函数第二节 极限一、极限的概念与性质&#xff08;1&#xff09;数列的极限例1例2 &#xff08;2&#xff09;函数的极限&#xff08;3&#xff09;极限的性质&#xff08;保号性重点 有界性&#xff09;例12例13例14 &#xff08;4&#xff…

Python 3 拷贝、浅拷贝、直接引用

诸神缄默不语-个人CSDN博文目录 复杂的以后再补。 总的来说&#xff0c;像常数、字符串这种比较简单的变量无所谓&#xff0c;但是对于一些复杂对象&#xff08;比如list等&#xff09;&#xff0c;如果直接使ba&#xff0c;相当于直接把a的路径给了b&#xff0c;b这个对象的…

stb_image简单使用

简介stb_image stb_image 是一个非常轻量级的、单文件的图像加载库&#xff0c;用于加载和解码多种图像格式&#xff08;如BMP、JPEG、PNG、GIF等&#xff09;的图像数据。它由Sean T. Barrett开发&#xff0c;并以公共领域&#xff08;Public Domain&#xff09;许可发布&…

【软件测试】web测试bug定位思路总结,“我“不再背锅...

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 需要掌握的知识 …