车联网架构设计(二)_消息缓存

news2025/1/23 7:21:33

在上一篇博客车联网架构设计(一)_消息平台的搭建-CSDN博客中,我介绍了车联网平台需要实现的一些功能,并介绍了如何用EMQX+HAPROXY来搭建一个MQTT消息平台。车联网平台的应用需要消费车辆发布的消息,同时也会下发消息给车辆,以实现车辆控制等功能。通常我们会在MQTT消息平台收到车辆消息后对消息进行缓存,以供上层应用使用。我们可以直接把消息保存到数据库,或者引入一个消息队列,这样可以方便对应用和车辆之间进行解耦合。

这里我将介绍一下如何引入一个Kafka消息队列,把车辆以及上层应用之间需要交互的消息缓存到这个消息队列之中。

在EMQX的企业版中,提供了丰富的数据桥接功能,可以支持把MQTT消息桥接到其他外部系统,例如Kafka或数据库中。但是在开源版,只提供了很有限的数据桥接,不支持Kafka。为此我们可以通过给EMQX开发Hook extension的方式,来加载我们的插件,实现把数据桥接到Kafak。

在EMQX官网的介绍中,Hook扩展是通过gRPC的方式来实现的,支持多种编程语言。如下图:

这里我以Python为例子,来定义一个扩展。

搭建Kafka

首先是在K8S上部署一个Kafka集群,这里我选择了Strimizi的Kafka operator来部署

先创建一个namespace

kubectl create namespace kafka

安装Operator, CRD以及定义RBAC等

kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka

创建一个只包含一个节点的Kafka

kubectl apply -f https://strimzi.io/examples/latest/kafka/kafka-persistent-single.yaml -n kafka 

打开两个终端,分别运行以下的订阅和发布的指令,测试Kafka是否正常工作

kubectl -n kafka run kafka-producer -ti --image=quay.io/strimzi/kafka:0.38.0-kafka-3.6.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic my-topic
kubectl -n kafka run kafka-consumer -ti --image=quay.io/strimzi/kafka:0.38.0-kafka-3.6.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic my-topic --from-beginning

开发ExHook

首先是获取当前EMQX版本定义的gPRC proto。在EMQX服务器的/opt/emqx/lib/emqx_exhook-5.0.14/priv/protos/目录下面有一个exhook.proto文件。

运行以下命令来基于这个proto生成python文件

python -m grpc_tools.protoc -I./ --python_out=. --pyi_out=. --grpc_python
_out=. ./exhook.proto

运行之后,在当前目录下会新生成三个文件,exhook_pb2_grpc.py,exhook_pb2.py,exhook_pb2.pyi

新建一个exhook_server.py文件,继承exhook_pb2_grpc里面的HookProviderServicer,注册对应事件的处理方法,如以下代码:

from concurrent import futures
import logging
from multiprocessing.sharedctypes import Value

import grpc

import exhook_pb2
import exhook_pb2_grpc

import pickle
from kafka import KafkaProducer

class HookProvider(exhook_pb2_grpc.HookProviderServicer):
    def __init__(self):
        self.producer = KafkaProducer(bootstrap_servers='my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092')

    def OnProviderLoaded(self, request, context):
        print("OnProviderLoaded:", request)
        '''
        specs = [exhook_pb2.HookSpec(name="client.connect"),
                 exhook_pb2.HookSpec(name="client.connack"),
                 exhook_pb2.HookSpec(name="client.connected"),
                 exhook_pb2.HookSpec(name="client.disconnected"),
                 exhook_pb2.HookSpec(name="client.authenticate"),
                 exhook_pb2.HookSpec(name="client.authorize"),
                 exhook_pb2.HookSpec(name="client.subscribe"),
                 exhook_pb2.HookSpec(name="client.unsubscribe"),

                 exhook_pb2.HookSpec(name="session.created"),
                 exhook_pb2.HookSpec(name="session.subscribed"),
                 exhook_pb2.HookSpec(name="session.unsubscribed"),
                 exhook_pb2.HookSpec(name="session.resumed"),
                 exhook_pb2.HookSpec(name="session.discarded"),
                 exhook_pb2.HookSpec(name="session.takenover"),
                 exhook_pb2.HookSpec(name="session.terminated"),

                 exhook_pb2.HookSpec(name="message.publish"),
                 exhook_pb2.HookSpec(name="message.delivered"),
                 exhook_pb2.HookSpec(name="message.acked"),
                 exhook_pb2.HookSpec(name="message.dropped")
                ]
        '''
        specs = [exhook_pb2.HookSpec(name="message.publish")]
        return exhook_pb2.LoadedResponse(hooks=specs)

    def OnProviderUnloaded(self, request, context):
        print("OnProviderUnloaded:", request)
        return exhook_pb2.EmptySuccess()

    def OnClientConnect(self, request, context):
        print("OnClientConnect:", request)
        return exhook_pb2.EmptySuccess()

    def OnClientConnack(self, request, context):
        print("OnClientConnack:", request)
        return exhook_pb2.EmptySuccess()

    def OnClientConnected(self, request, context):
        print("OnClientConnected:", request)
        return exhook_pb2.EmptySuccess()

    def OnClientDisconnected(self, request, context):
        print("OnClientDisconnected:", request)
        return exhook_pb2.EmptySuccess()

    def OnClientAuthenticate(self, request, context):
        print("OnClientAuthenticate:", request)
        reply = exhook_pb2.ValuedResponse(type="STOP_AND_RETURN", bool_result=True)
        return reply

    def OnClientAuthorize(self, request, context):
        print("OnClientAuthorize:", request)
        reply = exhook_pb2.ValuedResponse(type="STOP_AND_RETURN", bool_result=True)
        return reply

    def OnClientSubscribe(self, request, context):
        print("OnClientSubscribe:", request)
        return exhook_pb2.EmptySuccess()

    def OnClientUnsubscribe(self, request, context):
        print("OnClientUnsubscribe:", request)
        return exhook_pb2.EmptySuccess()

    def OnSessionCreated(self, request, context):
        print("OnSessionCreated:", request)
        return exhook_pb2.EmptySuccess()

    def OnSessionSubscribed(self, request, context):
        print("OnSessionSubscribed:", request)
        return exhook_pb2.EmptySuccess()

    def OnSessionUnsubscribed(self, request, context):
        print("OnSessionUnsubscribed:", request)
        return exhook_pb2.EmptySuccess()

    def OnSessionResumed(self, request, context):
        print("OnSessionResumed:", request)
        return exhook_pb2.EmptySuccess()

    def OnSessionDiscarded(self, request, context):
        print("OnSessionDiscarded:", request)
        return exhook_pb2.EmptySuccess()

    def OnSessionTakenover(self, request, context):
        print("OnSessionTakenover:", request)
        return exhook_pb2.EmptySuccess()

    def OnSessionTerminated(self, request, context):
        print("OnSessionTerminated:", request)
        return exhook_pb2.EmptySuccess()

    def OnMessagePublish(self, request, context):
        self.producer.send('testtopic', pickle.dumps(nmsg))
        reply = exhook_pb2.ValuedResponse(type="STOP_AND_RETURN", message=nmsg)
        return reply

    ## case2: stop publish the 't/d' messages
    #def OnMessagePublish(self, request, context):
    #    nmsg = request.message
    #    if nmsg.topic == 't/d':
    #        nmsg.payload = b""
    #        nmsg.headers['allow_publish'] = b"false"
    #
    #    reply = exhook_pb2.ValuedResponse(type="STOP_AND_RETURN", message=nmsg)
    #    return reply

    def OnMessageDelivered(self, request, context):
        print("OnMessageDelivered:", request)
        return exhook_pb2.EmptySuccess()

    def OnMessageDropped(self, request, context):
        print("OnMessageDropped:", request)
        return exhook_pb2.EmptySuccess()

    def OnMessageAcked(self, request, context):
        print("OnMessageAcked:", request)
        return exhook_pb2.EmptySuccess()

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    exhook_pb2_grpc.add_HookProviderServicer_to_server(HookProvider(), server)
    server.add_insecure_port('[::]:9000')
    server.start()

    print("Started gRPC server on [::]:9000")

    server.wait_for_termination()


if __name__ == '__main__':
    logging.basicConfig()
    serve()

 解释一下代码,在OnProvidedLoader里面是加载各种事件的钩子,这里只加载message.publish事件。在OnMessagePublish是对应事件的处理函数,这里把收到的MQTT消息通过Pickle进行序列化,发送到Kafka的对应topic

部署ExHook

写一个Dockerfile,把代码打包为一个镜像

FROM python:3.7-slim
WORKDIR /app
COPY requirements.txt ./
RUN pip install -r requirements.txt
COPY . .
CMD ["python", "./exhook_server.py"]

 requirements.txt文件内容为

grpcio==1.59.3
grpcio-tools==1.59.3
kafka-python==2.0.2

运行以下命令来构建镜像

docker build --network=host -t emqx_plugin_test:v1.0 .

创建一个部署这个镜像的deployment和service,然后部署到K8S

apiVersion: apps/v1
kind: Deployment
metadata:
  name: emqx-hookserver-deployment
  labels:
    app: hookserver
  namespace: emqx
spec:
  replicas: 1
  selector:
    matchLabels:
      app: hookserver
  template:
    metadata:
      labels:
        app: hookserver
    spec:
      containers:
      - name: hookserver
        image: emqx_plugin_test:v1.0
        imagePullPolicy: Never
        resources:
          requests:
            memory: "250Mi"
            cpu: "100m"
          limits:
            memory: "250Mi"
            cpu: "100m"
        ports:
        - name: rpc
          containerPort: 9000
---
apiVersion: v1
kind: Service
metadata:
  name: hookserver-service
  namespace: emqx
spec:
  selector:
    app: hookserver
  ports:
    - name: rpc
      port: 9000

回到EMQX的控制面板Dashboard,在ExHook里面添加,url填入http://hookserver-service.emqx.svc.cluster.local:9000,然后选择启用即可,可以看到状态为连接成功,并且显示注册了1个钩子。

在minikube上部署,一开始是显示连接中,等了很久仍然无法连接成功,最后查了资料,原来是coredns的问题,运行以下命令重启即可:

kubectl -n kube-system rollout restart deployment coredns

之后打开订阅Kafka的testtopic,然后通过MQTT连接到EMQX发送消息,可以看到Kafka能成功收到EMQX转发的消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1289344.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Hive】——数据仓库

1.1 数仓概念 数据仓库(data warehouse):是一个用于存储,分析,报告的数据系统 目的:是构建面向分析的集成化数据环境,分析结果为企业提供决策支持 特点: 数据仓库本身不产生任何数据…

robotFramwork 中如何禁用或跳过其中某个 testcase

在 Robot Framework 中,你可以通过添加一个特殊的标签(tag)来禁用某个测试用例。这个标签是 robot:skip。 robotframework *** Settings *** Test Setup Open Application*** Test Cases *** My Test Case[Tags] robot:skipDo Some…

Linux 环境下,jdbc连接mysql问题

1. 下载MySQL的JDBC驱动: 从MySQL官网下载最新的MySQL Connector/J,并将其解压到某个目录,比如/usr/local/mysql/。 2. 将JDBC驱动添加到类路径: 将JDBC驱动添加到类路径,可以使用以下命令: export CLA…

分布式系统硬件资源池原理和接入实践

一、硬件资源池理念产生背景 随着智能设备的发展和普及,越来越多的智能设备已经深入人们的生活,比如手机,PC,平板,各类穿戴设备等。当前单个设备的外设资源已经非常丰富,硬件能力也很强大。像我们的手机、…

什么是数字化工厂?

数字化工厂通常需要资金和技术支持,对大企业来说更容易投入建设。 中小企业难道就毫无机会了吗? 当然不是。中小企业也能够实现数字化工厂,只是可能需要采取不同的策略。虽然中小企业可能面临资源有限的挑战,但通过渐进式的方式和…

win10与 vm虚拟机win7共享文件夹创建

1:在win10(主机)电脑先随意共享一个文件夹 2:在win10(主机)上创建一个网络映射 右键此电脑选择映射网络驱动器 成功后会多出这个网络位置 3:win7虚拟机设置 在虚拟机中点击计算机右键添加一个网络位置

11月榜单丨飞瓜数据B站UP主排行榜(哔哩哔哩平台)发布!

飞瓜轻数发布2023年11月飞瓜数据UP主排行榜(B站平台),通过充电数、涨粉数、成长指数、带货数据等维度来体现UP主账号成长的情况,为用户提供B站号综合价值的数据参考,根据UP主成长情况用户能够快速找到运营能力强的B站U…

《WebGIS快速开发教程》第5版“惊喜”更新啦

我的书籍《WebGIS快速开发教程》第5版,经过忙碌的编写,终于发布啦! 先给大家看看新书的封面: 这次的封面我们经过了全新的设计,不同于以往的任何一个版本。从封面就可以看出第5版肯定有不小的更新。 那么我们话不多说…

【musl-pwn】msul-pwn 刷题记录 -- musl libc 1.2.2

前言 本文不分析 musl libc 相关源码,仅仅为刷题记录,请读者自行学习相关知识(看看源码就行了,代码量也不大) starCTF2022_babynote 保护:保护全开 程序与漏洞分析: 程序实现了一个菜单堆&…

SL4010升压恒压控制器芯片 2.5V启动 最大10A电流 支持300W大功率

SL4010是一款升压恒压控制器芯片,它具有2.5V启动、最大10A电流、支持300W大功率等特点。该芯片采用先进的控制技术,能够实现高效的电能转换,同时保持稳定的输出电压和电流。 SL4010芯片的主要功能是将输入的直流电压升高到所需的电压&#xf…

mysql中year函数有什么用

YEAR()函数用于提取日期或日期时间值中的年份。可以用于提取DATE、DATETIME或TIMESTAMP列中的年份。 SELECT YEAR(date_column) FROM table;# 提取字符串中的数据SELECT YEAR(2023-07-19) FROM table_name;

Spring-Boot---配置文件

文章目录 配置文件的作用配置文件的格式PropertiesProperties基本语法读取Properties配置文件 ymlyml基本语法读取yml配置文件 Properties VS Yml 配置文件的作用 整个项目中所有重要的数据都是在配置文件中配置的,具有非常重要的作用。比如: 数据库的…

[UIM]论文解读:subword Regularization: Multiple Subword Candidates

文章目录 一、完整代码二、论文解读2.1 介绍2.2 NMT2.3 Unigram language model2.4 subword 抽样2.5 效果 三、整体总结 论文:Subword Regularization: Improving Neural Network Translation Models with Multiple Subword Candidates 作者:Taku Kudo 时…

策略梯度简明教程

策略梯度方法 (PG:Policy Gradient) 是强化学习 (RL:Reinforcement Learning) 中常用的算法。 1、从库里的本能开始 PG的原理很简单:我们观察,然后行动。人类根据观察采取行动。 引用斯蒂芬库里的一句话: 你必须依靠…

SQL Server 数据库,创建数据库并使用索引查询学员考试成绩

5.1索引 索引提供指针以指向存储在表中指定列的数据值,然后根据指定的次序排列这些指针,再跟随 指针到达包含该值的行。 5.1.1什么是索引 数据库中的索引与书籍中的目录类似。在一本书中,无须阅读整本书,利用目录就可以快速查 找…

如何实现加盐加密

自己实现 传统MD5可通过彩虹表暴力破解, 加盐加密算法是一种常用的密码保护方法,它将一个随机字符串(盐)添加到原始密码中,然后再进行加密处理。 1. 每次调用方法产生一个唯一盐值(UUID )密码…

海关查验到底查些什么,又有哪些注意事项呢?

“海关查验”是什么? 海关查验是指海关在接受报关单位的申报后,依法为确定进出境货物的性质、原产地、货物状况、数量和价值是否与货物申报单上已填报的详细内容相符,对货物进行实际检查的行政执法行为。查验是国家赋予海关的一种依法行政的…

显卡算力总结

2023年12月 显卡天梯图 FP32浮点性能 性能排行榜 | TopCPU.net2023年12月 最新的显卡天梯图和 FP32浮点性能 性能排行榜,包括浮点性能排名、测试得分和规格数据。跑分对比、基准测试比较。 https://www.topcpu.net/cpu-r5 显卡显存(G)浮点算…

电商用户行为可视化分析

1、导包 import pandas as pd import numpy as np import seaborn as sns import matplotlib.pyplot as plt import pyecharts.options as opts from pyecharts.charts import Line from pyecharts.charts import Grid 2、导数据 t_f_user pd.read_csv("tianchi_fresh…