基于python-socket构建任务服务器(基于socket发送指令创建、停止任务)

news2024/12/23 8:36:57

在实现ia业务服务器时需要构建一个python-socket客户端,1、要求能与服务器保持心跳连接,每10秒钟发送一次心跳信号;2、要求能根据socket服务器发送的指令创建或终止一个定时任务。
为此以3个类实现该功能,分别为socket通信类(用于实现通信连接与任务创建)、任务池类(用于管理任务)、任务类(用于实现具体任务)。

1、socket通信客户端

这里定义的MySocket类主体结构如下图所示,共包含4个函数,2个线程(其本身继承Thread类实现主任务流程——run函数、接收服务器信息并创建任务添加到任务池;同时又在__init__函数中将self.thread_msg类封装为一个线程,每隔10秒钟向socket服务器发送一次心跳包)。check_connection函数用于检测socket是否与服务器断开连接,在send_msg函数中调用,当发现客户端掉线后则立刻进行重连。send_msg函数用于发送信息给服务器,因为run函数与thread_msg函数2个线程都需要调用连接与服务器发送数据,为避免冲突故而定义为函数在内部进行加锁。
在这里插入图片描述

#socket客户端
class MySocket(Thread):
    def __init__(self,config):
        super().__init__()
        # 1.创建套接字
        self.tcp_socket = socket(AF_INET,SOCK_STREAM)
        self.tcp_socket.setsockopt(SOL_SOCKET, SO_KEEPALIVE, 1) #在客户端开启心跳维护
        # 2.准备连接服务器,建立连接
        self.serve_ip = config["serve_ip"]#当前"118.24.111,149"
        self.serve_port = config["serve_port"]  #端口当前7900
        self.sleep_time = config["sleep_time"]
        print("connect to : ",self.serve_ip,self.serve_port)
        self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
        self.lock = threading.RLock()
        
        self.taskpool=TaskPool()

        task_msg=threading.Thread(target=self.thread_msg)
        task_msg.daemon = True
        task_msg.start()
            #定时发送信息
    def run(self):
        while True:
            a=self.tcp_socket.recv(1024)#接受服务端的信息,最大数据为1k
            a=a.decode('utf-8')
            print("------主线程-----",a)
            jdata=json.loads(a)
            #jdata={"streamAddr":"rtmp://adasdasdxcvsdfj.sdfdsfsd","state":1,"count":5,"taskname":"aaa","jsonname":"a.json"}
            task=OCRTask(jdata)
            self.taskpool.append(task)
            
            json_data={  
                "type":"OCR_STATE_ACK",
                "timestamp": int(time.time()*10),#时间戳放大一位和格式要求的长度保持一致
                "streamAddr": jdata["streamAddr"]
            }
            #print( json_data)
            message = json.dumps(json_data)
            data='{:08X}'.format(len(message))+message.encode('utf-8').hex().upper()
            data=hex_to_bytes(data)
            self.send_msg(data)

    def check_connection(self):
        try:
            self.tcp_socket.getpeername()
            return True
        except socket.error:
            return False
    
    #定时发送心跳信息
    def thread_msg(self):
        while True:
            #message=input('You can say:')
            #json标注的模板
            json_data={  
                "timestamp": int(time.time()*10),#时间戳放大一位和格式要求的长度保持一致
                "type":"HEARBEAT"
            }
            #print( json_data)
            message = json.dumps(json_data)
            data='{:08X}'.format(len(message))+message.encode('utf-8').hex().upper()
            data=hex_to_bytes(data)

            #进行定时发送
            self.send_msg(data)
            # self.lock.acquire()
            # self.tcp_socket.send(data)#将发送的数据进行编码
            # self.lock.release()
            try:
                #进行定时发送
                self.lock.acquire()
                a=self.tcp_socket.recv(1024)#接受服务端的信息,最大数据为1k
                self.lock.release()
                time.sleep(self.sleep_time)
                print("ack: ",a.decode('utf-8'))
            except ConnectionRefusedError:
                print('服务器拒绝本次连接!!!!!')
                self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
            except TimeoutError:
                print('连接超时!!!!!')
                self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
            except OSError:
                self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
                print('智能终端无网络连接!!!!!')

    def send_msg(self,msg):
        if self.check_connection() is False:
            print('服务器掉线!!!!!')
            self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
        try:
            #进行定时发送
            self.lock.acquire()
            self.tcp_socket.send(msg)
            self.lock.release()
        except ConnectionRefusedError:
            print('服务器拒绝本次连接!!!!!')
            self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
        except TimeoutError:
            print('连接超时!!!!!')
            self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
        except OSError:
            self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
            print('智能终端无网络连接!!!!!')

2、任务池实现

任务池的实现代码如下所示,主要包含3个函数(其中将remove_task封装为一个子线程,用于实时移除已经完成计算任务的线程),append函数用于将新创建的任务添加大任务池pool中,stop函数用于停止并移除正在运行中的任务。
在这里插入图片描述
其具体实现代码如下所示,其作为MySocket类中的一个成员属性,每当MySocket接收到服务器信息创建任务ocrtask后都调用TaskPool.append(ocrtask)将任务添加到任务池中。由任务池管理任务的声明周期,具体可见其append函数可以启动task或终止task。remove_task线程会自动将已经完成的任务移除。

#ocr任务线程池
class TaskPool:
    def __init__(self,sleep_time=0.5):
        self.pool=[]
        self.sleep_time=sleep_time
        task_msg=threading.Thread(target=self.remove_task)
        task_msg.daemon = True
        task_msg.start()

    #删除已经结束的任务
    def remove_task(self):
        while True:
            names=[]
            for task in self.pool:
                if task.get_count()==0: #生存时间为0,认为该任务已经完成需要被删除
                    task.stop()
                    self.pool.remove(task)
                else:
                    names.append(task.taskname)
            if len(names)>0:
                print(names)
            time.sleep(self.sleep_time)
            
    def append(self,ocrtask):
        if ocrtask.state==0:
            #终止任务
            self.stop(ocrtask)
        else:
            #启动任务
            ocrtask.start()
            self.pool.append(ocrtask)

    #终止任务
    def stop(self,ocrtask):
        for task in self.pool:
            if task.taskname==ocrtask.taskname:
                task.stop()
                self.pool.remove(task)

3、具体任务线程

任务的实现代码如下所示,其支持3中任务模式,使用state区分任务,state为0-停止识别,1-连续识别count张,2-持续识别(故而在state为2时将count设置的特别大)。这里以count控制任务的运行,任务每运行一次count减少1。当count小于等于0,则表示任务运行完成。在TaskPool的remove_task中检测到count为0时则会自动删除任务。

#ocr任务
class OCRTask(Thread):
    def __init__(self,json):
        super().__init__()
        self.streamAddr=json["streamAddr"]
        self.state=json["state"] # 0-停止识别,1-连续识别count张,2-持续识别
        if json["state"]==2:
            self.count=9999999999999999999999999
        else:
            self.count=json["count"]
        if "taskname" in json.keys():
            self.taskname=json["taskname"]
        else:
            self.taskname=json["streamAddr"]

        self.jsonname=json["jsonname"]
        self.lock = threading.RLock()

    def run(self):
        while self.get_count()>0:
            print('run %s'%self.taskname,end='*')
            time.sleep(2)
            self.lock.acquire()
            self.count-=1
            self.lock.release()
        print('%s finish!! '%self.taskname)

    #获取任务的生存时间
    def get_count(self):
        self.lock.acquire()
        now_count=self.count
        self.lock.release()
        #削减count
        return now_count

    #停止任务
    def stop(self):
        self.lock.acquire()
        self.count=-1
        self.lock.release()
        #停止任务
        pass

4、完整代码与使用效果

完整代码如下所示

from socket import *
import time,json
import yaml
import threading,struct
from threading import Thread
 
def hex_to_bytes(hex_str):
    """
    :param hex_str: 16进制字符串
    :return: byte_data 字节流数据
    """
    bytes_data = bytes()
    while hex_str :
        """16进制字符串转换为字节流"""
        temp = hex_str[0:2]
        s = int(temp, 16)
        bytes_data += struct.pack('B', s)
        hex_str = hex_str[2:]
    return bytes_data

# 读取Yaml文件方法
def read_yaml(yaml_path):
    with open(yaml_path, encoding="utf-8", mode="r") as f:
        result = yaml.load(stream=f,Loader=yaml.FullLoader)
        return result

#ocr任务
class OCRTask(Thread):
    def __init__(self,json):
        super().__init__()
        self.streamAddr=json["streamAddr"]
        self.state=json["state"] # 0-停止识别,1-连续识别count张,2-持续识别
        if json["state"]==2:
            self.count=9999999999999999999999999
        else:
            self.count=json["count"]
        if "taskname" in json.keys():
            self.taskname=json["taskname"]
        else:
            self.taskname=json["streamAddr"]

        self.jsonname=json["jsonname"]
        self.lock = threading.RLock()

    def run(self):
        while self.get_count()>0:
            print('run %s'%self.taskname,end='*')
            time.sleep(2)
            self.lock.acquire()
            self.count-=1
            self.lock.release()
        print('%s finish!! '%self.taskname)

    #获取任务的生存时间
    def get_count(self):
        self.lock.acquire()
        now_count=self.count
        self.lock.release()
        #削减count
        return now_count

    #停止任务
    def stop(self):
        self.lock.acquire()
        self.count=-1
        self.lock.release()
        #停止任务
        pass

#ocr任务线程池
class TaskPool:
    def __init__(self,sleep_time=0.5):
        self.pool=[]
        self.sleep_time=sleep_time
        task_msg=threading.Thread(target=self.remove_task)
        task_msg.daemon = True
        task_msg.start()

    #删除已经结束的任务
    def remove_task(self):
        while True:
            names=[]
            for task in self.pool:
                if task.get_count()==0:
                    task.stop()
                    self.pool.remove(task)
                else:
                    names.append(task.taskname)
            if len(names)>0:
                print(names)
            time.sleep(self.sleep_time)
            
    def append(self,ocrtask):
        if ocrtask.state==0:
            #终止任务
            self.stop(ocrtask)
        else:
            #启动任务
            ocrtask.start()
            self.pool.append(ocrtask)

    #终止任务
    def stop(self,ocrtask):
        for task in self.pool:
            if task.taskname==ocrtask.taskname:
                task.stop()
                self.pool.remove(task)

#socket客户端
class MySocket(Thread):
    def __init__(self,config):
        super().__init__()
        # 1.创建套接字
        self.tcp_socket = socket(AF_INET,SOCK_STREAM)
        self.tcp_socket.setsockopt(SOL_SOCKET, SO_KEEPALIVE, 1) #在客户端开启心跳维护
        # 2.准备连接服务器,建立连接
        self.serve_ip = config["serve_ip"]#当前"118.24.111,149"
        self.serve_port = config["serve_port"]  #端口当前7900
        self.sleep_time = config["sleep_time"]
        print("connect to : ",self.serve_ip,self.serve_port)
        self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
        self.lock = threading.RLock()
        
        self.taskpool=TaskPool()

        task_msg=threading.Thread(target=self.thread_msg)
        task_msg.daemon = True
        task_msg.start()
            #定时发送信息
    
    #通信线程-用于接收服务器的指令
    def run(self):
        while True:
            a=self.tcp_socket.recv(1024)#接受服务端的信息,最大数据为1k
            a=a.decode('utf-8')
            print("------主线程-----",a)
            jdata=json.loads(a)
            #jdata={"streamAddr":"rtmp://adasdasdxcvsdfj.sdfdsfsd","state":1,"count":5,"taskname":"aaa","jsonname":"a.json"}
            task=OCRTask(jdata)
            self.taskpool.append(task)
            
            json_data={  
                "type":"OCR_STATE_ACK",
                "timestamp": int(time.time()*10),#时间戳放大一位和格式要求的长度保持一致
                "streamAddr": jdata["streamAddr"]
            }
            #print( json_data)
            message = json.dumps(json_data)
            data='{:08X}'.format(len(message))+message.encode('utf-8').hex().upper()
            data=hex_to_bytes(data)
            self.send_msg(data)

    #检测socket连接是否断开
    def check_connection(self):
        try:
            self.tcp_socket.getpeername()
            return True
        except socket.error:
            return False
    
    #定时发送心跳信息--子线程
    def thread_msg(self):
        while True:
            #message=input('You can say:')
            #json标注的模板
            json_data={  
                "timestamp": int(time.time()*10),#时间戳放大一位和格式要求的长度保持一致
                "type":"HEARBEAT"
            }
            #print( json_data)
            message = json.dumps(json_data)
            data='{:08X}'.format(len(message))+message.encode('utf-8').hex().upper()
            data=hex_to_bytes(data)

            #进行定时发送
            self.send_msg(data)
            # self.lock.acquire()
            # self.tcp_socket.send(data)#将发送的数据进行编码
            # self.lock.release()
            try:
                #进行定时发送
                self.lock.acquire()
                a=self.tcp_socket.recv(1024)#接受服务端的信息,最大数据为1k
                self.lock.release()
                time.sleep(self.sleep_time)
                print("ack: ",a.decode('utf-8'))
            except ConnectionRefusedError:
                print('服务器拒绝本次连接!!!!!')
                self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
            except TimeoutError:
                print('连接超时!!!!!')
                self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
            except OSError:
                self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
                print('智能终端无网络连接!!!!!')

    #发送信息
    def send_msg(self,msg):
        if self.check_connection() is False:
            print('服务器掉线!!!!!')
            self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
        try:
            #进行定时发送
            self.lock.acquire()
            self.tcp_socket.send(msg)
            self.lock.release()
        except ConnectionRefusedError:
            print('服务器拒绝本次连接!!!!!')
            self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
        except TimeoutError:
            print('连接超时!!!!!')
            self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
        except OSError:
            self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
            print('智能终端无网络连接!!!!!')

if "__main__"==__name__:
    #进行定时通信测试
    config=read_yaml("config.yaml")
    socket_client=MySocket(config)
    socket_client.start()

使用效果如下所示,这里基于socket调试工具作为客户端

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1461153.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Hypervisor是什么

Hypervisor 通常指的是虚拟机监视器(VirtualMachine Monitor),它是一种软件或硬件,可以在物理服务器上创建和管理多个虚拟机(VirtualMachine)。 Hypervisor 提供了一个抽象层,将物理服务器的资源…

基于SpringBoot的农产品智慧物流系统

文章目录 项目介绍主要功能截图:部分代码展示设计总结项目获取方式 🍅 作者主页:超级无敌暴龙战士塔塔开 🍅 简介:Java领域优质创作者🏆、 简历模板、学习资料、面试题库【关注我,都给你】 &…

Redis之缓存击穿问题解决方案

文章目录 一、书接上文二、介绍三、解决方案1. 单例双检锁2. 缓存预热和定时任务 一、书接上文 Redis之缓存雪崩问题解决方案 二、介绍 缓存击穿就是大量并发访问同一个热点数据,一旦这个热点数据缓存失效,则请求压力都来到数据库。 三、解决方案 1…

​LeetCode解法汇总106. 从中序与后序遍历序列构造二叉树

目录链接: 力扣编程题-解法汇总_分享记录-CSDN博客 GitHub同步刷题项目: https://github.com/September26/java-algorithms 原题链接: 力扣(LeetCode)官网 - 全球极客挚爱的技术成长平台 描述: 给定两个…

惠尔顿 网络安全审计系统 任意文件读取漏洞复现

0x01 产品简介 惠尔顿网络安全审计产品致力于满足军工四证、军工保密室建设、国家涉密网络建设的审计要求,规范网络行为,满足国家的规范;支持1-3线路的internet接入、1-3对网桥;含强大的上网行为管理、审计、监控模块&#xff1b…

Dockerfile文件中只指定挂载点会发生什么?

当你在VOLUME指令中只指定容器内的路径(挂载点)而不指定宿主机的目录时,Docker会为该挂载点自动生成一个匿名卷。这个匿名卷存储在宿主机的某个位置,但这个具体位置是由Docker自动管理的,用户通常不需要关心这个存储位…

哪些工具可以改变手机电脑网络IP地址?

在互联网时代,网络已经成为了我们日常生活中不可或缺的一部分。然而,随着网络的普及和技术的不断发展,网络安全问题也日益凸显。为了保护个人隐私和信息安全,我们需要了解一些工具可以改变手机电脑网络IP地址的知识。 首先&#x…

谈谈我对低代码开发平台的理解

目录 一、前言 二、低代码谜团 三、低代码能解决哪些问题 四、好用的低代码平台 五、总结 一、前言 低代码“灵活、快速、低门槛”的标签,为其带来了诸多争议。在低代码平台上是否只能搭建极其简单、无亮点的小功能?低代码带来的“全民程序员”化是…

Java线程池知识点总结

1、线程池优势: 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗…

Unity MVC开发模式与开发流程详解

在Unity游戏开发中,采用MVC(Model-View-Controller)模式是一种非常常见的设计模式。MVC模式将应用程序分为三个部分:模型(Model)、视图(View)和控制器(Controller&#x…

Linux网络编程(三-UDP协议)

目录 一、UDP概述 二、UDP的首部格式 三、UDP缓冲区 四、基于UDP的应用层协议 五、常见问题 一、UDP概述 UDP(User Datagram Protocol,用户数据协议报)是传输层协议,提供不可靠服务,其特点包括: 无连接:知道对端…

手把手教你如何搭建性能测试环境

前言 在进行性能则试前,需要完成性能测试的搭建工作,一般包括硬件环境、软件环境及网络环境,可以要求配置和开发工程师协助完成,但是作为一个优秀性能测试工程师,这也是你的必备技能之一。 性能测试环境与功能测试环…

Apipost多host服务配置如何使用

最近Apipost新增同环境下多host服务的配置功能,本篇文章带来该功能的使用场景及使用方法。 配置方法: 点击右上角眼睛标识进入环境管理 点击添加服务,输入服务名和URL 配置完成后需要在接口目录中选择该目录下需要使用的host服务&#xff0…

w29pikachu-ssrf实例

SSRF简介 SSRF是服务器端请求伪造 危害: 1.可以对服务器所在内网、本地进行端口扫描,获取一些服务的信息等 2.目标网站本地敏感数据的读取 3.内外网主机应用程序漏洞的利用 4.内外网web站点漏洞的利用 ssrf常用的相关协议: gopher://: 发…

IDEA实现ssh远程连接本地Linux服务器

文章目录 1. 检查Linux SSH服务2. 本地连接测试3. Linux 安装Cpolar4. 创建远程连接公网地址5. 公网远程连接测试6. 固定连接公网地址7. 固定地址连接测试 本文主要介绍如何在IDEA中设置远程连接服务器开发环境,并结合Cpolar内网穿透工具实现无公网远程连接&#xf…

2024.2.21 C++QT 作业

思维导图 练习题 1>使用手动连接,将登录框中的取消按钮使用qt4版本的连接到自定义的槽函数中,在自定义的槽函数中调用关闭函数,将登录按钮使用qt5版本的连接到自定义的槽函数中,在槽函数中判断ui界面上输入的账号是否为"…

C语言----字符数组指针

1.char arr[] {a,b,c,d,e,f}; sizeof分析类型就可以计算所占的内存空间的大小; (1)printf("%d\n", sizeof(arr)); 数组名单独放进里面,计算整个数组大小,所以是6字节; (2&#xff…

微信小程序 ---- 慕尚花坊 项目初始化

目录 项目介绍 01. 项目概述 02. 项目演示 03. 项目技术栈 04. 接口文档 申请开发权限 项目初始化 01. 创建项目与项目初始化 02. 自定义构建 npm 集成Sass 03. 集成项目页面文件 04. VsCode 开发小程序项目 项目介绍 01. 项目概述 [慕尚花坊] 是一款 同城鲜花订购…

JAVA设计模式结构型模式

一、前言 java设计模式主要分为创建型模式,结构型模式和行为型模式。上一篇主要总结了行为型设计模式,本章总结,结构型模式。像创建型模式就不写了,比较简单。大概知道是工厂模式和建造者模式,原型模式就行&#xff0…

相机图像质量研究(39)常见问题总结:编解码对成像的影响--运动模糊

系列文章目录 相机图像质量研究(1)Camera成像流程介绍 相机图像质量研究(2)ISP专用平台调优介绍 相机图像质量研究(3)图像质量测试介绍 相机图像质量研究(4)常见问题总结:光学结构对成像的影响--焦距 相机图像质量研究(5)常见问题总结:光学结构对成…