Python ZeroMQ编程 网络通信协议详细说明和教程

news2024/11/30 6:56:21

ZeroMQ概述

ZeroMQ(又名ØMQ,MQ,或zmq)像一个可嵌入的网络库,但其作用就像一个并发框架。
ZeroMQ类似于标准Berkeley套接字,其提供了各种传输工具,如进程内、进程间、TCP和组播中进行原子消息传送的套接字。
可以使用各种模式实现N对N的套接字连接,这些模式包括:发布-订阅、任务分配、请求-应答。
ZeroMQ的速度足够快,因此可充当集群产品的结构。
ZeroMQ的异步I/O模型提供了可扩展的多核应用程序,用异步消息来处理任务
ZeroMQ核心由C语言编写,支持C、C++、java、python等多种编程语言的API,并可运行在大多数操作系统上
总结以下:ØMQ (ZeroMQ) 是一个基于消息队列的多线程网络库,它封装了网络通信、消息队列、线程调度等功能,向上层提供简洁的API,应用程序通过加载库文件,调用API函数来实现高性能网络通信。

ZeroMQ是一个高效的消息传输库,支持许多不同的传输协议和消息模式。与像TCP和HTTP等协议不同,ZeroMQ的目标是为应用程序之间的通信提供一种非常简单和快速的方式。它提供了比HTTP或TCP更高层次的抽象,允许开发人员直接在应用程序之间透明地传递消息,而无需自己构建完整的通信协议。

ZeroMQ提供了简单的API,可用于许多流行的编程语言,包括Python。它可以用于构建分布式系统或多线程应用,可以从中获取更高级别的抽象。

看起来有些抽象,下面我们结合ZeroMQ 的 Python 封装———— pyzmp,用实例看一下ZeroMQ的三种最基本的工作模式。

安装

安装方法

pip install pyzmq

查看是否安装成功

>>> import zmq
>>> print(zmq.__version__)

22.0.3

Request-Reply (请求响应模式)

Request-Reply模式概述:

  • 消息双向的,有来有往。
  • Client请求的消息,Server必须答复给Client。
  • Client在请求后,Server必须回响应,注意:Server不返回响应会报错。
  • Server和Client都可以是1:N的模型。通常把1认为是Server,N认为是Client。
  • 更底层的端点地址是对上层隐藏的,每个请求都隐含回应地址,而应用则不关心它。
  • ZMQ 可以很好的支持路由功能(实现路由功能的组件叫做 Device),把 1:N 扩展为 N:M(只需要加入若干路由节点)。
    在这里插入图片描述

Client端python实现

#client.py

import zmq

context = zmq.Context()

#  Socket to talk to server
print("Connecting to hello world server…")
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")
socket.send(b"Hello")
#  Get the reply.
message = socket.recv()
print(f"Received reply [ {message} ]")

Server端python实现

#server.py
import time
import zmq

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")

while True:
    #  Wait for next request from client
    message = socket.recv()
    print("Received request: %s" % message)

    #  Do some 'work'
    time.sleep(1)

    #  Send reply back to client
    socket.send(b"World")
  • 启动client.py 首先会打印Connecting to hello world server… 但不会受到任何消息。
  • 然后启动server.py ,客户端收到来自客户端的request: b’Hello’
    = 此时client端收到来自server端的 reply: [ b’World’ ]
python client.py 
Connecting to hello world server…
Received reply [ b'World' ]

python server.py 
Received request: b'Hello'

可以试一下,多运行几个client.py,看看情况是什么样的。

Publish/Subscribe(订阅-发布模式 )

Pub-Subs模式概述:

消息单向,有去无回
一个发布端,多个订阅端;发布端只管产生数据,发布端发布一条消息,可被多个订阅端同时收到。
发布者不必关心订阅者的加入和离开,消息会以 1:N 的方式扩散到每个订阅者。
广播所有client,没有队列缓存,断开连接数据将永远丢失。
如果Publish端开始发布信息时,Subscribe端尚未连接进来,则这些信息会被直接丢弃。
PUB和SUB谁bind谁connect并无严格要求(虽本质并无区别),但仍建议PUB使用bind,SUB使用connect
使用SUB设置一个订阅时,必须使用zmq_setsockopt()对消息进行过滤
在这里插入图片描述

这里直接引用官方文档的例子:

发布者:类似于一个天气更新服务器,向订阅者发送天气更新,内容包括邮政编码、温度、湿度等信息

#Publisher.py
import zmq
from random import randrange


context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5556")

while True:
    zipcode = randrange(1, 100000)
    temperature = randrange(-80, 135)
    relhumidity = randrange(10, 60)

    socket.send_string("%i %i %i" % (zipcode, temperature, relhumidity))

订阅者:它监听发布者更新的数据流,过滤只接收与特定邮政编码相关的天气信息,默认接收接收10条数据

#Subscribe.py 
import sys
import zmq


#  Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)

print("Collecting updates from weather server...")
socket.connect("tcp://localhost:5556")

# Subscribe to zipcode, default is NYC, 10001
zip_filter = sys.argv[1] if len(sys.argv) > 1 else "10001"

# Python 2 - ascii bytes to unicode str
if isinstance(zip_filter, bytes):
    zip_filter = zip_filter.decode('ascii')
socket.setsockopt_string(zmq.SUBSCRIBE, zip_filter)

# Process 5 updates
total_temp = 0
for update_nbr in range(5):
    string = socket.recv_string()
    zipcode, temperature, relhumidity = string.split()
    total_temp += int(temperature)

print(
    "Average temperature for zipcode '%s' was %dF"
    % (zip_filter, total_temp / (update_nbr + 1))
)

在这里插入图片描述

Push/Pull(流水线模式)

流水线模式概述:

主要用于多任务并行。
消息单向,有去无回。
Push的任何一个消息,始终只会有一个Pull端收到消息。
Push 端还是 Pull 端都可以做 server,bind 到某个地址等待对方访问。
如果有多个PULL端同时连接到PUSH端,则PUSH端会在内部做一个负载均衡,采用平均分配的算法,将所有消息均衡发布到PULL端上。
由三部分组成,Push进行数据推送,work进行数据缓存,Pull进行数据竞争获取处理。
存在一个数据缓存和处理负载,当连接被断开,数据不会丢失,重连后数据继续发送到对端。
在这里插入图片描述

ventilator 使用的是 SOCKET_PUSH,将任务分发到 Worker 节点上。Worker 节点上,使用 SOCKET_PULL 从上游接受任务,并使用 SOCKET_PUSH 将结果汇集到 Sink。值得注意的是,任务的分发的时候也同样有一个负载均衡的路由功能,worker 可以随时自由加入,ventilator 可以均衡将任务分发出去。

Push/Pull模式还是蛮常用的,这里我们主要测试一下它的负载均衡。

Ventilator

# ventilator.py
import zmq
import time

context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.bind("tcp://*:5557")

while True:
    socket.send(b"test")
    print("已发送")
    time.sleep(1)
worker

# worker.py
import zmq

context = zmq.Context()

recive = context.socket(zmq.PULL)
recive.connect('tcp://127.0.0.1:5557')

sender = context.socket(zmq.PUSH)
sender.connect('tcp://127.0.0.1:5558')

while True:
    data = recive.recv()
    print("work1 正在转发...")
    sender.send(data)

sink

# sink.py
import zmq
import sys

context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.bind("tcp://*:5558")

while True:
    response = socket.recv()
    print("response: %s" % response)

打开4个Terminal,分别运行

python sink.py
python worker.py
python worker.py
python ventilator.py

简易程序版本

三、使用ZeroMQ进行通信

下面我们来看一个简单的例子,演示如何使用ZeroMQ在两个Python程序之间传递消息:

这是第一个Python程序,它会发送消息:

import zmq

context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.bind("tcp://127.0.0.1:5555")

socket.send(b"Hello World")

这是第二个Python程序,它将接收该消息:

import zmq

context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.connect("tcp://127.0.0.1:5555")

message = socket.recv()
print(message)

在这个例子中,第一个程序创建了一个ZeroMQ上下文对象,并使用tcp://127.0.0.1:5555地址创建一个PUSH套接字。它发送一条消息,其内容为Hello World。

第二个程序也创建了一个ZeroMQ上下文对象,但它使用connect()方法连接了同一地址的PULL套接字。然后它调用recv()方法,等待来自第一个程序的消息,并在接收到消息后将其打印出来。

四、消息类型

ZeroMQ支持多种不同类型的消息,这些类型具有不同的功能和行为。下面是几种常见的消息类型:

REQ/REP
这种消息模式是简单的请求和响应模式。发送方使用REQ套接字发送请求,接收方使用REP套接字接收请求并发送响应。

PUB/SUB
这种消息模式是发布和订阅模式。发布者使用PUB套接字发布消息,订阅者使用SUB套接字订阅消息。

PUSH/PULL
这种消息模式是分发工作模式。PUSH套接字将任务分配给多个工作者并在它们完成任务后收集结果,而PULL套接字接收任务并完成它们并将结果返回。

五、使用不同的消息模式

下面是一个演示如何使用不同ZeroMQ消息模式的简单例子:

这是一个发布者程序:

import zmq
import time

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://127.0.0.1:5555")

while True:
    time.sleep(1)
    socket.send(b"A new message from publisher!")

这是订阅者程序:

import zmq

context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://127.0.0.1:5555")
socket.setsockopt(zmq.SUBSCRIBE, b"")

while True:
    message = socket.recv()
    print(message)

在这个例子中,发布者程序创建了一个PUB套接字,并绑定到地址tcp://127.0.0.1:5555。它使用无限循环向订阅者发送消息。

订阅者程序创建一个SUB套接字,并连接到相同的地址。它使用setsockopt()设置SUBSCRIBE选项,这意味着它将接收所有发布者发送的消息。然后它使用recv()方法等待接收消息,并在接收到消息时将其打印出来。

这些例子可以使您熟悉PythonZMQ和ZeroMQ的基础知识。使用ZeroMQ可以非常方便地在应用程序之间发送消息,可以让您构建分布式系统,提高应用程序的可靠性和效率。

总结

消息模型可以根据需要组合使用,后续的代理模式和路由模式等都是在三种基本模式上面的扩展或变异。继续探索,请移步官方文档

Github: https://github.com/zeromq/pyzmq
Docs: https://zeromq.github.io/pyzmq/
Guide: http://zguide.zeromq.org/py:all
pypi: https://pypi.org/project/pyzmq/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1299715.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数据结构与算法-Rust 版读书笔记-1语言入门

数据结构与算法-Rust 版笔记 一、语言入门 1、关键字、注释、命名风格 目前(可能还会增加)39个,注意,Self和self是两个关键字。 Self enum match super as extern mod trait async false …

SOP(标准作业程序)和WI(操作指导书)的联系和区别

目录 1.SOP(标准作业程序):2.WI(操作指导书):3.SOP和WI的区别: 1.SOP(标准作业程序): SOP: 所谓SOP,是 Standard Operation Procedure三个单词中…

CF1898C Colorful Grid(构造)

题目链接 题目大意 n 行 m 列 的一个矩阵,每行有m - 1条边,每列有 n - 1 条边。 问一共走 k 条边,能不能从 (1, 1),走到(n, m),要求该路径上&am…

【nodejs升级版本】win10 nodejs版本低升级版本流程

首先 网上说的n模块不支持window系统!!! window系统升级node只能到node官网下载window安装包来覆盖之前的node 升级步骤如下: 1,找到你node的安装路径,不知道的可以cmd命令行中输入这个命令就可以看到了…

智能优化算法应用:基于正余弦算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用:基于正余弦算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用:基于正余弦算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.正余弦算法4.实验参数设定5.算法结果6.参考文…

uc_14_IP地址_套接字_字节序转换

1 计算机网络 计算机网络,是指将地理位置不同的具有独立功能的多台计算机及其外部设备,通过通信线路连接起来,在网络操作系统、网络管理软件及网络通信协议的管理和协调下,实现资源共享和信息传递的计算机系统。 网络协议是一种特…

【Vulnhub 靶场】【Hackable: III】【简单 - 中等】【20210602】

1、环境介绍 靶场介绍:https://www.vulnhub.com/entry/hackable-iii,720/ 靶场下载:https://download.vulnhub.com/hackable/hackable3.ova 靶场难度:简单 - 中等 发布日期:2021年06月02日 文件大小:1.6 GB 靶场作者&…

IDEA如何运行SpringBoot+Vue前后端分离的项目(超详细截图)

大家好,我是DeBug,很高兴你能来阅读!作为一名热爱编程的程序员,我希望通过这些教学笔记与大家分享我的编程经验和知识。在这里,我将会结合实际项目经验,分享编程技巧、最佳实践以及解决问题的方法。无论你是…

电源小白入门学习1——电源系统架构和相关指标

电源小白入门学习1——电源系统架构和相关指标 电源系统架构电源系统的指标及测量方法电源的效率电源的静态电流输出电压调整率纹波测量的注意事项动态负载测试 在开始本期内容之气,我先简单介绍一下我们电源小白学习系列内容:首先我是一个嵌入式小白&am…

c语言希尔排序总结(详解)

希尔排序: 1:分组插入排序两两分组降低元素个数提高插入的效率,先分组对每一组分别进行插入排序 希尔排序是插入排序的一种改进算法,也称为缩小增量排序。其基本原理是通过将待排序的序列分成若干个子序列,对每个子序…

二叉树算法专栏一《理论基础》

下面我会介绍一些我在刷题过程中经常用到的二叉树的一些基础知识,所以我不会教科书式地将二叉树的基础内容通通讲一遍。 二叉树的种类 在我们解题过程中二叉树有两种主要的形式:满二叉树和完全二叉树。 满二叉树 满二叉树是一种特殊的二叉树&#xf…

优麒麟ubuntukylin安装UE4.27.2

优麒麟ubuntukylin安装UE4.27.2 在(国产)优麒麟 ubuntukylin Linux平台上编译测试安装虚幻引擎。 优麒麟系统 这里选择的是官方增强版 https://www.ubuntukylin.com/downloads/ 同样的可以选择对应的Ubuntu22.04 LTS,唯一的区别就是优麒麟…

java--Math、System、Runtime

1.Math 代表数字,是一个工具类,里面提供的都是对数据进行操作的一些静态方法。 2.Math类提供的常见方法 3.System System代表程序所在的系统,也是一个工具类。 4.System类提供的常见方法 5.时间毫秒值 值的是从1970年1月1日 00:00:00走到…

Realme X7 Pro Root 刷机教程

Realme X7 Pro 刷机教程 Just For Fun,最近倒腾了下Realme X7 Pro 刷root。此博客为个人记录刷机过程,如有机友跟随本教程操作,请谨慎操作!!! 以下教程真针对Realme X7 Pro,其他版本方法未知&…

智能优化算法应用:基于斑马算法无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用:基于斑马算法无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用:基于斑马算法无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.斑马算法4.实验参数设定5.算法结果6.参考文献7.MATLAB…

多维时序 | MATLAB实现RIME-CNN-LSTM-Multihead-Attention多头注意力机制多变量时间序列预测

多维时序 | MATLAB实现RIME-CNN-LSTM-Multihead-Attention多头注意力机制多变量时间序列预测 目录 多维时序 | MATLAB实现RIME-CNN-LSTM-Multihead-Attention多头注意力机制多变量时间序列预测预测效果基本介绍模型描述程序设计参考资料 预测效果 基本介绍 MATLAB实现RIME-CNN-…

【rabbitMQ】rabbitMQ用户,虚拟机地址(添加,修改,删除操作)

rabbitMQ的下载,安装和配置 https://blog.csdn.net/m0_67930426/article/details/134892759?spm1001.2014.3001.5502 rabbitMQ控制台模拟收发消息 https://blog.csdn.net/m0_67930426/article/details/134904365?spm1001.2014.3001.5502 目录 用户 添加用户…

node.js安装和配置

软件介绍 Node.js是一个免费的、开源的、跨平台的JavaScript运行时环境,允许开发人员在浏览器之外编写命令行工具和服务器端脚本。 Node.js是一个基于Chrome JavaScript运行时建立的一个平台。 Node.js是一个事件驱动I/O服务端JavaScript环境,基于Googl…

在线网页生成工具GrapesJS

项目地址 https://github.com/GrapesJS/grapesjshttps://github.com/GrapesJS/grapesjs 项目简述 这是一个基于node.js的在线网页生成项目,对简化开发有很大的帮助。 主要使用的语言如下: 编辑页面如下: 使用也很简洁 具体可以看下项目。…

NYX靶场

信息收集 # Nmap 7.94 scan initiated Fri Nov 24 21:59:30 2023 as: nmap -sn -oN live.nmap 192.168.182.0/24 Nmap scan report for 192.168.182.1 (192.168.182.1) Host is up (0.00044s latency). MAC Address: 00:50:56:C0:00:08 (VMware) Nmap scan report for 192.168…