应急指挥与调度子模块示例

news2025/1/8 12:18:37

下面给出一个 “应急指挥与调度模块” 的示例实现,展示如何通过消息队列数据库相结合,来完成应急事件的创建、任务分派、资源调度、进度汇报等功能。该模块与之前的“数据采集与管理”、“实时监测与预警”、“智能分析与辅助决策”等模块一样,可被视为微服务之一,接入到整个自然灾害应急管理系统当中。

重要说明

  1. 这是一个演示或原型级示例,适合初步了解“应急指挥与调度”在消息驱动场景下的整体思路。
  2. 实际生产环境需要补充:安全鉴权、RBAC/多角色访问控制、日志审计、分布式部署、高可用、CI/CD 集成、监控告警等工程化环节。
  3. 以下示例将代码集中在一个 main.py 文件里,便于一次性查看;实际开发中应进行更好的模块化拆分和工程化管理。
  4. 演示使用 RabbitMQ 作为消息队列(“消息的 Python 代码”),也可替换成 Kafka、Redis Stream、或其他消息中间件。

一、目录结构示例

假设我们建立一个名为 command-dispatch-service 的文件夹,用于存放本微服务代码。示例目录如下:

command-dispatch-service
├── Dockerfile
├── docker-compose.yml
├── requirements.txt
├── .env
├── app
│   ├── main.py       # FastAPI入口 + RabbitMQ 消费/生产示例
│   ├── config.py     # 可选: 存放数据库、MQ等配置
│   └── models.py     # 可选: ORM等
└── README.md

下面的示例会将所有核心内容合并到一个文件 main.py 中,方便展示。


二、依赖文件

1. requirements.txt

示例依赖(版本仅供参考):

fastapi==0.95.2
uvicorn==0.22.0
pydantic==1.10.7
SQLAlchemy==1.4.46
psycopg2-binary==2.9.6
python-dotenv==1.0.0

# RabbitMQ/AMQP 客户端
pika==1.3.1

若你打算使用 Kafka,可安装 kafka-pythonconfluent-kafka 并在代码中做相应改动。


2. .env(示例)

存放环境变量,如数据库配置、RabbitMQ配置等:

DB_HOST=localhost
DB_PORT=5432
DB_NAME=dispatch_db
DB_USER=myuser
DB_PASS=mypass

RABBITMQ_HOST=localhost
RABBITMQ_PORT=5672
RABBITMQ_USER=guest
RABBITMQ_PASS=guest

# RabbitMQ中的Exchange、Queue名称等
RABBITMQ_EXCHANGE=emergency_exchange
RABBITMQ_QUEUE=task_queue

3. docker-compose.yml(示例)

可快速启动 PostgreSQL、RabbitMQ、以及本服务;示例如下:

version: '3.8'

services:
  db:
    image: postgres:14
    container_name: dispatch_db
    environment:
      POSTGRES_USER: myuser
      POSTGRES_PASSWORD: mypass
      POSTGRES_DB=dispatch_db
    ports:
      - "5432:5432"
    volumes:
      - db_data:/var/lib/postgresql/data

  rabbitmq:
    image: rabbitmq:3.9-management
    container_name: dispatch_rabbitmq
    environment:
      RABBITMQ_DEFAULT_USER: guest
      RABBITMQ_DEFAULT_PASS: guest
    ports:
      - "5672:5672"
      - "15672:15672"  # RabbitMQ管理UI端口
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq

  dispatch_api:
    build: .
    container_name: dispatch_api
    depends_on:
      - db
      - rabbitmq
    environment:
      - DB_HOST=db
      - DB_PORT=5432
      - DB_NAME=dispatch_db
      - DB_USER=myuser
      - DB_PASS=mypass
      - RABBITMQ_HOST=rabbitmq
      - RABBITMQ_PORT=5672
      - RABBITMQ_USER=guest
      - RABBITMQ_PASS=guest
      - RABBITMQ_EXCHANGE=emergency_exchange
      - RABBITMQ_QUEUE=task_queue
    ports:
      - "8300:8300"
    command: ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8300"]

volumes:
  db_data:
  rabbitmq_data:

三、app/main.py:应急指挥与调度模块完整示例

"""
示例:应急指挥与调度模块 (Command & Dispatch Module with RabbitMQ)

主要功能:
1. 事件(Incident)管理:创建应急事件、记录基本信息(类型、等级、地点等)。
2. 任务(Task)管理:指挥员在某个事件下创建/分配任务,指定执行部门/小组。
3. 资源(Resource)与调度:登记救援队、车辆、物资等,对接RabbitMQ进行分发或状态变更。
4. 消息队列交互:当有新任务或任务状态更新时,通过RabbitMQ发送消息,其他系统或内部消费线程可接收并处理。
5. 任务进度/报告回传:任务执行方可以发布进度到队列或通过API回传,指挥员在系统内查看执行情况。

注意:仅为原型示例,实际项目需补充安全、日志、权限、容灾、高可用等。
"""

import os
import time
import threading
from datetime import datetime
from typing import Optional, List

from fastapi import FastAPI, Body, Depends, HTTPException
from pydantic import BaseModel, Field
from sqlalchemy import create_engine, Column, Integer, String, DateTime, Boolean, ForeignKey
from sqlalchemy.orm import sessionmaker, declarative_base, Session, relationship
from sqlalchemy.exc import SQLAlchemyError

# 环境变量加载
from dotenv import load_dotenv
load_dotenv()

# ============== 数据库配置 ==============
DB_HOST = os.getenv("DB_HOST", "localhost")
DB_PORT = os.getenv("DB_PORT", "5432")
DB_NAME = os.getenv("DB_NAME", "dispatch_db")
DB_USER = os.getenv("DB_USER", "myuser")
DB_PASS = os.getenv("DB_PASS", "mypass")

DATABASE_URL = f"postgresql://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB_NAME}"

engine = create_engine(DATABASE_URL, echo=False)
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)
Base = declarative_base()

# ============== RabbitMQ配置 ==============
RABBITMQ_HOST = os.getenv("RABBITMQ_HOST", "localhost")
RABBITMQ_PORT = int(os.getenv("RABBITMQ_PORT", "5672"))
RABBITMQ_USER = os.getenv("RABBITMQ_USER", "guest")
RABBITMQ_PASS = os.getenv("RABBITMQ_PASS", "guest")
RABBITMQ_EXCHANGE = os.getenv("RABBITMQ_EXCHANGE", "emergency_exchange")
RABBITMQ_QUEUE = os.getenv("RABBITMQ_QUEUE", "task_queue")

import pika
import json

# ============== 数据库ORM模型 ==============
class Incident(Base):
    """
    应急事件表,如地震、洪水、火灾等事件的基本信息
    """
    __tablename__ = "incidents"

    id = Column(Integer, primary_key=True, index=True, autoincrement=True)
    incident_type = Column(String(50))     # 事件类型,如 'earthquake', 'flood', 'fire'
    severity_level = Column(String(20))    # 事件严重程度,例: 'red', 'orange', 'yellow', 'blue' 或数字分级
    location = Column(String(100))         # 事件地点
    description = Column(String(255))      # 事件简要描述
    created_at = Column(DateTime, default=datetime.utcnow)

    tasks = relationship("Task", back_populates="incident")

class Task(Base):
    """
    任务表,记录在某个事件下创建的具体任务
    """
    __tablename__ = "tasks"

    id = Column(Integer, primary_key=True, index=True, autoincrement=True)
    incident_id = Column(Integer, ForeignKey("incidents.id"))
    task_name = Column(String(100))
    assigned_department = Column(String(50))  # 指派给哪个部门或小组
    status = Column(String(20), default="pending")  # 任务状态: pending, in_progress, completed, canceled
    created_at = Column(DateTime, default=datetime.utcnow)

    incident = relationship("Incident", back_populates="tasks")

class Resource(Base):
    """
    资源表:记录可用的救援队、车辆、物资等
    """
    __tablename__ = "resources"

    id = Column(Integer, primary_key=True, index=True, autoincrement=True)
    resource_name = Column(String(100))
    resource_type = Column(String(50))   # 'vehicle', 'team', 'equipment' 等
    is_available = Column(Boolean, default=True) 
    location = Column(String(100), nullable=True)
    created_at = Column(DateTime, default=datetime.utcnow)

# ============== 初始化数据库 ==============
def init_db():
    Base.metadata.create_all(bind=engine)

def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

# ============== Pydantic Schemas ==============
class IncidentCreate(BaseModel):
    incident_type: str
    severity_level: str
    location: str
    description: Optional[str]

class IncidentResponse(BaseModel):
    id: int
    incident_type: str
    severity_level: str
    location: str
    description: Optional[str]
    created_at: datetime

    class Config:
        orm_mode = True

class TaskCreate(BaseModel):
    incident_id: int
    task_name: str
    assigned_department: str

class TaskUpdateStatus(BaseModel):
    status: str  # pending, in_progress, completed, canceled

class TaskResponse(BaseModel):
    id: int
    incident_id: int
    task_name: str
    assigned_department: str
    status: str
    created_at: datetime

    class Config:
        orm_mode = True

class ResourceCreate(BaseModel):
    resource_name: str
    resource_type: str
    location: Optional[str]

class ResourceUpdate(BaseModel):
    is_available: Optional[bool]
    location: Optional[str]

class ResourceResponse(BaseModel):
    id: int
    resource_name: str
    resource_type: str
    is_available: bool
    location: Optional[str]
    created_at: datetime

    class Config:
        orm_mode = True


# ============== FastAPI应用初始化 ==============
app = FastAPI(
    title="Command & Dispatch Service",
    description="应急指挥与调度模块示例 (使用RabbitMQ进行任务消息分发)",
    version="0.1.0",
)

# ============== RabbitMQ工具函数 ==============
def get_rabbitmq_connection():
    """
    创建并返回一个RabbitMQ连接
    """
    credentials = pika.PlainCredentials(RABBITMQ_USER, RABBITMQ_PASS)
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(
            host=RABBITMQ_HOST, port=RABBITMQ_PORT, credentials=credentials
        )
    )
    return connection

def publish_task_message(task_data: dict):
    """
    将新创建的任务或任务状态更新发布到RabbitMQ中
    其他下游服务或工作者可订阅 queue 获取任务信息。
    """
    try:
        connection = get_rabbitmq_connection()
        channel = connection.channel()
        # 声明Exchange和Queue并进行绑定
        channel.exchange_declare(exchange=RABBITMQ_EXCHANGE, exchange_type='fanout', durable=True)
        channel.queue_declare(queue=RABBITMQ_QUEUE, durable=True)
        channel.queue_bind(queue=RABBITMQ_QUEUE, exchange=RABBITMQ_EXCHANGE)

        message_body = json.dumps(task_data)
        channel.basic_publish(
            exchange=RABBITMQ_EXCHANGE,
            routing_key="",
            body=message_body,
            properties=pika.BasicProperties(
                delivery_mode=2  # 使消息可持久化
            )
        )
        channel.close()
        connection.close()
        print(f"[MQ] 已发布任务消息: {task_data}")
    except Exception as e:
        print("[ERROR] 发布到RabbitMQ失败:", e)


# ============== RabbitMQ 消费线程(可选) ==============
def consume_task_updates():
    """
    作为后台线程示例,用于接收下游(如救援队APP)对任务的反馈/进度更新消息。
    模拟:若真实需求需要双向通信,可建立另一个队列或Exchange接收消息后更新数据库。
    """
    try:
        connection = get_rabbitmq_connection()
        channel = connection.channel()
        channel.queue_declare(queue="task_updates", durable=True)  # 示例: 另一个队列

        def callback(ch, method, properties, body):
            msg = json.loads(body.decode("utf-8"))
            print(f"[MQ-Consume] 收到下游反馈: {msg}")
            # 此处可更新数据库中对应任务的状态或进度
            # TODO: 根据具体需求自行实现
            ch.basic_ack(delivery_tag=method.delivery_tag)

        channel.basic_consume(queue="task_updates", on_message_callback=callback, auto_ack=False)
        print("[MQ] consume_task_updates 线程已启动,等待下游进度反馈...")
        channel.start_consuming()
    except Exception as e:
        print("[ERROR] 消费下游进度队列失败:", e)
        time.sleep(5)


# ============== 启动事件 ==============
@app.on_event("startup")
def on_startup():
    init_db()
    print("[INFO] 数据表初始化完成(若无则自动创建)")

    # 启动一个后台线程以监听下游反馈 (可选)
    consumer_thread = threading.Thread(target=consume_task_updates, daemon=True)
    consumer_thread.start()


# ============== Incident 相关接口 ==============
@app.post("/incidents", response_model=IncidentResponse, summary="创建应急事件")
def create_incident(incident_data: IncidentCreate, db: Session = Depends(get_db)):
    new_incident = Incident(
        incident_type=incident_data.incident_type,
        severity_level=incident_data.severity_level,
        location=incident_data.location,
        description=incident_data.description
    )
    try:
        db.add(new_incident)
        db.commit()
        db.refresh(new_incident)
        return new_incident
    except SQLAlchemyError as e:
        db.rollback()
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/incidents", response_model=List[IncidentResponse], summary="查看所有应急事件")
def list_incidents(db: Session = Depends(get_db)):
    incidents = db.query(Incident).order_by(Incident.created_at.desc()).all()
    return incidents

@app.get("/incidents/{incident_id}", response_model=IncidentResponse, summary="根据ID查看事件详情")
def get_incident(incident_id: int, db: Session = Depends(get_db)):
    incident = db.query(Incident).filter(Incident.id == incident_id).first()
    if not incident:
        raise HTTPException(status_code=404, detail="Incident not found")
    return incident


# ============== Task 相关接口 ==============
@app.post("/tasks", response_model=TaskResponse, summary="在事件下创建任务")
def create_task(task_data: TaskCreate, db: Session = Depends(get_db)):
    # 检查incident是否存在
    incident = db.query(Incident).filter(Incident.id == task_data.incident_id).first()
    if not incident:
        raise HTTPException(status_code=404, detail="Incident not found")

    new_task = Task(
        incident_id=task_data.incident_id,
        task_name=task_data.task_name,
        assigned_department=task_data.assigned_department,
        status="pending"
    )
    try:
        db.add(new_task)
        db.commit()
        db.refresh(new_task)
        # 发布消息到RabbitMQ,通知下游(比如部门调度系统或人员App)
        publish_task_message({
            "event": "task_created",
            "task_id": new_task.id,
            "incident_id": new_task.incident_id,
            "task_name": new_task.task_name,
            "assigned_department": new_task.assigned_department,
            "status": new_task.status
        })
        return new_task
    except SQLAlchemyError as e:
        db.rollback()
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/tasks", response_model=List[TaskResponse], summary="查看所有任务")
def list_tasks(db: Session = Depends(get_db)):
    tasks = db.query(Task).order_by(Task.created_at.desc()).all()
    return tasks

@app.get("/tasks/{task_id}", response_model=TaskResponse, summary="查看任务详情")
def get_task(task_id: int, db: Session = Depends(get_db)):
    task = db.query(Task).filter(Task.id == task_id).first()
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")
    return task

@app.put("/tasks/{task_id}/status", response_model=TaskResponse, summary="更新任务状态")
def update_task_status(task_id: int, update_data: TaskUpdateStatus, db: Session = Depends(get_db)):
    task = db.query(Task).filter(Task.id == task_id).first()
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")

    if update_data.status not in ["pending", "in_progress", "completed", "canceled"]:
        raise HTTPException(status_code=400, detail="Invalid status value")

    task.status = update_data.status
    try:
        db.commit()
        db.refresh(task)
        # 发布消息到RabbitMQ,通知下游(比如监控大屏或其它服务)
        publish_task_message({
            "event": "task_status_updated",
            "task_id": task.id,
            "incident_id": task.incident_id,
            "task_name": task.task_name,
            "assigned_department": task.assigned_department,
            "status": task.status
        })
        return task
    except SQLAlchemyError as e:
        db.rollback()
        raise HTTPException(status_code=500, detail=str(e))


# ============== Resource 相关接口 ==============
@app.post("/resources", response_model=ResourceResponse, summary="登记可用资源")
def create_resource(res_data: ResourceCreate, db: Session = Depends(get_db)):
    new_res = Resource(
        resource_name=res_data.resource_name,
        resource_type=res_data.resource_type,
        location=res_data.location
    )
    try:
        db.add(new_res)
        db.commit()
        db.refresh(new_res)
        return new_res
    except SQLAlchemyError as e:
        db.rollback()
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/resources", response_model=List[ResourceResponse], summary="查看资源清单")
def list_resources(db: Session = Depends(get_db)):
    rs = db.query(Resource).order_by(Resource.created_at.desc()).all()
    return rs

@app.get("/resources/{resource_id}", response_model=ResourceResponse, summary="查看资源详情")
def get_resource(resource_id: int, db: Session = Depends(get_db)):
    r = db.query(Resource).filter(Resource.id == resource_id).first()
    if not r:
        raise HTTPException(status_code=404, detail="Resource not found")
    return r

@app.put("/resources/{resource_id}", response_model=ResourceResponse, summary="更新资源信息")
def update_resource(resource_id: int, update_data: ResourceUpdate, db: Session = Depends(get_db)):
    r = db.query(Resource).filter(Resource.id == resource_id).first()
    if not r:
        raise HTTPException(status_code=404, detail="Resource not found")

    if update_data.is_available is not None:
        r.is_available = update_data.is_available
    if update_data.location is not None:
        r.location = update_data.location
    try:
        db.commit()
        db.refresh(r)
        return r
    except SQLAlchemyError as e:
        db.rollback()
        raise HTTPException(status_code=500, detail=str(e))

说明

  • consume_task_updates() 函数中,我们声明了一个名为 task_updates 的队列来模拟下游反馈。当救援队、物资运输等执行方通过某种方式(APP、设备端)向此队列发送进度消息时,我们可在此后台线程中消费,并更新数据库中的任务状态。
  • 如果你只需要单向下行分发(仅指挥中心→执行方),也可以不写此消费者线程,或者将它替换成更轻量的做法。
  • publish_task_message() 用于将关键任务信息推送到 RabbitMQ,其他微服务或工作节点只需订阅相同 exchange / queue 即可实时获取任务变更信息,用于更新前端大屏、手机APP、或其他子系统。

四、如何运行

  1. 手动安装/运行

    • 安装 Python 3.9+,并安装依赖:

      pip install -r requirements.txt
      
    • 保证你有一套本地/远程 PostgreSQL、RabbitMQ 环境,并在 .env 中正确配置连接信息。

    • 运行服务:

      uvicorn app.main:app --host 0.0.0.0 --port 8300
      
    • 访问 http://localhost:8300/docs 打开自动生成的Swagger API文档进行调试。

  2. Docker Compose 方式

    • 运行:

      docker-compose up -d --build
      
    • 这会启动:

      • db (PostgreSQL) - 端口 5432
      • rabbitmq (RabbitMQ) - 端口 5672 (以及 15672 供Web管理)
      • dispatch_api (本FastAPI服务) - 端口 8300
    • 访问 http://localhost:8300/docs 测试接口。


五、测试调用示例

  1. 创建一个应急事件

    curl -X POST http://localhost:8300/incidents \
    -H "Content-Type: application/json" \
    -d '{
      "incident_type": "flood",
      "severity_level": "orange",
      "location": "River-A2",
      "description": "River water level is rising fast"
    }'
    
  2. 在此事件下创建任务

    curl -X POST http://localhost:8300/tasks \
    -H "Content-Type: application/json" \
    -d '{
      "incident_id": 1,
      "task_name": "Evacuate nearby residents",
      "assigned_department": "CivilAffairs"
    }'
    
    • 成功后,会自动向 RabbitMQ 发布 task_created 消息;其他订阅者会收到此任务信息。
  3. 更新任务状态

    curl -X PUT http://localhost:8300/tasks/1/status \
    -H "Content-Type: application/json" \
    -d '{
      "status": "in_progress"
    }'
    
    • 成功后,会发布 task_status_updated 消息给MQ,显示该任务从 pending -> in_progress
  4. 登记可用资源(如某救援队或车辆)

    curl -X POST http://localhost:8300/resources \
    -H "Content-Type: application/json" \
    -d '{
      "resource_name": "Rescue Team A",
      "resource_type": "team",
      "location": "Station-B1"
    }'
    
  5. 查看 RabbitMQ

    • 打开浏览器访问 http://localhost:15672(用户名密码默认为 guest/guest),可查看 emergency_exchangetask_queue 中的消息发布情况;
    • 如有下游系统订阅了 task_queue,就能获取到 task_created / task_status_updated 的消息进行相应处理。

六、关键逻辑

  1. 事件 (Incident)

    • 表示一次实际发生的紧急情况,如洪水或地震;包含事件类型、严重度、地点、描述等信息。
    • 通过 /incidents 系列接口进行增删改查。
  2. 任务 (Task)

    • 每个事件下会有若干任务,如“搜救某区域” “紧急撤离” “修复堤坝” 等;
    • status 用于记录当前任务所处的阶段 (pending, in_progress, completed, canceled)。
    • 创建或更新时,会通过 RabbitMQ 广播消息,方便其他系统或团队监听并执行。
  3. 资源 (Resource)

    • 登记救援队、车辆、船只、直升机、医疗物资等;也可设置 is_availablelocation 等属性随时更新。
    • 在更复杂的场景下,可实现资源自动/半自动分配给任务,并在更新资源状态时也可通知其他模块(如地图可视化、调度优化算法等)。
  4. 消息驱动

    • 本示例使用 publish_task_message() 将关键事件 (task_created, task_status_updated) 推送到 RabbitMQ;
    • 其他微服务或设备可以订阅 task_queue 实时获取调度指令,实现松耦合实时性
    • 同理,还演示了一个可选的消费者 consume_task_updates(),若需要执行方回传进度,就可以在另一个队列 task_updates 中收消息,更新数据库对应的 status

七、补充扩展

  1. 权限管理

    • 不同角色(指挥中心、政府部门、社会组织、志愿者)在调用 API 时可能有不同操作权限;
    • 建议采用 OAuth2/JWT 或自定义 RBAC 体系进行安全管控。
  2. 多灾种耦合/多服务协同

    • 本服务可与“实时监测与预警”服务联动:当监测到触发条件后,自动创建 Incident 并分派初始任务;
    • 可与“智能分析与辅助决策”服务联动:在创建任务或调度资源时,调用优化/预测模型,给出调度建议或路线规划。
  3. 更完善的资源管理

    • 资源的数量、容量、属性分配算法
    • 多个任务对同一资源的竞争与优先级调度;
    • 实时位置追踪(GPS)并结合GIS地图。
  4. 日志与审计

    • 关键操作(事件创建、任务分配、状态变更)需记录审计日志;
    • 满足政府或行业对于应急管理流程合规性的要求。
  5. 多副本与分布式

    • 在灾害高峰时请求量激增,可进行微服务多副本部署、RabbitMQ集群化、数据库读写分离等。

使用消息队列进行任务分发进度同步的“应急指挥与调度模块”,实现其核心功能:事件管理、任务管理、资源管理,并通过 RabbitMQ 或者其他消息中间件将指令/状态改变实时广播给相关服务或客户端。

在真实环境中,结合安全鉴权、分布式部署、高可用、日志审计、可观测性等工程实践,才能形成一套可靠的多灾种应急管理指挥系统,助力更高效地统筹资源、指挥调度、应对自然灾害或突发事件。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2273206.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何快速上手一个鸿蒙工程

作为一名鸿蒙程序猿,当你换了一家公司,或者被交接了一个已有的业务。前辈在找你之前十分钟写了一个他都看不懂的交接文档,然后把一个鸿蒙工程交接给你了,说以后就是你负责了。之后几天你的状态大概就是下边这样的,一堆…

FPGA实现UART对应的电路和单片机内部配合寄存器实现的电路到底有何区别?

一、UART相关介绍 UART是我们常用的全双工异步串行总线,常用TTL电平标准,由TXD和RXD两根收发数据线组成。 那么,利用硬件描述语言实现UART对应的电路和51单片机内部配合寄存器实现的电路到底有何区别呢?接下来我们对照看一下。 …

patchwork++地面分割学习笔记

参考资料:古月居 - ROS机器人知识分享社区 https://zhuanlan.zhihu.com/p/644297447 patchwork算法一共包含四部分内容:提出了以下四个部分:RNR、RVPF、A-GLE 和 TGR。 1)基于 3D LiDAR 反射模型的反射噪声消除 (RNR)&#xff…

【VScode】设置代理,通过代理连接服务器

文章目录 VScode编辑器设置代理1.图形化界面1.1 进入proxy设置界面1.2 配置代理服务器 2.配置文件(推荐)2.1 打开setting.json 文件2.2 配置代理 VScode编辑器设置代理 根据情况安装nmap 1.图形化界面 1.1 进入proxy设置界面 或者使用快捷键ctrl , 。…

【HarmonyOS】鸿蒙应用点9图的处理(draw9patch)

【HarmonyOS】鸿蒙应用点9图的处理(draw9patch) 一、前言: 首先在鸿蒙中是不支持安卓 .9图的图片直接使用。只有类似拉伸的处理方案,鸿蒙提供的Image组件有与点九图相同功能的API设置。 可以通过设置resizable属性来设置Resiza…

光伏仿真与设计系统应用架构深度剖析

在光伏产业蓬勃发展的时代背景下,绿虫光伏仿真与设计系统成为推动其高效发展的核心力量。其应用架构涵盖多个关键步骤,每个环节都紧密相扣,共同构建起精准且高效的设计体系。 气象分析作为开篇之笔,起着基石般的重要作用。系统全…

k8s dashboard离线部署步骤

确定k8s版本,以1.23为例。 部署metrics-server服务,最好用v0.5.2。 用v0.6.0,可能会报以下错误: nodekubemaster:~/Desktop/metric$ kubectl top nodes Error from server (ServiceUnavailable): the server is currently unabl…

05-Linux系统编程之进程(下)

一、子进程资源回收 1.概述 在每个进程退出的时候,内核释放该进程所有的资源,包括一些存储在栈区、全局区的数据、打开的文件、占用的内存等。但是仍有一部分信息没有释放,这些信息主要指进程控制块 PCB 的信息(包括进程号、退出…

HDFS异构存储和存储策略

一、HDFS异构存储类型 1.1 冷、热、温、冻数据 通常,公司或者组织总是有相当多的历史数据占用昂贵的存储空间。典型的数据使用模式是新传入的数据被应用程序大量使用,从而该数据被标记为"热"数据。随着时间的推移,存储的数据每周…

【51单片机】02LED流水灯实验

点亮你的LED 一、点亮第一个LED1.GPIO介绍2.P1、P2、P3端口 二、LED实验2.尝试点亮LED3.LED流水灯 一、点亮第一个LED 1.GPIO介绍 这块内容这里可以做简单的了解,与数电知识强相关。后续可以再回过头来学习 GPIO (general purpose input output) 通用输入输出端口…

springboot 集成 etcd

springboot 集成 etcd 往期内容 ETCD 简介docker部署ETCD 前言 好久不见各位小伙伴们,上两期内容中,我们对于分布式kv存储中间件有了简单的认识,完成了docker-compose 部署etcd集群以及可视化工具 etcd Keeper,既然有了认识&a…

云安全相关博客阅读(一)

2024-03-04 Cloudflare announces Firewall for AI 关注问题: 传统的WAF功能能够保护web和api安全,但是随着LLM等AI模型等出现,保护这些AI相关应用等安全是一个新出现的问题虽然AI应用是新的场景,但是以往的攻击方法也能够直接用…

2025年01月07日Github流行趋势

项目名称:khoj 项目地址url:https://github.com/khoj-ai/khoj项目语言:Python历史star数:20105今日star数:363项目维护者:debanjum, sabaimran, MythicalCow, aam-at, shantanuSakpal项目简介:你…

从零手写线性回归模型:PyTorch 实现深度学习入门教程

系列文章目录 01-PyTorch新手必看:张量是什么?5 分钟教你快速创建张量! 02-张量运算真简单!PyTorch 数值计算操作完全指南 03-Numpy 还是 PyTorch?张量与 Numpy 的神奇转换技巧 04-揭秘数据处理神器:PyTor…

【python】matplotlib(radar chart)

文章目录 1、功能描述和原理介绍2、代码实现3、效果展示4、完整代码5、多个雷达图绘制在一张图上6、参考 1、功能描述和原理介绍 基于 matplotlib 实现雷达图的绘制 一、雷达图的基本概念 雷达图(Radar Chart),也被称为蛛网图或星型图&…

数据库环境安装(day1)

网址:MySQL 下载(环境准备): (2-5点击此处,然后选择合适的版本) 1.linux在线YUM仓库 下载/安装: wget https://repo.mysql.com//mysql84-community-release-el9-1.noarch.rpm rpm -i https://r…

Fabric链码部署测试

参考链接:运行 Fabric 应用程序 — Hyperledger Fabric Docs 主文档 (hyperledger-fabric.readthedocs.io) (2)fabric2.4.3部署运行自己的链码 - 知乎 (zhihu.com) Fabric2.0测试网络部署链码 - 辉哥哥~ - 博客园 (cnblogs.com) 1.启动测试…

数据结构与算法之二叉树: LeetCode 107. 二叉树的层序遍历 II (Ts版)

二叉树的层序遍历 II https://leetcode.cn/problems/binary-tree-level-order-traversal-ii/description/ 描述 给你二叉树的根节点 root ,返回其节点值 自底向上的层序遍历 。 (即按从叶子节点所在层到根节点所在的层,逐层从左向右遍历&a…

Python插件化开发实战:开发个图片浏览器

在本篇教程中,我将详细介绍如何使用Python开发一个基于插件架构的图片浏览器。这个项目将展示如何实现插件系统、如何处理图片显示,以及如何使用wxPython构建GUI界面。 “C:\pythoncode\pythonplugin\your_project\main_app.py” 项目概述 我们将开发一个具有以下…

根据python代码自动生成类图的实现方法[附带python源码]

概述 利用python库抽象语法树(AST)和类图描述语言(PlantUML),实现自动将python代码生成类图的目的。 环境 windowsvscodepythonplantuml ✒️网上好像大部分都是用Pyreverse库来实现的,但是我实际测试发现只能在一个文件中才能行,当然应该有解决方法…