【Python】基于ML307A的位置读取系统(通过UART串口实现AT指令和flask来实现自动化读取并推流)

news2024/10/6 8:30:39

【Python】基于ML307A的位置读取系统(通过UART串口实现AT指令和flask来实现自动化读取并推流)

Python下的串口serial库

串行口的属性:
name:设备名字
portstr:已废弃,用name代替
port:读或者写端口
baudrate:波特率
bytesize:字节大小
parity:校验位
stopbits:停止位
timeout:读超时设置
writeTimeout:写超时
xonxoff:软件流控
rtscts:硬件流控
dsrdtr:硬件流控
interCharTimeout:字符间隔超时

属性的使用方法:
ser=serial.Serial(“/dev/ttyAMA0”,9600,timeout=0.5)
ser.open()

print ser.name
print ser.port
print ser.baudrate#波特率
print ser.bytesize#字节大小
print ser.parity#校验位N-无校验,E-偶校验,O-奇校验
print ser.stopbits#停止位
print ser.timeout#读超时设置
print ser.writeTimeout#写超时
print ser.xonxoff#软件流控
print ser.rtscts#硬件流控
print ser.dsrdtr#硬件流控
print ser.interCharTimeout#字符间隔超时

ser.close()

串口接收要用到多线程库(类似单片机中的中断):

def thread_com_receive():
    global ML307A_RX
    while True:
        try:
            rx_buf = ''
            rx_buf = COMM.read()  # 转化为整型数字
            if rx_buf != b'':
                time.sleep(0.01)
                rx_buf = rx_buf + COMM.read_all()
                print("串口收到消息:", rx_buf)
                ML307A_RX=str(rx_buf).split("\\r\\n")[1]
                print(ML307A_RX)
            time.sleep(0.01)
        except:
            pass
    pass

初始化串口:

# 打开串口
def serial_open(n=0):
    global COMM
    serial_port = set_com_port(n)
    COMM = serial.Serial(serial_port, 115200, timeout=0.01)
    if COMM.isOpen():
        print(serial_port, "open success")
        return 0
    else:
        print("open failed")
        return 255
def init_com():
    global ML307A_RX
    get_com_list()
    len = port_list.__len__()
    device = port_list[0].device
    print(len, device)
    serial_open()
    thread1 = threading.Thread(target=thread_com_receive)
    thread1.start()
    COMM.write("AT\r\n".encode("UTF-8"))
    time.sleep(0.1)
    if ML307A_RX=="OK":
        com_jugg()

AT的命令格式

AT指令格式:AT指令都以”AT”开头,以0x0D 0x0A(即\r\n,换行回车符)结束,模块运行后,串口默认的设置为:8位数据位、1位停止位、无奇偶校验位、硬件流控制(CTS/RTS).
注意为了发送AT命令,最后还要加上0x0D 0x0A(即\r\n,换行回车符)这是串口终端要求.
有一些命令后面可以加额外信息来.如电话号码

每个AT命令执行后,通常DCE都给状态值,用于判断命令执行的结果.

AT返回状态包括三种情况 OK,ERROR,和命令相关的错误原因字符串.返回状态前后都有一个字符.
如 OK 表示AT命令执行成功.
ERROR 表示AT命令执行失败
NO DIAL TONE 只出现在ATD命令返回状态中,表示没有拨号音,这类返回状态要查命令手册

还有一些命令本身是要向DCE查询数据,数据返回时,一般是+打头命令.返回格式
+命令:命令结果
如:AT+CMGR=8 (获取第8条信息)
返回 +CMGR: “REC UNREAD”,“+8613508485560”,“01/07/16,15:37:28+32”,Once more

AT指令串口通信代码如下:

# -*- coding: utf-8 -*-
import serial
import serial.tools.list_ports
import time
import threading


com_rx_buf = ''				# 接收缓冲区
com_tx_buf = ''				# 发送缓冲区
COMM = serial.Serial()		# 定义串口对象
port_list: list				# 可用串口列表
port_select: list			# 选择好的串口

ML307A_RX=''

# 无串口返回0,
# 返回可用的串口列表
def get_com_list():
    global port_list
    # a = serial.tools.list_ports.comports()
    # print(a)
    # port_list = list(serial.tools.list_ports.comports())
    port_list = serial.tools.list_ports.comports()
    return port_list


def set_com_port(n=0):
    global port_list
    global port_select
    port_select = port_list[n]
    return port_select.device


# 打开串口
def serial_open(n=0):
    global COMM
    serial_port = set_com_port(n)
    COMM = serial.Serial(serial_port, 115200, timeout=0.01)
    if COMM.isOpen():
        print(serial_port, "open success")
        return 0
    else:
        print("open failed")
        return 255


# 关闭串口
def serial_close():
    global COMM
    COMM.close()
    print(COMM.name + "closed.")


def set_com_rx_buf(buf=''):
    global com_rx_buf
    com_rx_buf = buf


def set_com_tx_buf(buf=''):
    global com_tx_buf
    com_tx_buf = buf


def get_com_rx_buf():
    global com_rx_buf
    return com_rx_buf


def get_com_tx_buf():
    global com_tx_buf
    return com_tx_buf


def thread_com_receive():
    global ML307A_RX
    while True:
        try:
            rx_buf = ''
            rx_buf = COMM.read()  # 转化为整型数字
            if rx_buf != b'':
                time.sleep(0.01)
                rx_buf = rx_buf + COMM.read_all()
                print("串口收到消息:", rx_buf)
                ML307A_RX=str(rx_buf).split("\\r\\n")[1]
                print(ML307A_RX)
            time.sleep(0.01)
        except:
            pass
    pass


# def serial_encode(addr=0, command=0, param1=0, param0=0):
#     buf = [addr, command, param1, param0, 0, 0, 0, 0]
#     print(buf)
#     return buf


def serial_send_command(addr=0, command=0, param1=0, param0=0, data3=0, data2=0, data1=0, data0=0):
    buf = [addr, command, param1, param0, data3, data2, data1, data0]
    COMM.write(buf)
    pass


def serial_init():
    buf = "AT+CG\r\n"
    COMM.write(buf)
    time.sleep(0.05)
    buf = COMM.read_all()
    if buf != "OK\r\n":
        return 254  # 进入调试模式失败

    buf = "AT+CAN_MODE=0\r\n"
    COMM.write(buf)
    time.sleep(0.05)
    buf = COMM.read_all()
    if buf != "OK\r\n":
        return 253          # 进入正常模式失败,模块处于1状态,即环回模式中

    buf = "AT+CAN_BAUD=500000\r\n"
    COMM.write(buf)
    time.sleep(0.05)
    buf = COMM.read_all()
    if buf != "OK\r\n":
        return 253          # 波特率设置失败

    buf = "AT+FRAMEFORMAT=1,0,\r\n"
    COMM.write(buf)
    time.sleep(0.05)
    buf = COMM.read_all()
    if buf != "OK\r\n":
        return 253          # 波特率设置失败

    buf = "AT+ET\r\n"       # 进入透传模式
    COMM.write(buf)
    time.sleep(0.05)
    buf = COMM.read_all()
    if buf != "OK\r\n":
        return 255  # 不是CAN模块

def com_jugg():
    global ML307A_RX
    while True:
        COMM.write("""AT+MUESTATS="radio"\r\n""".encode("UTF-8"))
        time.sleep(5)
        print(ML307A_RX)
    
def init_com():
    global ML307A_RX
    get_com_list()
    len = port_list.__len__()
    device = port_list[0].device
    print(len, device)
    serial_open()
    thread1 = threading.Thread(target=thread_com_receive)
    thread1.start()
    COMM.write("AT\r\n".encode("UTF-8"))
    time.sleep(0.1)
    if ML307A_RX=="OK":
        com_jugg()
    
    
if __name__ == '__main__':
    init_com()
 
   

ML307A的部分AT指令说明

在这里插入图片描述
发送AT+MUESTATS=“radio”\r\n即可查询位置信息

# -*- coding: utf-8 -*-
"""
Created on Mon Apr 10 18:21:52 2023

@author: ZHOU
"""

import time
from flask import Flask, render_template, request
import threading
import socket
import serial
import serial.tools.list_ports

local_post = 1212
local_ip = None

for i in range(12):
    try:
        s = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
        s.connect(("8.8.8.8",80))
        local_ip = str(s.getsockname()[0])
        s.close()
        print("Network Enable")
        network_flag = 1
        break
    except:        
        print("Network Error...")
        network_flag = 0
        time.sleep(5)

app = Flask(__name__)

com_rx_buf = ''				# 接收缓冲区
com_tx_buf = ''				# 发送缓冲区
COMM = serial.Serial()		# 定义串口对象
port_list: list				# 可用串口列表
port_select: list			# 选择好的串口

ML307A_RX=''

location_info = '请输入用户名和密码并成功登陆后,刷新网页查看定位信息'
location_info_flag=0
# 无串口返回0,
# 返回可用的串口列表
def get_com_list():
    global port_list
    # a = serial.tools.list_ports.comports()
    # print(a)
    # port_list = list(serial.tools.list_ports.comports())
    port_list = serial.tools.list_ports.comports()
    return port_list


def set_com_port(n=0):
    global port_list
    global port_select
    port_select = port_list[n]
    return port_select.device


# 打开串口
def serial_open(n=0):
    global COMM
    serial_port = set_com_port(n)
    COMM = serial.Serial(serial_port, 115200, timeout=0.01)
    if COMM.isOpen():
        print(serial_port, "open success")
        return 0
    else:
        print("open failed")
        return 255


# 关闭串口
def serial_close():
    global COMM
    COMM.close()
    print(COMM.name + "closed.")


def set_com_rx_buf(buf=''):
    global com_rx_buf
    com_rx_buf = buf


def set_com_tx_buf(buf=''):
    global com_tx_buf
    com_tx_buf = buf


def get_com_rx_buf():
    global com_rx_buf
    return com_rx_buf


def get_com_tx_buf():
    global com_tx_buf
    return com_tx_buf


def thread_com_receive():
    global ML307A_RX
    while True:
        try:
            rx_buf = ''
            rx_buf = COMM.read()  # 转化为整型数字
            if rx_buf != b'':
                time.sleep(0.01)
                rx_buf = rx_buf + COMM.read_all()
                print("串口收到消息:", rx_buf)
                ML307A_RX=str(rx_buf).split("\\r\\n")[1]
            time.sleep(0.01)
        except:
            pass
    pass


# def serial_encode(addr=0, command=0, param1=0, param0=0):
#     buf = [addr, command, param1, param0, 0, 0, 0, 0]
#     print(buf)
#     return buf


def serial_send_command(addr=0, command=0, param1=0, param0=0, data3=0, data2=0, data1=0, data0=0):
    buf = [addr, command, param1, param0, data3, data2, data1, data0]
    COMM.write(buf)
    pass


def serial_init():
    buf = "AT+CG\r\n"
    COMM.write(buf)
    time.sleep(0.05)
    buf = COMM.read_all()
    if buf != "OK\r\n":
        return 254  # 进入调试模式失败

    buf = "AT+CAN_MODE=0\r\n"
    COMM.write(buf)
    time.sleep(0.05)
    buf = COMM.read_all()
    if buf != "OK\r\n":
        return 253          # 进入正常模式失败,模块处于1状态,即环回模式中

    buf = "AT+CAN_BAUD=500000\r\n"
    COMM.write(buf)
    time.sleep(0.05)
    buf = COMM.read_all()
    if buf != "OK\r\n":
        return 253          # 波特率设置失败

    buf = "AT+FRAMEFORMAT=1,0,\r\n"
    COMM.write(buf)
    time.sleep(0.05)
    buf = COMM.read_all()
    if buf != "OK\r\n":
        return 253          # 波特率设置失败

    buf = "AT+ET\r\n"       # 进入透传模式
    COMM.write(buf)
    time.sleep(0.05)
    buf = COMM.read_all()
    if buf != "OK\r\n":
        return 255  # 不是CAN模块

def com_jugg():
    global ML307A_RX
    global location_info
    global location_info_flag
    while True:
        COMM.write("""AT+MUESTATS="radio"\r\n""".encode("UTF-8"))
        time.sleep(0.5)
        if location_info_flag == 1:
            location_info = ML307A_RX.split("""+MUESTATS: "radio",""")[1]
            print(location_info)
        time.sleep(2.5)
        print(1)
    
def init_com():
    global ML307A_RX
    get_com_list()
    len = port_list.__len__()
    device = port_list[0].device
    print(len, device)
    serial_open(0)
    thread1 = threading.Thread(target=thread_com_receive)
    thread1.setDaemon(True)
    thread1.start()
    COMM.write("AT\r\n".encode("UTF-8"))
    time.sleep(0.1)
    print(321)
    if ML307A_RX=="OK":
        print(111)
        com_jugg()


#@app.route('/login/')   # 登录
#def login():
#    return render_template('login.html')

    
@app.route('/', methods=['GET', 'POST'])
def index():
    global location_info_flag
    global location_info
    command_str=''
    if request.method == 'POST':
        c0 = str(request.form.get('send'))
        c1 = str(request.form.get('user'))
        c2 = str(request.form.get('pass'))
        for i in [c0,c1,c2]:
            if i != "None":
                command_str = i
                break
    if command_str == "登陆":
        if str(request.form.get('user'))=="admin" and str(request.form.get('pass'))=="123":
            location_info_flag=1
            print("登陆成功")
        else:
            print("登陆失败")
        
    now_today = time.time()
    time_hour = time.localtime(now_today).tm_hour
    time_min = time.localtime(now_today).tm_min
    time_sec = time.localtime(now_today).tm_sec
    
    if time_hour < 10:
        time_hour = "0"+str(time_hour)
    if time_min < 10:
        time_min = "0"+str(time_min)
    local_time_str = str(time.localtime(now_today).tm_year)+"-"+str(time.localtime(now_today).tm_mon)+"-"+str(time.localtime(now_today).tm_mday)+" "+str(time_hour)+":"+str(time_min)+":"+str(time_sec)
                
    data = {
        '当前时间:': [local_time_str],
        '位置信息:': [location_info],
	        }
    return render_template('index.html',data_dict=data)


def app_run():
    app.run(host=local_ip, port=local_post)
    
if __name__ == "__main__":
    thread0 = threading.Thread(target=app_run)
    thread0.setDaemon(True)
    thread0.start()
    print(123)
    init_com()
![在这里插入图片描述](https://img-blog.csdnimg.cn/297662e9c9424b1292f623d605d7ee7f.jpeg)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/413113.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

CUDA效率优化之CUDA Graph的使用

CUDA系列文章 文章目录CUDA系列文章前言一、优化方案简单顺序调用二、Overlapping三、使用CUDA Graph总结前言 GPU 架构的性能随着每一代的更新而不断提高。 现代 GPU 每个操作&#xff08;如kernel运行或内存复制&#xff09;所花费的时间现在以微秒为单位。 但是&#xff0c…

【C++跬步积累】——时间复杂度

&#x1f30f;博客主页&#xff1a;PH_modest的博客主页 &#x1f6a9;当前专栏&#xff1a;C跬步积累 &#x1f48c;其他专栏&#xff1a; &#x1f534; 每日一题 &#x1f7e1; 每日反刍 &#x1f7e2; 读书笔记 &#x1f308;座右铭&#xff1a;广积粮&#xff0c;缓称王&a…

马云回国,首谈ChatGPT

马云今天回国了&#xff0c;这是一个备受关注的消息。 作为中国最具代表性的企业家之一&#xff0c;马云在过去的二十多年里&#xff0c;带领阿里巴巴从一个小小的创业公司&#xff0c;发展成为全球最大的电商平台之一&#xff0c;同时也推动了中国互联网行业的发展。 他的回…

使用向量机(SVM)算法的推荐系统

系统整体结构 运行环境 包括Python环境、TensorFlow环境、安装模块、MySQL数据库。 Python环境 需要Python 3.6及以上配置&#xff0c;在Windows环境下推荐下载Anaconda完成Python所需的配置&#xff0c;下载地址为https://www.anaconda.com/&#xff0c;也可下载虚拟机在Li…

项目经理如何做好项目数据分析?

“周一到周五哪天更适合工作&#xff1f;” 周一困倦&#xff0c;周二认命&#xff0c;周三亢奋&#xff0c;周四疲惫&#xff0c;周五又像打了鸡血…… 项目经理&#xff1a;哪天都不适合工作&#xff0c;项目日报、周报、月报一个都少不了&#xff1b;不是在加班整理项目数…

Direct3D 12——灯光——镜面光照

反射的发生是根据一种名为菲涅耳效应(Fresnel effect,也译作菲涅 尔效应)的物理现象。当光线到达两种不同折射率(index of refraction )介质之间的界面时&#xff0c;一部分光将 被反射&#xff0c;而剩下的光则发生折射(refract),折射率是一种介质的物理性质&#xff0c;即 光…

Day939.如何小步安全地升级数据库框架 -系统重构实战

如何小步安全地升级数据库框架 Hi&#xff0c;我是阿昌&#xff0c;今天学习记录的是关于如何小步安全地升级数据库框架的内容。 当消息组件的数据存储都是采用 SQL 拼写的方式来操作&#xff0c;这样不便于后续的扩展及维护。除此之外&#xff0c;相比前面的其他重构&#x…

线性代数 --- Gram-Schmidt, 格拉姆-施密特正交化(上)

在求解最小二乘的问题时&#xff0c;已经介绍了类似于Gram-Schmidt的一些想法。在这里要继续介绍这些想法&#xff0c;那就是如何“改写”矩阵A中的列向量&#xff0c;使得最小二乘解的计算越来越简单&#xff0c;甚至可以直接写出答案。 标准正交基(Orthonormal Bases) 上一篇…

3.1 多维度随机变量及其分布

思维导图&#xff1a; 学习目标&#xff1a; 要学习二维随机变量及联合分布&#xff0c;我会按照以下步骤进行&#xff1a; 了解基本概念&#xff1a;首先要了解二维随机变量的概念&#xff0c;即同时包含两个随机变量的变量。还要了解二维随机变量的取值范围以及联合概率密…

【CSS】定位 ④ ( 绝对定位特点 | 相对定位不脱标示例 | 绝对定位脱标示例 )

文章目录一、绝对定位特点二、相对定位不脱标示例三、绝对定位脱标示例一、绝对定位特点 绝对定位 以 带有定位的 父级元素 为基准 , 通过 边偏移 移动位置 ; 如果 绝对定位 的元素 的 父级元素 没有定位 , 那么会 一直向上查找有定位的父级元素 , 直到浏览器 ; 绝对定位 元素…

【Linux】组管理和权限管理

目录1 Linux组的基本介绍2 文件/目录所有者2.1 查看文件的所有者2.2 修改文件所有者3 组的创建3.1 基本指令3.2 应用实例4 文件/目录 所在组4.1 查看文件/目录所在组4.2修改文件/目录所在的组5 其他组6 改变用户所在组6.1 改变用户所在的组6.2 应用实例7 权限介绍8 rwx权限详解…

多线程(八):常见锁策略

目录 前言 1. 乐观锁 VS 悲观锁 乐观锁 悲观锁 2. 轻量级锁 VS 重量级锁 轻量级锁 3. 自旋锁 VS 挂起等待锁 自旋锁 挂起等待锁 4. 读写锁 VS 互斥锁 5. 可重入锁 vs 不可重入锁 死锁 发生死锁的情况 死锁产生的四个必要条件如下&#xff1a; 6. 公平锁和非公平锁…

Hibernate多表关联——(一对多关系)

Hibernate多表关联——&#xff08;一对多关系&#xff09; 文章目录Hibernate多表关联——&#xff08;一对多关系&#xff09;1.分别在类中添加属性&#xff1a;2.hibernate建表3.使用测试类在表中添加数据hibernate是连接数据库使得更容易操作数据库数据的一个框架&#xff…

ERROR:org.apache.hadoop.hbase.PleaseHoldException: Master is initializing错误

一、问题 重新安装hbase后&#xff0c;在hbase shell中查看所有命名空间时&#xff0c;出现了ERROR:org.apache.hadoop.hbase.PleaseHoldException: Master is initializing错误。 二、方法 1、root用户下&#xff0c;关闭hbase stop-hbase.sh 2、执行以下命令删除HDFS下的hb…

go进阶篇gin框架系列三

一、模板引擎的语法 {{.}} 模板语法都包含在{{和}}中间&#xff0c;其中{{.}}中的点表示当前对象。 当我们传入一个结构体对象时&#xff0c;我们可以根据.来访问结构体的对应字段。 pipeline pipeline是指产生数据的操作。比如{{.}}、{{.Name}}等。Go的模板语法中支持使用管道…

JavaWeb开发 —— Element组件

目录 一、什么是Element&#xff1f; 二、快速入门 三、常见组件 一、什么是Element&#xff1f; ① Element&#xff1a;是饿了么团队研发的&#xff0c;一套为开发者、设计时和产品经理准备的基于Vue 2.0 的桌面端组件库。 ② 组件&#xff1a;组成网页的部件&#xff0…

FIFO的工作原理及其设计

1.简介 FIFO( First Input First Output)简单说就是指先进先出。FIFO存储器是一个先入先出的双口缓冲器&#xff0c;即第一个进入其内的数据第一个被移出&#xff0c;其中一个口是存储器的输入口&#xff0c;另一个口是存储器的输出口。 对于单片FIFO来说&#xff0c;主要有两种…

文字转语音软件的优缺点及如何选择最适合的工具

随着科技的进步&#xff0c;文字转语音技术已经越来越成熟&#xff0c;越来越多的人开始使用文字转语音软件来转换文本为语音。这种技术可以帮助人们在许多方面&#xff0c;例如改善阅读体验、方便学习、提高生产效率等。然而&#xff0c;文字转语音软件有其优缺点&#xff0c;…

一文读懂推荐系统用户画像

1.推荐系统用户画像 用户画像这个词具有广泛性。 它被应用于推荐&#xff0c;广告&#xff0c;搜索&#xff0c;个性化营销等各个领域。任何时候&#xff0c;不管出于什么目的&#xff0c;我们想描述我们的用户是谁的时候&#xff0c;大家都会用到用户画像这个词。 比如&…

VUE_学习笔记

一、 xx 二、模板语法 1.模板语法之差值语法 &#xff1a;{{ }} 主要研究&#xff1a;{{ 这里可以写什么}} 在data中声明的变量、函数等都可以。常量只要是合法的javascript表达式&#xff0c;都可以。模板表达式都被放在沙盒中&#xff0c;只能访问全局变量的一个白名单&a…