ESP32-NOW-类 -发送端-持续发送-不考虑接收端是否收到-避免程序因接收端没有返回信息而意外停止发送。
import network
import espnow
import time
class esp_now_rs(object): # 定义一个ESP-NOW通信类
def __init__(self): # 初始化方法
self.sta = network.WLAN(network.STA_IF) # 创建STA接口的WLAN对象
self.sta.active(True) # 激活STA接口
self.sta.disconnect() # 断开STA接口的WiFi连接,避免干扰ESP-NOW
self.e = espnow.ESPNow() # 创建ESP-NOW对象
self.e.active(True) # 激活ESP-NOW
self.host_mac = self.sta.config('mac') # 获取本机的MAC地址
def send_msg(self, data, peer): # 修改发送消息方法,不依赖接收确认
try:
self.e.send(peer, data) # 使用ESP-NOW发送数据到目标设备
except OSError as err:
if err.errno == 116: # ETIMEDOUT
print("Send timeout, continuing...")
else:
raise
def add_peer(self, addr): # 添加对端设备方法
self.e.add_peer(addr) # 向ESP-NOW添加对端设备
# 主程序入口
if __name__ == '__main__':
esp_sender = esp_now_rs() # 创建ESP-NOW通信对象
peer_addr = b'\xcc{\\$\xaf,' # 接收端ESP32的MAC地址
esp_sender.add_peer(peer_addr) # 注册接收端ESP32的MAC地址
counter = 0 # 初始化计数器
while True: # 主循环
# 构建要发送的消息
message = str(counter).encode('utf-8')
# 使用esp_now_rs类的send_msg方法发送消息,忽略发送失败
esp_sender.send_msg(message, peer_addr)
# 输出发送的信息(可选)
print("Sent:", message.decode('utf-8'))
# 计数器递增,然后模10以循环发送0至9
counter = (counter + 1) % 10
# 等待1秒
time.sleep(1)
ESP32-NOW-类 -接收端 - 没有信号时也持续接收-不会因为异常而停止程序的运行
import network
import espnow
import time
import json
import binascii
from machine import Pin
import ssd1306
from machine import SoftI2C
# OLED屏幕初始化
i2c = SoftI2C(scl=Pin(22), sda=Pin(21))
oled = ssd1306.SSD1306_I2C(128, 64, i2c)
class esp_now_rs(object): # 定义一个ESP-NOW通信类
def __init__(self): # 初始化方法
self.sta = network.WLAN(network.STA_IF) # 创建STA接口的WLAN对象
self.sta.active(True) # 激活STA接口
self.sta.disconnect() # 断开STA接口的WiFi连接,避免干扰ESP-NOW
self.e = espnow.ESPNow() # 创建ESP-NOW对象
self.e.active(True) # 激活ESP-NOW
self.host_mac = self.sta.config('mac') # 获取本机的MAC地址
def send_msg(self, data): # 发送消息方法,仅发送给最近注册的设备
self.e.send(self.peer, data) # 使用ESP-NOW发送数据到目标设备
def send_dict_msg(self, key, value): # 发送字典格式的消息
data = self.bytes_dictjson_bytes(key, value) # 将键值对转换为字节化的字典数据
self.e.send(self.peer, data) # 发送数据
return data # 返回发送的数据
def recv_dict_msg(self): # 接收并解析字典格式的消息
if self.e.any() > 0: # 检查是否有待接收的消息
host_adr, recv_msg = self.e.recv() # 接收消息
try:
host_adr, recv_msg = self.bytes_dictjson_dict(recv_msg) # 解析消息
except:
pass
try:
for peer, info in self.e.peers_table.items(): # 遍历对端设备表,获取RSSI等信息
mac_address = ':'.join('{:02x}'.format(b) for b in peer) # 格式化MAC地址
rssi = info[0] # 获取RSSI值
time_ms = info[1] # 获取时间戳
#print(f"MAC: {mac_address}, RSSI: {rssi} dBm, Time: {time_ms} ms") # 打印设备信息
#print(rssi)
return host_adr, recv_msg, rssi # 返回发送者地址、消息和RSSI值
except: # 兼容8266设备,8266没有RSSI功能
return host_adr, recv_msg, 0 # 返回发送者地址、消息和默认RSSI值0
def add_peer(self, addr): # 添加对端设备方法
self.peer = addr # 记录对端设备的MAC地址
self.e.add_peer(addr) # 向ESP-NOW添加对端设备
# 以下是一些辅助方法,用于数据的转换和处理
def hex_to_bytes(self, b_data): # 将字节串转换为十六进制字符串
hex_key = binascii.hexlify(b_data).decode('ascii') # 转换并解码
return hex_key
def bytes_to_hex(self, b_data): # 将十六进制字符串转换为字节串
b_hex = binascii.unhexlify(b_data) # 转换
return b_hex
def bytes_dictjson_bytes(self, key, value): # 将字典键值对转换为字节化的JSON数据
key = self.hex_to_bytes(key) # 将键转换为十六进制字符串
value = self.hex_to_bytes(value) # 将值转换为十六进制字符串
b_data = json.dumps({key: value}).encode('utf-8') # 序列化为JSON并编码为字节串
return b_data
def bytes_dictjson_dict(self, b_data): # 将字节化的JSON数据转换为字典键值对
b_dict = json.loads(b_data.decode('utf-8')) # 反序列化为字典
key = binascii.unhexlify(list(b_dict.keys())[0]) # 将键从十六进制字符串还原为字节串
value = binascii.unhexlify(b_dict[list(b_dict.keys())[0]]) # 将值从十六进制字符串还原为字节串
return key, value # 返回键值对
def recv_msg(self): # 接收消息方法
if self.e.any() > 0: # 检查是否有待接收的消息
host_adr, recv_msg = self.e.recv() # 接收消息
try:
for peer, info in self.e.peers_table.items(): # 遍历对端设备表,获取RSSI等信息
mac_address = ':'.join('{:02x}'.format(b) for b in peer) # 格式化MAC地址
rssi = info[0] # 获取RSSI值
time_ms = info[1] # 获取时间戳
#print(f"MAC: {mac_address}, RSSI: {rssi} dBm, Time: {time_ms} ms") # 打印设备信息
#print(rssi)
return host_adr, recv_msg, rssi # 返回发送者地址、消息和RSSI值
except: # 兼容8266设备,8266没有RSSI功能
return host_adr, recv_msg, 0 # 返回发送者地址、消息和默认RSSI值0
def recv_msg(self): # 修改接收消息方法,增强异常处理
try:
if self.e.any() > 0: # 检查是否有待接收的消息
host_adr, recv_msg = self.e.recv() # 接收消息
try:
for peer, info in self.e.peers_table.items(): # 遍历对端设备表,获取RSSI等信息
mac_address = ':'.join('{:02x}'.format(b) for b in peer) # 格式化MAC地址
rssi = info[0] # 获取RSSI值
time_ms = info[1] # 获取时间戳
#print(f"MAC: {mac_address}, RSSI: {rssi} dBm, Time: {time_ms} ms") # 打印设备信息
#print(rssi)
return host_adr, recv_msg, rssi # 返回发送者地址、消息和RSSI值
except Exception as e:
print("Error processing received message:", e)
return None, None, None # 返回None以指示处理失败
except OSError as e:
if e.errno == 116: # ETIMEDOUT
print("Receive timeout, continuing...")
else:
print("Other receive error:", e)
return None, None, None # 返回None以指示接收失败
# 主程序入口
if __name__ == '__main__':
aa = esp_now_rs() # 创建ESP-NOW通信对象
aa.add_peer(b'\xcc{\\$\xa3\xfc') # 注册要去连接通信的设备MAC
adddd = 0 # 初始化计数器变量
while True: # 主循环
host_adr, data, rssi = aa.recv_msg() # 接收消息,使用增强的recv_msg方法
if data is not None: # 只有当数据不为空时才显示
oled.fill(0) # 清除屏幕
oled.text('Received: {}'.format(data.decode()), 0, 0) # 显示接收数据
oled.show() # 更新屏幕显示
time.sleep(0.5) # 等待0.5秒
aa.send_msg(b'%d' % adddd) # 发送计数器值作为消息
adddd += 1 # 计数器自增
参考了:
micropython - espnow