3.28 haas506 2.0开发教程-example-蓝牙多设备扫描(仅支持M320,HD1)

news2024/10/5 13:47:59

haas506 2.0开发教程-example-蓝牙多设备扫描

  • 案例说明
  • 蓝牙信息克隆
      • 1.手机蓝牙改名
      • 信息克隆
  • 代码
  • 测试

案例说明

  • 开发板扫描蓝牙设备,获取并打印蓝牙设备mac地址。mac地址每个设备不同,且不能更改。
  • 本案例仅适用于M320开发板和HD1-RTU。
  • 案例使用手机与iBeacon作为从机。开发板根据设备名称进行连接,要同时扫描到2个设备,手机的蓝牙名称需要改成与iBeacon相同的名称。

蓝牙信息克隆

使用手机克隆iBeacon的名字信息。

1.手机蓝牙改名

手机下载nRF connect蓝牙调试工具。
在这里插入图片描述
iBeacon设备名称,将手机蓝牙改成一样的名字
在这里插入图片描述

选择ADVERTISER后点击图标改名
在这里插入图片描述

在这里插入图片描述

信息克隆

点开iBeacon设备后,点击CLONE
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
这样就得到了2个同名的设备。

代码

  • 注意更改代码中的设备名称

main.py

# This example finds and connects to a BLE temperature sensor (e.g. the one in ble_temperature.py).

# from ast import While
# from logging.config import valid_ident
# from multiprocessing.sharedctypes import Value
# from sys import platlibdir
# from this import d
# from traceback import print_list
import bluetooth
from bluetooth import BLE
from bluetooth import UUID
# import random
import ustruct

#import time
import micropython

import utime as time

#from ble_advertising import decode_services, decode_name

from micropython import const

_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_WRITE = const(3)
_IRQ_GATTS_READ_REQUEST = const(4)
_IRQ_SCAN_RESULT = const(5)
_IRQ_SCAN_DONE = const(6)
_IRQ_PERIPHERAL_CONNECT = const(7)
_IRQ_PERIPHERAL_DISCONNECT = const(8)
_IRQ_GATTC_SERVICE_RESULT = const(9)
_IRQ_GATTC_SERVICE_DONE = const(10)
_IRQ_GATTC_CHARACTERISTIC_RESULT = const(11)
_IRQ_GATTC_CHARACTERISTIC_DONE = const(12)
_IRQ_GATTC_DESCRIPTOR_RESULT = const(13)
_IRQ_GATTC_DESCRIPTOR_DONE = const(14)
_IRQ_GATTC_READ_RESULT = const(15)
_IRQ_GATTC_READ_DONE = const(16)
_IRQ_GATTC_WRITE_DONE = const(17)
_IRQ_GATTC_NOTIFY = const(18)
_IRQ_GATTC_INDICATE = const(19)

_ADV_IND = const(0x00)
_ADV_DIRECT_IND = const(0x01)  
_ADV_SCAN_IND = const(0x02)
_ADV_NONCONN_IND = const(0x03)

# org.bluetooth.service.environmental_sensing
# _ENV_SENSE_UUID = bluetooth.UUID(0x30AF)
_ENV_SENSE_UUID = bluetooth.UUID("FDA50693-A4E2-4FB1-AFCF-C6EB07647825")
# org.bluetooth.characteristic.temperature
_TEMP_UUID = bluetooth.UUID(0x2A6E)
_TEMP_CHAR = (
    _TEMP_UUID,
    bluetooth.FLAG_READ | bluetooth.FLAG_NOTIFY,
)
_ENV_SENSE_SERVICE = (
    _ENV_SENSE_UUID,
    (_TEMP_CHAR,),
)

_UART_SERVICE_UUID = bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA77")
_UART_RX_CHAR_UUID = bluetooth.UUID("6E400002-B5A3-F393-E0A9-E50E24DCCA77")
_UART_TX_CHAR_UUID = bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA77")

_MY_SERVICE_UUID = bluetooth.UUID(0x181A)

# org.bluetooth.characteristic.gap.appearance.xml
_ADV_APPEARANCE_GENERIC_THERMOMETER = const(768)

_ADV_TYPE_FLAGS = const(0x01)
_ADV_TYPE_NAME = const(0x09)
_ADV_TYPE_UUID16_COMPLETE = const(0x3)
_ADV_TYPE_UUID32_COMPLETE = const(0x5)
_ADV_TYPE_UUID128_COMPLETE = const(0x7)
_ADV_TYPE_UUID16_MORE = const(0x2)
_ADV_TYPE_UUID32_MORE = const(0x4)
_ADV_TYPE_UUID128_MORE = const(0x6)
_ADV_TYPE_APPEARANCE = const(0x19)

def decode_field(payload, adv_type):
    i = 0
    result = []
    while i + 1 < len(payload):
        if payload[i + 1] == adv_type:
            result.append(payload[i + 2 : i + payload[i] + 1])
        i += 1 + payload[i]
    return result

def decode_name(payload):
    n = decode_field(payload, _ADV_TYPE_NAME)
    # print("n:",n)
    return str(n[0], "utf-8") if n else ""

def decode_mac(addr):
    if isinstance(addr, memoryview):
        addr = bytes(addr)
    assert isinstance(addr,bytes) and len(addr) == 6,ValueError("mac address value error")
    return ":".join(['%02X' % byte for byte in addr])

def print_hex(bytes):
  l = [hex(int(i)) for i in bytes]
  return " ".join(l)

def decode_services(payload):
    services = []
    for u in decode_field(payload, _ADV_TYPE_UUID16_COMPLETE):
        services.append(bluetooth.UUID(ustruct.unpack("<h", u)[0]))
    for u in decode_field(payload, _ADV_TYPE_UUID32_COMPLETE):
        services.append(bluetooth.UUID(ustruct.unpack("<d", u)[0]))
    for u in decode_field(payload, _ADV_TYPE_UUID128_COMPLETE):
        services.append(bluetooth.UUID(u))
    return services

class BLETemperatureCentral:
    def __init__(self, ble):
        self._ble = ble
        self._ble.gap_scan_name('X622080084')
        self._ble.gattc_set_uuids(0xFFF2, 0xFFF1)
        self._ble.active(True)
        time.sleep_ms(3000)
        self._ble.irq(self._irq)
        self._reset()

    def _reset(self):
        # Cached name and address from a successful scan.
        self._name = None
        self._addr_type = None
        self._addr = None
        self._mac = None
        self._adv_data = None
        # Cached value (if we have one)
        self._value = None

        # Callbacks for completion of various operations.
        # These reset back to None after being invoked.
        self._scan_callback = None
        self._conn_callback = None
        self._read_callback = None

        # Persistent callback for when new data is notified from the device.
        self._notify_callback = None

        # Connected device.
        self._conn_handle = None
        self._start_handle = None
        self._end_handle = None
        self._value_handle = None

    def _irq(self, event, data):
        
        if event == _IRQ_SCAN_RESULT:
            # time.sleep_ms(1)
            addr_type, addr, adv_type, rssi, adv_data = data

            # print("ble data:{}".format(adv_data))
            # if adv_type in (_ADV_IND, _ADV_DIRECT_IND) and _ENV_SENSE_UUID in decode_services(adv_data):
            if adv_type in (_ADV_IND, _ADV_DIRECT_IND):
            # if adv_type in (_ADV_DIRECT_IND): #ibeacon的类型
                # Found a potential device, remember it and stop scanning.
                self._addr_type = addr_type
                self._addr = bytes(
                    addr
                )  # Note: addr buffer is owned by caller so need to copy it.
                self._name = decode_name(adv_data)
                self._mac = decode_mac(addr)
                # self._adv_data = bytes(
                #     adv_data
                # )  # Note: addr buffer is owned by caller so need to copy it.
                # self._name = decode_name(adv_data) or "?"
                # print("ble addr:",self._mac )
                # print("ble data:",data )
        elif event == _IRQ_SCAN_DONE:
            #time.sleep_ms(1000)
            self._ble.gap_scan(None)
            #print("##### scan done #####")
            val = 1
            if self._scan_callback:
                if self._addr:
                    # Found a device during the scan (and the scan was explicitly stopped).
                    self._scan_callback(self._addr_type, self._addr, self._name)
                    self._scan_callback = None
                else:
                    # Scan timed out.
                    self._scan_callback(None, None, None)

        elif event == _IRQ_PERIPHERAL_CONNECT:
            print('connect')

        elif event == _IRQ_PERIPHERAL_DISCONNECT:
            print('disconnect')

        elif event == _IRQ_GATTC_SERVICE_RESULT:
            # Connected device returned a service.
            conn_handle, start_handle, end_handle, uuid = data
            if conn_handle == self._conn_handle and uuid == _ENV_SENSE_UUID:
                self._start_handle, self._end_handle = start_handle, end_handle

        elif event == _IRQ_GATTC_SERVICE_DONE:
            # Service query complete.
            if self._start_handle and self._end_handle:
                self._ble.gattc_discover_characteristics(
                    self._conn_handle, self._start_handle, self._end_handle
                )
            else:
                print("Failed to find environmental sensing service.")

        elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT:
            # Connected device returned a characteristic.
            conn_handle, def_handle, value_handle, properties, uuid = data
            if conn_handle == self._conn_handle and uuid == _TEMP_UUID:
                self._value_handle = value_handle

        elif event == _IRQ_GATTC_CHARACTERISTIC_DONE:
            # Characteristic query complete.
            if self._value_handle:
                # We've finished connecting and discovering device, fire the connect callback.
                if self._conn_callback:
                    self._conn_callback()
            else:
                print("Failed to find temperature characteristic.")

        elif event == _IRQ_GATTC_READ_RESULT:
            buffer = self._ble.gattc_read(0,0)
            print('recv data:',buffer)

        elif event == _IRQ_GATTC_READ_DONE:
            # Read completed (no-op).
            conn_handle, value_handle, status = data

        elif event == _IRQ_GATTC_NOTIFY:
            # The ble_temperature.py demo periodically notifies its value.
            conn_handle, value_handle, notify_data = data
            if conn_handle == self._conn_handle and value_handle == self._value_handle:
                self._update_value(notify_data)
                if self._notify_callback:
                    self._notify_callback(self._value)
        elif event == _IRQ_GATTC_WRITE_DONE:
            conn_handle, value_handle, status = data
            print("TX complete")

    # Returns true if we've successfully connected and discovered characteristics.
    def is_connected(self):
        return self._conn_handle is not None and self._value_handle is not None

    # Find a device advertising the environmental sensor service.
    def scan(self, callback=None):
        self._addr_type = None
        self._addr = None
        self._scan_callback = callback
        self._ble.gap_scan(4000, 6000, 6000)

    # Connect to the specified device (otherwise use cached address from a scan).
    def connect(self, addr_type=None, addr=None, callback=None):
        self._addr_type = addr_type or self._addr_type
        self._addr = addr or self._addr
        self._conn_callback = callback
        if self._addr_type is None or self._addr is None:
            return False
        self._ble.gap_connect(self._addr_type, self._addr)
        return True

    # Disconnect from current device.
    def disconnect(self):
        if not self._conn_handle:
            return
        self._ble.gap_disconnect(self._conn_handle)
        self._reset()

    # Issues an (asynchronous) read, will invoke callback with data.
    def read(self, callback):
        if not self.is_connected():
            return
        self._read_callback = callback
        self._ble.gattc_read(self._conn_handle, self._value_handle)

    # Sets a callback to be invoked when the device notifies us.
    def on_notify(self, callback):
        self._notify_callback = callback

    def _update_value(self, data):
        # Data is sint16 in degrees Celsius with a resolution of 0.01 degrees Celsius.
        self._value = ustruct.unpack("<h", data)[0] / 100
        return self._value

    def value(self):
        return self._value

    def write(self, v, response=False):
        # if not self.is_connected():
        #     return
        self._ble.gattc_write(0, 0, v, 1 if response else 0)
        print("data_send:", v)

global v
def demo():
    time.sleep_ms(2000)
    ble = bluetooth.BLE()
    central = BLETemperatureCentral(ble)
    time.sleep_ms(2000)
    not_found = False
    
    def on_scan(addr_type, addr, name):
        if addr_type is not None:
            print("Found sensor addr_type:", addr_type)
            print("Found sensor addr:", print_hex(addr))
            print("Found sensor name:", name)
            global v
            v = 1
        else:
            nonlocal not_found
            not_found = True
            print("No sensor found.")

    while True :
        central.scan(callback=on_scan)
        time.sleep_ms(2000)
    # if v == 1:
    #     central.connect()

    # while True:
    #     ble.gattc_notify(1, 1)
    #     # central.write('123', False)
    #     time.sleep_ms(1000)

if __name__ == "__main__":
    demo()

测试

Haas506可以搜索到2个不同mac地址的设备,并将mac地址打印。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/342542.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

LeetCode(剑指offer) Day1

1.用两个栈实现一个队列。队列的声明如下&#xff0c;请实现它的两个函数 appendTail 和 deleteHead &#xff0c;分别完成在队列尾部插入整数和在队列头部删除整数的功能。(若队列中没有元素&#xff0c;deleteHead 操作返回 -1 ) 解题过程记录&#xff1a;本题就是用两个栈&…

如何激励你的内容团队产出更好的创意

对于一个品牌而言&#xff0c;如何创造吸引受众并对受众有价值内容是十分关键的。随着市场数字化的推进&#xff0c;优质的创意和内容输出对一个品牌在市场中有着深远的影响。对于很多内容策划和创作者来说&#xff0c;不断地产出高质量有创意的内容是一件非常有挑战性的事情。…

【零基础入门前端系列】—超链接和文本格式化标签(四)

【零基础入门前端系列】—超链接和文本格式化标签&#xff08;四&#xff09; 一、超链接 HTML使用标签 <a>来设置超文本链接。超链接可以是一个字&#xff0c;一个词&#xff0c;或者一组词&#xff0c;也可以是一幅图像&#xff0c;您可以点击这些内容来跳转到新的文…

别再问我供应商质量工程师(SQE)是干什么的了,这是最好的解释。

供应商质量工程师&#xff08;SQE&#xff09;是指一种负责监督供应商质量的职业。SQE的工作主要包括以下几项内容&#xff1a; 核查供应商质量&#xff1a;通过对供应商的产品、服务和生产流程的评估&#xff0c;来确保供应商的质量符合公司的标准和要求。开展质量审核&#…

【STM32笔记】HAL库低功耗STOP停止模式的串口唤醒(解决串口唤醒和回调无法一起使用的问题)

【STM32笔记】HAL库低功耗停止模式的串口唤醒&#xff08;解决串口唤醒时钟问题&#xff09; 前文&#xff1a; blog.csdn.net/weixin_53403301/article/details/128216064 【STM32笔记】HAL库低功耗模式配置&#xff08;ADC唤醒无法使用、低功耗模式无法烧录解决方案&#xf…

家政服务小程序实战教程10-分类展示

小程序一般底部菜单栏会有一个分类的功能&#xff0c;点击分类&#xff0c;以侧边栏导航的形式列出所有类目&#xff0c;点击某个类目可以做数据筛选&#xff0c;我们本篇就实现一下该功能 01 优化数据源 在我们家政服务小程序里&#xff0c;我们已经建立了类型和服务的数据源…

企业财务管理升级,智慧税务和数据可视化打造新标准

一、引言在发展社会主义市场经济的过程中&#xff0c;税收承担着组织财政收入、调控经济、调节社会分配的职能。中国每年财政收入的90%以上来自税收&#xff0c;其地位和作用越来越重要&#xff0c;可称之为国家经济的“晴雨表”&#xff0c;有效进行税务管理、充分挖掘税务大数…

面试碰壁15次,作为一个已经28岁的测试工程师,路究竟该怎么走....

3年测试经验原来什么都不是&#xff0c;只是给你的简历上画了一笔&#xff0c;一直觉得经验多&#xff0c;无论在哪都能找到满意的工作&#xff0c;但是现实却是给我打了一个大巴掌&#xff01;事后也不会给糖的那种... 先说一下自己的个人情况&#xff0c;普通二本计算机专业…

深度学习知识补充

候选位置(proposal) RCNN 什么时ROI&#xff1f; 在图像处理领域&#xff0c;感兴趣区域(region of interest &#xff0c; ROI) 是从图像中选择的一个图像区域&#xff0c;这个区域是你的图像分析所关注的重点。圈定该区域以便进行进一步处理。使用ROI圈定你想读的目标&…

dfs(十)矩阵最长递增路径 (注意dfs前后状态的更新)

描述 给定一个 n 行 m 列矩阵 matrix &#xff0c;矩阵内所有数均为非负整数。 你需要在矩阵中找到一条最长路径&#xff0c;使这条路径上的元素是递增的。并输出这条最长路径的长度。 这个路径必须满足以下条件&#xff1a; 1. 对于每个单元格&#xff0c;你可以往上&#…

全解析 ESM 模块语法,出去还是进来都由你说了算

模块语法是ES6的一个重要特性&#xff0c;它的出现让JavaScript的模块化编程成为了可能。 在JavaScript中可以直接使用import和export关键字来导入和导出模块&#xff0c;但是这种语法并不是ES6的标准&#xff0c;而是ESM&#xff08;ECMAScript Module&#xff09;模块语法的…

【CICD】Jenkins 构建部署前端项目

出于对 CICD 的研究与学习&#xff0c;在初步学习了解并安装 jenkins 后&#xff0c;记录一下对于使用 jenkins 部署前端项目的过程。 1.目标 希望能够实现的是&#xff1a;在本地使用 git 工具将项目代码推送到远程仓库&#xff08;本篇使用 gitee 演示&#xff09;&#xf…

[C][KEIL5][IAR] 全局取消结构体对齐

文章目录一、 正常编译&#xff1a;二、 -fpack-struct 全局取消结构体对齐三、 结论&#xff1a;结构体字节不进行对齐的用途&#xff08;1&#xff09;减小内存占用的空间&#xff08;2&#xff09;直接将结构体作为通信协议&#xff08;在低带宽下通讯&#xff09;&#xff…

【Java】代码块的细节你搞懂了吗(基础知识七)

希望像唠嗑一样&#xff0c;one step one futher。 目录 &#xff08;1&#xff09;代码块的应用场景 &#xff08;2&#xff09;代码块的细节 1.static 代码块只加载一次 2.当调用类的静态成员时&#xff0c;类会加载 3. 使用类的静态成员时&#xff0c;static代码块会被执…

大数据第一轮复习笔记

linux: 添加用户 useradd 删除用户 userdel useradd -d指定组 添加组 groupadd 删除组 groupdel 创建目录 mkdir -p 删除目录 rm -rf 创建目录 touch cat -n 查看文件(显示行号)

Axure 9 收录不同效果的制作过程

效果类别 一、默认选中实现单选效果 1、默认选中 点击组件&#xff0c;右键选择selected字样&#xff1b; 2、实现单选效果 点击所有组件&#xff0c;右键选择selected group&#xff0c;填好命名&#xff0c;并设置选中时的组件样式&#xff1b;选择其中一个组件&#xf…

EMQX Cloud Serverless 正式上线:三秒部署、按量计费的 MQTT Serverless 云服务

近日&#xff0c;全球领先的开源物联网数据基础设施软件供应商 EMQ 正式发布了 MQTT Serverless 云服务 —— EMQX Cloud Serverless 的 Beta 版本&#xff0c;开创性地采用弹性多租户技术&#xff0c;用户无需关心服务器基础设施和服务规格伸缩所需资源&#xff0c;仅用三秒即…

十个程序员编程时的简单方法与技巧

你要记住&#xff0c;你写的代码是给人看的 作为一名程序员&#xff0c;希望在你某天离开公司后回想起的若干个开心时刻中&#xff0c;有一个会是因为你面对自己刚刚出炉了一份让自己心动的代码的那份感动&#xff0c;而不要成为上面提到的那个“离开后&#xff0c;公司才知道…

day11_面向对象

今日内容 零、 复习昨日 一、一日一题(数组,OOP) 二、面向对象练习&#xff08;方法参数返回值&#xff09; 三、局部变量&成员变量 四、this关键字 五、构造方法 六、重载 七、封装 小破站同步上课视频: https://space.bilibili.com/402601570/channel/collectiondetail?…

Spring MVC

一、Spring MVC介绍 a. Spring MVC是一个Web框架 b. Spring MVC是基于Servlet API构成的 MVC 是 Model View Controller 的缩写。 MVC 是⼀种思想&#xff0c;⽽ Spring MVC 是对 MVC 思想的具体实现。 学习Spring MVC目标&#xff1a; a.连接功能&#xff1a;将用户&#xff…