使用esp32+micropython+microdot搭建web(http+websocket)服务器(超详细)第三部分

news2025/1/8 21:31:10

使用esp32+micropython+microdot搭建web(http+websocket)服务器(超详细)第三部分

  • microdot文档速查 什么是Microdot?Microdot是一个可以在micropython中搭建物联网web服务器的框架
  • micropyton文档api速查 Quick reference for the ESP32

实现websocket服务器

上一章的http服务有个缺点,就是每次都要发送一个请求,

如果我们让这个请求保持长连接,这样效率就提高了,

于是websocket就是为此而生的

演示视频

准备工作

和上一章一样

  • esp32开发板一个
  • esp32拓展板一个
  • 舵机四个
  • 简单组装
  • 注意接口(线头颜色千万别插反了!!!)

esp32 websocket服务搭建

在MicroPython设备 新建目录结构

  • lib 存放一些库文件

    • microdot.py (microdot-main\src中)前面章节有image.png
    • microdot_websocket.py (microdot-main\src中)前面章节有
  • common

    • connect_wifi.py (连接热点)前面章节有
    • servo.py (操作舵机移动角度)前面章节有
  • public 存放网页内容

    • index.html 网页
  • main.py (程序主入口)

microdot_websocket.py

import binascii
import hashlib
from lib.microdot import Response
​
​
class WebSocket:
    CONT = 0
    TEXT = 1
    BINARY = 2
    CLOSE = 8
    PING = 9
    PONG = 10
​
    def __init__(self, request):
        self.request = request
        self.closed = False
​
    def handshake(self):
        response = self._handshake_response()
        self.request.sock.send(b'HTTP/1.1 101 Switching Protocols\r\n')
        self.request.sock.send(b'Upgrade: websocket\r\n')
        self.request.sock.send(b'Connection: Upgrade\r\n')
        self.request.sock.send(
            b'Sec-WebSocket-Accept: ' + response + b'\r\n\r\n')
​
    def receive(self):
        while True:
            opcode, payload = self._read_frame()
            send_opcode, data = self._process_websocket_frame(opcode, payload)
            if send_opcode:  # pragma: no cover
                self.send(data, send_opcode)
            elif data:  # pragma: no branch
                return data
​
    def send(self, data, opcode=None):
        frame = self._encode_websocket_frame(
            opcode or (self.TEXT if isinstance(data, str) else self.BINARY),
            data)
        self.request.sock.send(frame)
​
    def close(self):
        if not self.closed:  # pragma: no cover
            self.closed = True
            self.send(b'', self.CLOSE)
​
    def _handshake_response(self):
        connection = False
        upgrade = False
        websocket_key = None
        for header, value in self.request.headers.items():
            h = header.lower()
            if h == 'connection':
                connection = True
                if 'upgrade' not in value.lower():
                    return self.request.app.abort(400)
            elif h == 'upgrade':
                upgrade = True
                if not value.lower() == 'websocket':
                    return self.request.app.abort(400)
            elif h == 'sec-websocket-key':
                websocket_key = value
        if not connection or not upgrade or not websocket_key:
            return self.request.app.abort(400)
        d = hashlib.sha1(websocket_key.encode())
        d.update(b'258EAFA5-E914-47DA-95CA-C5AB0DC85B11')
        return binascii.b2a_base64(d.digest())[:-1]
​
    @classmethod
    def _parse_frame_header(cls, header):
        fin = header[0] & 0x80
        opcode = header[0] & 0x0f
        if fin == 0 or opcode == cls.CONT:  # pragma: no cover
            raise OSError(32, 'Continuation frames not supported')
        has_mask = header[1] & 0x80
        length = header[1] & 0x7f
        if length == 126:
            length = -2
        elif length == 127:
            length = -8
        return fin, opcode, has_mask, length
​
    def _process_websocket_frame(self, opcode, payload):
        if opcode == self.TEXT:
            payload = payload.decode()
        elif opcode == self.BINARY:
            pass
        elif opcode == self.CLOSE:
            raise OSError(32, 'Websocket connection closed')
        elif opcode == self.PING:
            return self.PONG, payload
        elif opcode == self.PONG:  # pragma: no branch
            return None, None
        return None, payload
​
    @classmethod
    def _encode_websocket_frame(cls, opcode, payload):
        frame = bytearray()
        frame.append(0x80 | opcode)
        if opcode == cls.TEXT:
            payload = payload.encode()
        if len(payload) < 126:
            frame.append(len(payload))
        elif len(payload) < (1 << 16):
            frame.append(126)
            frame.extend(len(payload).to_bytes(2, 'big'))
        else:
            frame.append(127)
            frame.extend(len(payload).to_bytes(8, 'big'))
        frame.extend(payload)
        return frame
​
    def _read_frame(self):
        header = self.request.sock.recv(2)
        if len(header) != 2:  # pragma: no cover
            raise OSError(32, 'Websocket connection closed')
        fin, opcode, has_mask, length = self._parse_frame_header(header)
        if length < 0:
            length = self.request.sock.recv(-length)
            length = int.from_bytes(length, 'big')
        if has_mask:  # pragma: no cover
            mask = self.request.sock.recv(4)
        payload = self.request.sock.recv(length)
        if has_mask:  # pragma: no cover
            payload = bytes(x ^ mask[i % 4] for i, x in enumerate(payload))
        return opcode, payload
​
​
def websocket_upgrade(request):
    """Upgrade a request handler to a websocket connection.
​
    This function can be called directly inside a route function to process a
    WebSocket upgrade handshake, for example after the user's credentials are
    verified. The function returns the websocket object::
​
        @app.route('/echo')
        def echo(request):
            if not authenticate_user(request):
                abort(401)
            ws = websocket_upgrade(request)
            while True:
                message = ws.receive()
                ws.send(message)
    """
    ws = WebSocket(request)
    ws.handshake()
​
    @request.after_request
    def after_request(request, response):
        return Response.already_handled
​
    return ws
​
​
def with_websocket(f):
    """Decorator to make a route a WebSocket endpoint.
​
    This decorator is used to define a route that accepts websocket
    connections. The route then receives a websocket object as a second
    argument that it can use to send and receive messages::
​
        @app.route('/echo')
        @with_websocket
        def echo(request, ws):
            while True:
                message = ws.receive()
                ws.send(message)
    """
    def wrapper(request, *args, **kwargs):
        ws = websocket_upgrade(request)
        try:
            f(request, ws, *args, **kwargs)
            ws.close()  # pragma: no cover
        except OSError as exc:
            if exc.errno not in [32, 54, 104]:  # pragma: no cover
                raise
        return ''
    return wrapper
​

main.py

# 导入Microdot
from lib.microdot import Microdot,send_file,Request
from lib.microdot_websocket import with_websocket
# 连接wifi
from common.connect_wifi import do_connect
from common.servo import Servo
import time
# 导入引脚
from machine import Pin
# 不加报错
Request.socket_read_timeout = None
​
# 对应四个电机 从左上角顺时针排序
s1 = Servo(Pin(15))
s2 = Servo(Pin(17))
s3 = Servo(Pin(25))
s4 = Servo(Pin(27))
# 复位
s1.write_angle(0)
s2.write_angle(180-0)
s3.write_angle(180-0)
s4.write_angle(0)
# esp32 引脚2是一颗自带的 led的灯
light = Pin(2,Pin.OUT)
​
# 开始连接wifi
do_connect()
# 实例化这个类
app = Microdot()
​
# get请求返回一个网页
@app.route('/')
def index(request):
    return send_file('public/index.html')
​
# 使用@with_websocket生成websocket服务
@app.route('/move')
@with_websocket
def echo(request, ws):
    while True:
        # 拿到客户端发送的数据
        data = ws.receive()
        print(data,type(data))
        s1.write_angle(int(data))
        s2.write_angle(180-int(data))
        s3.write_angle(180-int(data))
        s4.write_angle(int(data))    
        ws.send("移动:"+(data))
        
​
# 启动后指示灯闪烁
def blink():
    for i in range(5):
        light.value(int(not light.value()))
        time.sleep(1)
blink()
​
# 端口号为5000
app.run(host='0.0.0.0', port=5000, debug=False, ssl=None)
​

index.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta http-equiv="X-UA-Compatible" content="IE=edge">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>滑动</title>
</head>
<body>
    <h1>滑动四个电机</h1>
    <input type="range" min="0" max="180" value="1" step="1" name="" id="">
    <div class="content">
​
    </div>
    <script>
​
        let ipt = document.querySelector("input")
        let content = document.querySelector(".content")
​
        // websocket连接
        const socket = new WebSocket('ws://' + location.host + '/move');
        // 防抖函数 防止服务器接受大量请求
        const throttle = (func,interval)=>{
            let last = 0;
            return function(){
                let args = arguments
                let now = Date.now();
                if(now - last > interval){
                    func.apply(this,args)
                    last = now
                }
            }
        }
        ipt.addEventListener("input",throttle((e)=>{
            // 拿到滑动条的 数据
            let value = ipt.value
            // 给服务器发送数据
            socket.send(value)
        },100))
        const setContent = (text,color)=>{
            content.innerHTML = `<span style="color: ${color}">${text}</span><br>`
        }
        socket.addEventListener('message', ev => {
            setContent('收到消息' + ev.data, 'blue');
        });
        socket.addEventListener('close', ev => {
            setContent('连接关闭' + ev.data, 'blue');
        });
    </script>
</body>
</html>
​
​

开发完成

演示视频

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/671170.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

优雅组合,高效交互:Gradio Combining Interfaces模块解析

❤️觉得内容不错的话&#xff0c;欢迎点赞收藏加关注&#x1f60a;&#x1f60a;&#x1f60a;&#xff0c;后续会继续输入更多优质内容❤️ &#x1f449;有问题欢迎大家加关注私戳或者评论&#xff08;包括但不限于NLP算法相关&#xff0c;linux学习相关&#xff0c;读研读博…

Jdk9版本以上如何查看java对象所占内存大小

想要查看java对象在运行时的实际占用内存大小。网上大部分方法都是雷同&#xff0c;都是出自 查看java对象所占内存大小-云社区-华为云 这里面的提供的4种方法仅仅适合jdk8及以下版本。 如果项目使用的是dk11、jdk18等高级版本就无法使用&#xff0c;上面帖子中第一种和第二…

蓝奥声核心技术—— 用电异常监控技术

1.技术背景 用电异常监控技术主要通过电能监测节点作为目标监测节点对其关联绑定的用电负载对象的异常状态进行快速响应与准确监控&#xff0c;以解决用电监控的安全性问题。该项技术涉及无线物联网边缘智能与测控的技术领域&#xff0c;主要涉及面向电能监测及安全监控的边缘…

编译原理笔记12:自上而下语法分析(2)非递归预测分析器、FIRST FOLLOW 集合计算

目录 使用预测分析器的自上而下分析格局 使用预测分析器进行分析的实例FIRST、FOLLOW 集合的构造FIRST 集合FOLLOW 集合 使用预测分析器的自上而下分析 使用预测分析器进行的自上而下分析是非递归的。预测分析器模型其实是一种 PDA&#xff08;下推自动机&#xff0c;Pushdown…

uni-number-box【数字输入框组件】,change事件 自定义传参

关键代码&#xff1a; change"(value)>{twobindChange(item,value)}" <uni-number-box :min"1" :value"item.num" change"(value)>{twobindChange(item,value)}" /><script>//数量选择twobindChange(item, value) …

易语言读写富士通MB89R118卡 NXP15693标签源码

本示例发卡器介绍&#xff1a;Android Linux RFID读写器NFC发卡器WEB可编程NDEF文本/智能海报/-淘宝网 (taobao.com) DLL命令定义表 .版本 2 .DLL命令 蜂鸣器嘀一声, 字节型, "OUR_MIFARE.dll", "pcdbeep" .参数 xms, 整数型 .DLL命令 读取设备编号…

slam中用到的Pangolin安装问题

sudo apt-get install libglew-dev sudo apt-get install cmake sudo apt-get install libboost-dev libboost-thread-dev libboost-filesystem-dev cd ~/orbslam_ws/src$ git clone https://github.com/zzx2GH/Pangolin.git把Pangolin/src/CMakeLists.txt注释掉以下…

村田将电动汽车静噪对策用树脂成型表面贴装型MLCC商品化

株式会社村田制作所已开发出电动汽车静噪对策用树脂成型表面贴装型多层陶瓷电容器“EVA系列”。该产品虽然体积小、厚度薄(12.7 x 6.0 x 3.7 mm)&#xff0c;但是仍然确保了高电压负载所需的爬电距离(10 mm)&#xff0c;并且支持国际标准“IEC60384-14”中的Y2级。 ​ 这是一款…

【高性能计算】无监督学习之层次聚类实验

【高性能计算】基于K均值的划分聚类实验 实验目的实验内容实验步骤1、层次聚类算法1.1 层次聚类算法的基本思想1.2 层次聚类的聚类过程 2、使用Python语言编写层次聚类的源程序代码并分析其分类原理2.1 层次聚类 Python代码2.1.1 计算欧式距离函数euler_distance2.1.2 层次聚类…

每一次Http请求,Java线程是如何处理的?

每一次Http请求&#xff0c;Java线程是如何处理的&#xff1f; 文章目录 每一次Http请求&#xff0c;Java线程是如何处理的&#xff1f;前言一、Http请求处理二、两种服务器模型及处理方式1、两种服务&#xff1a;2.更好的处理方式 总结 前言 当我们写好一个项目时&#xff0c…

【go】Excelize处理excel表

文章目录 1 Excelize介绍2 相关需求与实现2.1 数据的excel文件导出2.2 带数据校验的excel文件导出 1 Excelize介绍 Excelize 是 Go 语言编写的用于操作 Office Excel 文档基础库。官方文档&#xff1a;https://xuri.me/excelize/zh-hans/ 引入方法 go get "github.com/…

【MYSQL篇】一文了解mysql事务

文章目录 MYSQL事务事务的四大特性1、原子性2、一致性3、隔离性4、持久性 事务的并发1、脏读2、不可重复读3、幻读 隔离级别Read UncommittedRead CommittedRepeatable ReadSerializable MySQL Innodb 对隔离级别的支持实现方案LBCCMVCC 总结 关于 MYSQL 事务在面试的时候&…

软件系统三基座之二:组织架构

软件系统三基座包含&#xff1a;权限管理、组织架构、用户管理。 一、组织的来源 组织是由若干个人或群体所组成的、有共同目标和一定边界的社会实体。组织是为了提升劳动效率而产生的。 从一个日常案例&#xff0c;讲讲组织是如何提升劳动效率的。 唯美食与美景不可辜负&#…

Java 基础进阶篇(十八):正则表达式匹配规则和应用

文章目录 一、正则表达式概述二、正则表达式的匹配规则三、正则表达式在方法中的应用3.1 校验手机号、邮箱和座机电话号码3.2 字符串的内容替换和分割 四、编程题目4.1 表示数值的字符串4.2 非严格递增连续数字序列 一、正则表达式概述 正则表达式是对字符串&#xff08;包括普…

I/O多路转接之select

初识select 系统提供select函数来实现多路复用输入/输出模型&#xff1a; select系统调用是用来让我们的程序监视多个文件描述符的状态变化的;程序会停在select这里等待&#xff0c;直到被监视的文件描述符有一个或多个发生了状态改变; 文章目录 初识select一&#xff1a;Sel…

Goby 漏洞发布|Avaya Aura Device Services r软件 PhoneBackup 任意文件上传漏洞

漏洞名称&#xff1a;Avaya Aura Device Services r软件 PhoneBackup 任意文件上传漏洞 English Name&#xff1a;Avaya Aura Device Services PhoneBackup File Upload Vulnerability CVSS core: 9.0 影响资产数&#xff1a;565 漏洞描述&#xff1a; Avaya Aura Device …

【linux 新机配置】

1&#xff0c;安装 node https://juejin.cn/post/7102790458132135944 2 linux 安装 Yarn https://juejin.cn/post/7102793669425496077 3 安装Nginx 安装 dnf install nginx 启动 systemctl start nginx systemctl status nginx systemctl enable nginx 配置&#xff0…

python机器人编程——差速AGV机器、基于视觉和预测控制的循迹、自动行驶(下篇)

目录 一、前言二、基于轨迹与路面重心偏离度误差的预测自动差速小车循迹控制策略三、轨迹图像的处理要点四、本篇部分核心控制策略python代码&#xff1a;五、结论 一、前言 基于最近的测试&#xff0c;得到了一种粗略控制的算法&#xff0c;其控制效果适合单线路和急转弯的情…

LLM探索:GPT类模型的几个常用参数 Top-k, Top-p, Temperature

Top-k抽样模型从最可能的"k"个选项中随机选择一个如果k10&#xff0c;模型将从最可能的10个单词中选择一个Top-p抽样模型从累计概率大于或等于“p”的最小集合中随机选择一个如果p0.9&#xff0c;选择的单词集将是概率累计到0.9的那部分Temperature控制生成文本随机性…

对比之前的组件优化说明React.memo的作用

我们之前写的react PureComponent讲述了 PureComponent 组件优化特性的强大功能 还有就是 shouldComponentUpdate 生命周期的一个解决方案 那么呢 今天我们要说另一个 也是类似于组件性能优化的新特性 打开我们的react项目 在src下的components创建一个组件 例如 我这里叫 dom…