haas506 2.0开发教程-example-蓝牙多设备扫描
- 案例说明
- 蓝牙信息克隆
- 1.手机蓝牙改名
- 信息克隆
- 代码
- 测试
案例说明
- 开发板扫描蓝牙设备,获取并打印蓝牙设备mac地址。mac地址每个设备不同,且不能更改。
- 本案例仅适用于M320开发板和HD1-RTU。
- 案例使用手机与iBeacon作为从机。开发板根据设备名称进行连接,因此要同时扫描到2个设备,需要把手机的蓝牙名称改成与iBeacon相同的名称。
蓝牙信息克隆
使用手机克隆iBeacon的名字信息。
1.手机蓝牙改名
手机下载nRF Connect蓝牙调试工具。
查看iBeacon设备的名称,将手机蓝牙改成一样的名字。
选择ADVERTISER后点击图标改名
信息克隆
点开iBeacon设备后,点击CLONE
这样就得到了2个同名的设备。
代码
- 注意将代码中的设备名称(gap_scan_name()的参数)改为实际使用的设备名称。
main.py
# Derived from a BLE temperature-sensor central example (cf. ble_temperature.py);
# as used here it repeatedly scans for BLE devices and prints the address and name reported by each scan.
# from ast import While
# from logging.config import valid_ident
# from multiprocessing.sharedctypes import Value
# from sys import platlibdir
# from this import d
# from traceback import print_list
import bluetooth
from bluetooth import BLE
from bluetooth import UUID
# import random
import ustruct
#import time
import micropython
import utime as time
#from ble_advertising import decode_services, decode_name
from micropython import const
# Event codes compared against the `event` argument of the handler registered
# with BLE.irq().
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_WRITE = const(3)
_IRQ_GATTS_READ_REQUEST = const(4)
_IRQ_SCAN_RESULT = const(5)
_IRQ_SCAN_DONE = const(6)
_IRQ_PERIPHERAL_CONNECT = const(7)
_IRQ_PERIPHERAL_DISCONNECT = const(8)
_IRQ_GATTC_SERVICE_RESULT = const(9)
_IRQ_GATTC_SERVICE_DONE = const(10)
_IRQ_GATTC_CHARACTERISTIC_RESULT = const(11)
_IRQ_GATTC_CHARACTERISTIC_DONE = const(12)
_IRQ_GATTC_DESCRIPTOR_RESULT = const(13)
_IRQ_GATTC_DESCRIPTOR_DONE = const(14)
_IRQ_GATTC_READ_RESULT = const(15)
_IRQ_GATTC_READ_DONE = const(16)
_IRQ_GATTC_WRITE_DONE = const(17)
_IRQ_GATTC_NOTIFY = const(18)
_IRQ_GATTC_INDICATE = const(19)

# Advertisement kinds compared against `adv_type` in scan results.
_ADV_IND = const(0x00)
_ADV_DIRECT_IND = const(0x01)
_ADV_SCAN_IND = const(0x02)
_ADV_NONCONN_IND = const(0x03)

# 128-bit service UUID matched during GATT service discovery.
# NOTE(review): the name is inherited from the temperature-sensor example, but
# this value is not the 16-bit Environmental Sensing UUID (that one is 0x181A,
# see _MY_SERVICE_UUID below) -- confirm which service the target exposes.
_ENV_SENSE_UUID = bluetooth.UUID("FDA50693-A4E2-4FB1-AFCF-C6EB07647825")
# 16-bit characteristic UUID matched during characteristic discovery
# (org.bluetooth.characteristic.temperature).
_TEMP_UUID = bluetooth.UUID(0x2A6E)
# (uuid, flags) characteristic definition and (uuid, characteristics) service
# definition built from the two UUIDs above.
_TEMP_CHAR = (_TEMP_UUID, bluetooth.FLAG_READ | bluetooth.FLAG_NOTIFY)
_ENV_SENSE_SERVICE = (_ENV_SENSE_UUID, (_TEMP_CHAR,))

# UART-style service / RX / TX UUIDs and an extra 16-bit service UUID; none of
# these are referenced by the code in this file.
_UART_SERVICE_UUID = bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA77")
_UART_RX_CHAR_UUID = bluetooth.UUID("6E400002-B5A3-F393-E0A9-E50E24DCCA77")
_UART_TX_CHAR_UUID = bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA77")
_MY_SERVICE_UUID = bluetooth.UUID(0x181A)

# org.bluetooth.characteristic.gap.appearance.xml
_ADV_APPEARANCE_GENERIC_THERMOMETER = const(768)

# Field-type codes looked up inside an advertising payload by decode_field().
_ADV_TYPE_FLAGS = const(0x01)
_ADV_TYPE_NAME = const(0x09)
_ADV_TYPE_UUID16_COMPLETE = const(0x03)
_ADV_TYPE_UUID32_COMPLETE = const(0x05)
_ADV_TYPE_UUID128_COMPLETE = const(0x07)
_ADV_TYPE_UUID16_MORE = const(0x02)
_ADV_TYPE_UUID32_MORE = const(0x04)
_ADV_TYPE_UUID128_MORE = const(0x06)
_ADV_TYPE_APPEARANCE = const(0x19)
def decode_field(payload, adv_type):
    """Return the data of every field of type ``adv_type`` in ``payload``.

    ``payload`` is walked as a sequence of ``[length][type][data...]``
    records, where ``length`` counts the type byte plus the data bytes.

    Args:
        payload: indexable, sliceable byte buffer (bytes, bytearray, memoryview).
        adv_type: integer field-type code to look for.

    Returns:
        A list of slices of ``payload`` (one per matching field, in order of
        appearance); empty if no field matches.
    """
    i = 0
    result = []
    # Need at least a length byte and a type byte to inspect a record.
    while i + 1 < len(payload):
        if payload[i + 1] == adv_type:
            # Data starts after the length and type bytes and ends with the record.
            result.append(payload[i + 2 : i + payload[i] + 1])
        # Skip the length byte itself plus the record it describes.
        i += 1 + payload[i]
    return result
def decode_name(payload):
    """Return the device name carried in an advertising payload.

    Looks up fields of type ``_ADV_TYPE_NAME`` and decodes the first one as
    UTF-8.

    Args:
        payload: advertising data buffer as accepted by ``decode_field``.

    Returns:
        The decoded name, or ``""`` when the payload has no name field.
    """
    n = decode_field(payload, _ADV_TYPE_NAME)
    return str(n[0], "utf-8") if n else ""
def decode_mac(addr):
    """Format a 6-byte BLE address as ``"XX:XX:XX:XX:XX:XX"``.

    Args:
        addr: the address as ``bytes``, ``bytearray`` or ``memoryview``.

    Returns:
        Upper-case, colon-separated hex string, bytes in the order given.

    Raises:
        ValueError: if ``addr`` is not a 6-byte buffer of one of the accepted
            types.
    """
    # Buffers handed out by the BLE stack may be views; copy them to bytes first.
    if isinstance(addr, (memoryview, bytearray)):
        addr = bytes(addr)
    # Raise explicitly instead of using assert, so the check survives builds
    # that strip assertions and callers see the intended exception type.
    if not isinstance(addr, bytes) or len(addr) != 6:
        raise ValueError("mac address value error")
    return ":".join(['%02X' % byte for byte in addr])
def print_hex(bytes):
    """Return the items of ``bytes`` as space-separated ``hex()`` strings.

    Despite the name, nothing is printed; the caller prints the result.
    The parameter name shadows the builtin ``bytes`` inside this function; it
    is kept so keyword callers keep working.

    Args:
        bytes: iterable of values convertible with ``int()`` (e.g. a bytes object).

    Returns:
        A string such as ``"0x1 0xab"``; empty for an empty input.
    """
    parts = [hex(int(item)) for item in bytes]
    return " ".join(parts)
def decode_services(payload):
    """Return the service UUIDs listed in an advertising payload.

    Reads the "complete list" fields for 16-bit, 32-bit and 128-bit UUIDs (in
    that order) and converts every entry to a ``bluetooth.UUID``.

    Args:
        payload: advertising data buffer as accepted by ``decode_field``.

    Returns:
        A list of ``bluetooth.UUID`` objects; empty if no list field is present.
    """
    services = []
    # UUID values are unsigned little-endian integers: "<H" (2 bytes) and
    # "<I" (4 bytes). A signed or floating-point format would corrupt them.
    # NOTE(review): MicroPython's bluetooth.UUID documents only 16-bit integers
    # and 128-bit values, so 32-bit entries may be rejected -- confirm on target.
    for adv_type, fmt, size in (
        (_ADV_TYPE_UUID16_COMPLETE, "<H", 2),
        (_ADV_TYPE_UUID32_COMPLETE, "<I", 4),
    ):
        for field in decode_field(payload, adv_type):
            # One field may hold several UUIDs back to back.
            for offset in range(0, len(field) - size + 1, size):
                value = ustruct.unpack_from(fmt, field, offset)[0]
                services.append(bluetooth.UUID(value))
    for field in decode_field(payload, _ADV_TYPE_UUID128_COMPLETE):
        for offset in range(0, len(field) - 15, 16):
            services.append(bluetooth.UUID(field[offset : offset + 16]))
    return services
class BLETemperatureCentral:
    """BLE central helper that scans for devices and caches the last one seen.

    The constructor configures and activates the given BLE object and
    registers ``_irq`` as its event handler. Each accepted scan result
    overwrites the cached address type, address, decoded name and formatted
    MAC; when the scan finishes the cached values are passed to the callback
    given to ``scan()``.

    NOTE(review): ``_conn_handle`` is only ever set to ``None`` (in
    ``_reset``), so ``is_connected()`` stays False and ``read()`` /
    ``disconnect()`` return early; ``write()`` and the read-result handler
    talk to handle 0 directly instead.
    """

    def __init__(self, ble):
        """Configure ``ble``, switch it on and install the event handler.

        Args:
            ble: the BLE object (``bluetooth.BLE()`` in the demo).
        """
        self._ble = ble
        # Device name to look for -- change this to match your device.
        # NOTE(review): gap_scan_name / gattc_set_uuids are firmware-specific;
        # presumably they set the scan name filter and the characteristic
        # UUIDs used for read/write -- confirm against the board's BLE docs.
        self._ble.gap_scan_name('X622080084')
        self._ble.gattc_set_uuids(0xFFF2, 0xFFF1)
        self._ble.active(True)
        time.sleep_ms(3000)
        self._ble.irq(self._irq)
        self._reset()

    def _reset(self):
        """Clear all cached scan/connection state and pending callbacks."""
        # Cached name and address from a successful scan.
        self._name = None
        self._addr_type = None
        self._addr = None
        self._mac = None
        self._adv_data = None

        # Cached value (if we have one).
        self._value = None

        # Callbacks for completion of various operations.
        self._scan_callback = None
        self._conn_callback = None
        self._read_callback = None

        # Persistent callback for when new data is notified from the device.
        self._notify_callback = None

        # Connected device.
        self._conn_handle = None
        self._start_handle = None
        self._end_handle = None
        self._value_handle = None

    def _irq(self, event, data):
        """Handle one BLE event; ``data`` is the event-specific tuple."""
        if event == _IRQ_SCAN_RESULT:
            addr_type, addr, adv_type, rssi, adv_data = data
            # Only advertisement kinds _ADV_IND / _ADV_DIRECT_IND are kept.
            # (Filtering on `_ENV_SENSE_UUID in decode_services(adv_data)` is
            # an alternative when the target advertises its service UUID.)
            if adv_type in (_ADV_IND, _ADV_DIRECT_IND):
                # Remember the device; a later result overwrites an earlier one.
                self._addr_type = addr_type
                # Note: addr buffer is owned by caller so need to copy it.
                self._addr = bytes(addr)
                self._name = decode_name(adv_data)
                self._mac = decode_mac(addr)

        elif event == _IRQ_SCAN_DONE:
            self._ble.gap_scan(None)
            if self._scan_callback:
                if self._addr:
                    # A device was seen during the scan: report it once.
                    self._scan_callback(self._addr_type, self._addr, self._name)
                    self._scan_callback = None
                else:
                    # Scan timed out. The callback is left installed here.
                    self._scan_callback(None, None, None)

        elif event == _IRQ_PERIPHERAL_CONNECT:
            print('connect')

        elif event == _IRQ_PERIPHERAL_DISCONNECT:
            print('disconnect')

        elif event == _IRQ_GATTC_SERVICE_RESULT:
            # Connected device returned a service.
            conn_handle, start_handle, end_handle, uuid = data
            if conn_handle == self._conn_handle and uuid == _ENV_SENSE_UUID:
                self._start_handle, self._end_handle = start_handle, end_handle

        elif event == _IRQ_GATTC_SERVICE_DONE:
            # Service query complete.
            if self._start_handle and self._end_handle:
                self._ble.gattc_discover_characteristics(
                    self._conn_handle, self._start_handle, self._end_handle
                )
            else:
                print("Failed to find environmental sensing service.")

        elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT:
            # Connected device returned a characteristic.
            conn_handle, def_handle, value_handle, properties, uuid = data
            if conn_handle == self._conn_handle and uuid == _TEMP_UUID:
                self._value_handle = value_handle

        elif event == _IRQ_GATTC_CHARACTERISTIC_DONE:
            # Characteristic query complete.
            if self._value_handle:
                # Finished discovering the device: fire the connect callback.
                if self._conn_callback:
                    self._conn_callback()
            else:
                print("Failed to find temperature characteristic.")

        elif event == _IRQ_GATTC_READ_RESULT:
            # The received bytes are fetched from the stack with handle 0
            # rather than taken from `data`.
            buffer = self._ble.gattc_read(0, 0)
            print('recv data:', buffer)

        elif event == _IRQ_GATTC_READ_DONE:
            # Read completed (no-op); the unpack only checks the tuple shape.
            conn_handle, value_handle, status = data

        elif event == _IRQ_GATTC_NOTIFY:
            # The peripheral pushed a new value.
            conn_handle, value_handle, notify_data = data
            if conn_handle == self._conn_handle and value_handle == self._value_handle:
                self._update_value(notify_data)
                if self._notify_callback:
                    self._notify_callback(self._value)

        elif event == _IRQ_GATTC_WRITE_DONE:
            conn_handle, value_handle, status = data
            print("TX complete")

    def is_connected(self):
        """Return True once both a connection handle and a value handle are known."""
        return self._conn_handle is not None and self._value_handle is not None

    def scan(self, callback=None):
        """Start a scan; ``callback(addr_type, addr, name)`` runs when it ends.

        The callback receives ``(None, None, None)`` if no device was seen.
        """
        # Forget the previous result so a stale device is not reported again.
        self._addr_type = None
        self._addr = None
        self._scan_callback = callback
        # NOTE(review): presumably duration followed by scan interval and
        # window -- confirm units against the board's BLE docs.
        self._ble.gap_scan(4000, 6000, 6000)

    def connect(self, addr_type=None, addr=None, callback=None):
        """Connect to ``addr`` (or to the device cached by the last scan).

        Args:
            addr_type: address type; ``None`` means "use the cached one".
            addr: address bytes; ``None`` means "use the cached one".
            callback: called with no arguments after characteristic discovery.

        Returns:
            False if no address is available, True once the connect request
            has been issued.
        """
        # Compare against None explicitly: an address type of 0 is a real
        # value and must not be replaced by the cached one.
        if addr_type is not None:
            self._addr_type = addr_type
        if addr is not None:
            self._addr = addr
        self._conn_callback = callback
        if self._addr_type is None or self._addr is None:
            return False
        self._ble.gap_connect(self._addr_type, self._addr)
        return True

    def disconnect(self):
        """Disconnect from the current device and clear all cached state."""
        # Handle 0 is a usable handle, so only None means "not connected".
        if self._conn_handle is None:
            return
        self._ble.gap_disconnect(self._conn_handle)
        self._reset()

    def read(self, callback):
        """Issue an asynchronous read; does nothing unless ``is_connected()``.

        ``callback`` is stored in ``_read_callback``.
        NOTE(review): nothing in this class invokes ``_read_callback``.
        """
        if not self.is_connected():
            return
        self._read_callback = callback
        self._ble.gattc_read(self._conn_handle, self._value_handle)

    def on_notify(self, callback):
        """Set the callback invoked with the new value on each notification."""
        self._notify_callback = callback

    def _update_value(self, data):
        """Decode ``data`` into ``_value`` and return it."""
        # Data is sint16 in degrees Celsius with a resolution of 0.01 degrees Celsius.
        self._value = ustruct.unpack("<h", data)[0] / 100
        return self._value

    def value(self):
        """Return the most recently decoded value (``None`` if there is none)."""
        return self._value

    def write(self, v, response=False):
        """Write ``v`` through the BLE object using handle 0 and log it.

        No connection check is made before writing.

        Args:
            v: the data to send.
            response: when true, 1 is passed as the write mode, else 0.
        """
        self._ble.gattc_write(0, 0, v, 1 if response else 0)
        print("data_send:", v)
# Flag set to 1 by the scan callback once a scan has reported a device.
# Initialised here so it can be read before the first device is found.
v = 0


def demo():
    """Create the BLE central and scan forever, printing each scan's result.

    Every two seconds a new scan is started. When a scan completes, the
    callback prints the address type, the address bytes in hex and the
    advertised name of the reported device, or "No sensor found." when the
    scan reported nothing. This function never returns.
    """
    time.sleep_ms(2000)
    ble = bluetooth.BLE()
    central = BLETemperatureCentral(ble)
    time.sleep_ms(2000)

    def on_scan(addr_type, addr, name):
        # addr_type is None when the scan ended without reporting a device.
        global v
        if addr_type is not None:
            print("Found sensor addr_type:", addr_type)
            print("Found sensor addr:", print_hex(addr))
            print("Found sensor name:", name)
            v = 1
        else:
            print("No sensor found.")

    while True:
        central.scan(callback=on_scan)
        time.sleep_ms(2000)
        # To go further, call central.connect() here once v == 1.
# Run the demo only when this file is executed as the main script.
if __name__ == "__main__":
    demo()
测试
Haas506可以搜索到2个不同mac地址的设备,并将mac地址打印。