下面给出一个 “应急指挥与调度模块” 的示例实现,展示如何通过消息队列与数据库相结合,来完成应急事件的创建、任务分派、资源调度、进度汇报等功能。该模块与之前的“数据采集与管理”、“实时监测与预警”、“智能分析与辅助决策”等模块一样,可被视为微服务之一,接入到整个自然灾害应急管理系统当中。
重要说明
- 这是一个演示或原型级示例,适合初步了解“应急指挥与调度”在消息驱动场景下的整体思路。
- 实际生产环境需要补充:安全鉴权、RBAC/多角色访问控制、日志审计、分布式部署、高可用、CI/CD 集成、监控告警等工程化环节。
- 以下示例将代码集中在一个
main.py
文件里,便于一次性查看;实际开发中应进行更好的模块化拆分和工程化管理。- 演示使用 RabbitMQ 作为消息队列(“消息的 Python 代码”),也可替换成 Kafka、Redis Stream、或其他消息中间件。
一、目录结构示例
假设我们建立一个名为 command-dispatch-service
的文件夹,用于存放本微服务代码。示例目录如下:
command-dispatch-service
├── Dockerfile
├── docker-compose.yml
├── requirements.txt
├── .env
├── app
│ ├── main.py # FastAPI入口 + RabbitMQ 消费/生产示例
│ ├── config.py # 可选: 存放数据库、MQ等配置
│ └── models.py # 可选: ORM等
└── README.md
下面的示例会将所有核心内容合并到一个文件 main.py
中,方便展示。
二、依赖文件
1. requirements.txt
示例依赖(版本仅供参考):
fastapi==0.95.2
uvicorn==0.22.0
pydantic==1.10.7
SQLAlchemy==1.4.46
psycopg2-binary==2.9.6
python-dotenv==1.0.0
# RabbitMQ/AMQP 客户端
pika==1.3.1
若你打算使用 Kafka,可安装 kafka-python
或 confluent-kafka
并在代码中做相应改动。
2. .env
(示例)
存放环境变量,如数据库配置、RabbitMQ配置等:
DB_HOST=localhost
DB_PORT=5432
DB_NAME=dispatch_db
DB_USER=myuser
DB_PASS=mypass
RABBITMQ_HOST=localhost
RABBITMQ_PORT=5672
RABBITMQ_USER=guest
RABBITMQ_PASS=guest
# RabbitMQ中的Exchange、Queue名称等
RABBITMQ_EXCHANGE=emergency_exchange
RABBITMQ_QUEUE=task_queue
3. docker-compose.yml
(示例)
可快速启动 PostgreSQL、RabbitMQ、以及本服务;示例如下:
version: '3.8'
services:
db:
image: postgres:14
container_name: dispatch_db
environment:
POSTGRES_USER: myuser
POSTGRES_PASSWORD: mypass
POSTGRES_DB=dispatch_db
ports:
- "5432:5432"
volumes:
- db_data:/var/lib/postgresql/data
rabbitmq:
image: rabbitmq:3.9-management
container_name: dispatch_rabbitmq
environment:
RABBITMQ_DEFAULT_USER: guest
RABBITMQ_DEFAULT_PASS: guest
ports:
- "5672:5672"
- "15672:15672" # RabbitMQ管理UI端口
volumes:
- rabbitmq_data:/var/lib/rabbitmq
dispatch_api:
build: .
container_name: dispatch_api
depends_on:
- db
- rabbitmq
environment:
- DB_HOST=db
- DB_PORT=5432
- DB_NAME=dispatch_db
- DB_USER=myuser
- DB_PASS=mypass
- RABBITMQ_HOST=rabbitmq
- RABBITMQ_PORT=5672
- RABBITMQ_USER=guest
- RABBITMQ_PASS=guest
- RABBITMQ_EXCHANGE=emergency_exchange
- RABBITMQ_QUEUE=task_queue
ports:
- "8300:8300"
command: ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8300"]
volumes:
db_data:
rabbitmq_data:
三、app/main.py
:应急指挥与调度模块完整示例
"""
示例:应急指挥与调度模块 (Command & Dispatch Module with RabbitMQ)
主要功能:
1. 事件(Incident)管理:创建应急事件、记录基本信息(类型、等级、地点等)。
2. 任务(Task)管理:指挥员在某个事件下创建/分配任务,指定执行部门/小组。
3. 资源(Resource)与调度:登记救援队、车辆、物资等,对接RabbitMQ进行分发或状态变更。
4. 消息队列交互:当有新任务或任务状态更新时,通过RabbitMQ发送消息,其他系统或内部消费线程可接收并处理。
5. 任务进度/报告回传:任务执行方可以发布进度到队列或通过API回传,指挥员在系统内查看执行情况。
注意:仅为原型示例,实际项目需补充安全、日志、权限、容灾、高可用等。
"""
import os
import time
import threading
from datetime import datetime
from typing import Optional, List
from fastapi import FastAPI, Body, Depends, HTTPException
from pydantic import BaseModel, Field
from sqlalchemy import create_engine, Column, Integer, String, DateTime, Boolean, ForeignKey
from sqlalchemy.orm import sessionmaker, declarative_base, Session, relationship
from sqlalchemy.exc import SQLAlchemyError
# 环境变量加载
from dotenv import load_dotenv
load_dotenv()
# ============== 数据库配置 ==============
DB_HOST = os.getenv("DB_HOST", "localhost")
DB_PORT = os.getenv("DB_PORT", "5432")
DB_NAME = os.getenv("DB_NAME", "dispatch_db")
DB_USER = os.getenv("DB_USER", "myuser")
DB_PASS = os.getenv("DB_PASS", "mypass")
DATABASE_URL = f"postgresql://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
engine = create_engine(DATABASE_URL, echo=False)
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)
Base = declarative_base()
# ============== RabbitMQ配置 ==============
RABBITMQ_HOST = os.getenv("RABBITMQ_HOST", "localhost")
RABBITMQ_PORT = int(os.getenv("RABBITMQ_PORT", "5672"))
RABBITMQ_USER = os.getenv("RABBITMQ_USER", "guest")
RABBITMQ_PASS = os.getenv("RABBITMQ_PASS", "guest")
RABBITMQ_EXCHANGE = os.getenv("RABBITMQ_EXCHANGE", "emergency_exchange")
RABBITMQ_QUEUE = os.getenv("RABBITMQ_QUEUE", "task_queue")
import pika
import json
# ============== 数据库ORM模型 ==============
class Incident(Base):
"""
应急事件表,如地震、洪水、火灾等事件的基本信息
"""
__tablename__ = "incidents"
id = Column(Integer, primary_key=True, index=True, autoincrement=True)
incident_type = Column(String(50)) # 事件类型,如 'earthquake', 'flood', 'fire'
severity_level = Column(String(20)) # 事件严重程度,例: 'red', 'orange', 'yellow', 'blue' 或数字分级
location = Column(String(100)) # 事件地点
description = Column(String(255)) # 事件简要描述
created_at = Column(DateTime, default=datetime.utcnow)
tasks = relationship("Task", back_populates="incident")
class Task(Base):
"""
任务表,记录在某个事件下创建的具体任务
"""
__tablename__ = "tasks"
id = Column(Integer, primary_key=True, index=True, autoincrement=True)
incident_id = Column(Integer, ForeignKey("incidents.id"))
task_name = Column(String(100))
assigned_department = Column(String(50)) # 指派给哪个部门或小组
status = Column(String(20), default="pending") # 任务状态: pending, in_progress, completed, canceled
created_at = Column(DateTime, default=datetime.utcnow)
incident = relationship("Incident", back_populates="tasks")
class Resource(Base):
"""
资源表:记录可用的救援队、车辆、物资等
"""
__tablename__ = "resources"
id = Column(Integer, primary_key=True, index=True, autoincrement=True)
resource_name = Column(String(100))
resource_type = Column(String(50)) # 'vehicle', 'team', 'equipment' 等
is_available = Column(Boolean, default=True)
location = Column(String(100), nullable=True)
created_at = Column(DateTime, default=datetime.utcnow)
# ============== 初始化数据库 ==============
def init_db():
Base.metadata.create_all(bind=engine)
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
# ============== Pydantic Schemas ==============
class IncidentCreate(BaseModel):
incident_type: str
severity_level: str
location: str
description: Optional[str]
class IncidentResponse(BaseModel):
id: int
incident_type: str
severity_level: str
location: str
description: Optional[str]
created_at: datetime
class Config:
orm_mode = True
class TaskCreate(BaseModel):
incident_id: int
task_name: str
assigned_department: str
class TaskUpdateStatus(BaseModel):
status: str # pending, in_progress, completed, canceled
class TaskResponse(BaseModel):
id: int
incident_id: int
task_name: str
assigned_department: str
status: str
created_at: datetime
class Config:
orm_mode = True
class ResourceCreate(BaseModel):
resource_name: str
resource_type: str
location: Optional[str]
class ResourceUpdate(BaseModel):
is_available: Optional[bool]
location: Optional[str]
class ResourceResponse(BaseModel):
id: int
resource_name: str
resource_type: str
is_available: bool
location: Optional[str]
created_at: datetime
class Config:
orm_mode = True
# ============== FastAPI应用初始化 ==============
app = FastAPI(
title="Command & Dispatch Service",
description="应急指挥与调度模块示例 (使用RabbitMQ进行任务消息分发)",
version="0.1.0",
)
# ============== RabbitMQ工具函数 ==============
def get_rabbitmq_connection():
"""
创建并返回一个RabbitMQ连接
"""
credentials = pika.PlainCredentials(RABBITMQ_USER, RABBITMQ_PASS)
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=RABBITMQ_HOST, port=RABBITMQ_PORT, credentials=credentials
)
)
return connection
def publish_task_message(task_data: dict):
"""
将新创建的任务或任务状态更新发布到RabbitMQ中
其他下游服务或工作者可订阅 queue 获取任务信息。
"""
try:
connection = get_rabbitmq_connection()
channel = connection.channel()
# 声明Exchange和Queue并进行绑定
channel.exchange_declare(exchange=RABBITMQ_EXCHANGE, exchange_type='fanout', durable=True)
channel.queue_declare(queue=RABBITMQ_QUEUE, durable=True)
channel.queue_bind(queue=RABBITMQ_QUEUE, exchange=RABBITMQ_EXCHANGE)
message_body = json.dumps(task_data)
channel.basic_publish(
exchange=RABBITMQ_EXCHANGE,
routing_key="",
body=message_body,
properties=pika.BasicProperties(
delivery_mode=2 # 使消息可持久化
)
)
channel.close()
connection.close()
print(f"[MQ] 已发布任务消息: {task_data}")
except Exception as e:
print("[ERROR] 发布到RabbitMQ失败:", e)
# ============== RabbitMQ 消费线程(可选) ==============
def consume_task_updates():
"""
作为后台线程示例,用于接收下游(如救援队APP)对任务的反馈/进度更新消息。
模拟:若真实需求需要双向通信,可建立另一个队列或Exchange接收消息后更新数据库。
"""
try:
connection = get_rabbitmq_connection()
channel = connection.channel()
channel.queue_declare(queue="task_updates", durable=True) # 示例: 另一个队列
def callback(ch, method, properties, body):
msg = json.loads(body.decode("utf-8"))
print(f"[MQ-Consume] 收到下游反馈: {msg}")
# 此处可更新数据库中对应任务的状态或进度
# TODO: 根据具体需求自行实现
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue="task_updates", on_message_callback=callback, auto_ack=False)
print("[MQ] consume_task_updates 线程已启动,等待下游进度反馈...")
channel.start_consuming()
except Exception as e:
print("[ERROR] 消费下游进度队列失败:", e)
time.sleep(5)
# ============== 启动事件 ==============
@app.on_event("startup")
def on_startup():
init_db()
print("[INFO] 数据表初始化完成(若无则自动创建)")
# 启动一个后台线程以监听下游反馈 (可选)
consumer_thread = threading.Thread(target=consume_task_updates, daemon=True)
consumer_thread.start()
# ============== Incident 相关接口 ==============
@app.post("/incidents", response_model=IncidentResponse, summary="创建应急事件")
def create_incident(incident_data: IncidentCreate, db: Session = Depends(get_db)):
new_incident = Incident(
incident_type=incident_data.incident_type,
severity_level=incident_data.severity_level,
location=incident_data.location,
description=incident_data.description
)
try:
db.add(new_incident)
db.commit()
db.refresh(new_incident)
return new_incident
except SQLAlchemyError as e:
db.rollback()
raise HTTPException(status_code=500, detail=str(e))
@app.get("/incidents", response_model=List[IncidentResponse], summary="查看所有应急事件")
def list_incidents(db: Session = Depends(get_db)):
incidents = db.query(Incident).order_by(Incident.created_at.desc()).all()
return incidents
@app.get("/incidents/{incident_id}", response_model=IncidentResponse, summary="根据ID查看事件详情")
def get_incident(incident_id: int, db: Session = Depends(get_db)):
incident = db.query(Incident).filter(Incident.id == incident_id).first()
if not incident:
raise HTTPException(status_code=404, detail="Incident not found")
return incident
# ============== Task 相关接口 ==============
@app.post("/tasks", response_model=TaskResponse, summary="在事件下创建任务")
def create_task(task_data: TaskCreate, db: Session = Depends(get_db)):
# 检查incident是否存在
incident = db.query(Incident).filter(Incident.id == task_data.incident_id).first()
if not incident:
raise HTTPException(status_code=404, detail="Incident not found")
new_task = Task(
incident_id=task_data.incident_id,
task_name=task_data.task_name,
assigned_department=task_data.assigned_department,
status="pending"
)
try:
db.add(new_task)
db.commit()
db.refresh(new_task)
# 发布消息到RabbitMQ,通知下游(比如部门调度系统或人员App)
publish_task_message({
"event": "task_created",
"task_id": new_task.id,
"incident_id": new_task.incident_id,
"task_name": new_task.task_name,
"assigned_department": new_task.assigned_department,
"status": new_task.status
})
return new_task
except SQLAlchemyError as e:
db.rollback()
raise HTTPException(status_code=500, detail=str(e))
@app.get("/tasks", response_model=List[TaskResponse], summary="查看所有任务")
def list_tasks(db: Session = Depends(get_db)):
tasks = db.query(Task).order_by(Task.created_at.desc()).all()
return tasks
@app.get("/tasks/{task_id}", response_model=TaskResponse, summary="查看任务详情")
def get_task(task_id: int, db: Session = Depends(get_db)):
task = db.query(Task).filter(Task.id == task_id).first()
if not task:
raise HTTPException(status_code=404, detail="Task not found")
return task
@app.put("/tasks/{task_id}/status", response_model=TaskResponse, summary="更新任务状态")
def update_task_status(task_id: int, update_data: TaskUpdateStatus, db: Session = Depends(get_db)):
task = db.query(Task).filter(Task.id == task_id).first()
if not task:
raise HTTPException(status_code=404, detail="Task not found")
if update_data.status not in ["pending", "in_progress", "completed", "canceled"]:
raise HTTPException(status_code=400, detail="Invalid status value")
task.status = update_data.status
try:
db.commit()
db.refresh(task)
# 发布消息到RabbitMQ,通知下游(比如监控大屏或其它服务)
publish_task_message({
"event": "task_status_updated",
"task_id": task.id,
"incident_id": task.incident_id,
"task_name": task.task_name,
"assigned_department": task.assigned_department,
"status": task.status
})
return task
except SQLAlchemyError as e:
db.rollback()
raise HTTPException(status_code=500, detail=str(e))
# ============== Resource 相关接口 ==============
@app.post("/resources", response_model=ResourceResponse, summary="登记可用资源")
def create_resource(res_data: ResourceCreate, db: Session = Depends(get_db)):
new_res = Resource(
resource_name=res_data.resource_name,
resource_type=res_data.resource_type,
location=res_data.location
)
try:
db.add(new_res)
db.commit()
db.refresh(new_res)
return new_res
except SQLAlchemyError as e:
db.rollback()
raise HTTPException(status_code=500, detail=str(e))
@app.get("/resources", response_model=List[ResourceResponse], summary="查看资源清单")
def list_resources(db: Session = Depends(get_db)):
rs = db.query(Resource).order_by(Resource.created_at.desc()).all()
return rs
@app.get("/resources/{resource_id}", response_model=ResourceResponse, summary="查看资源详情")
def get_resource(resource_id: int, db: Session = Depends(get_db)):
r = db.query(Resource).filter(Resource.id == resource_id).first()
if not r:
raise HTTPException(status_code=404, detail="Resource not found")
return r
@app.put("/resources/{resource_id}", response_model=ResourceResponse, summary="更新资源信息")
def update_resource(resource_id: int, update_data: ResourceUpdate, db: Session = Depends(get_db)):
r = db.query(Resource).filter(Resource.id == resource_id).first()
if not r:
raise HTTPException(status_code=404, detail="Resource not found")
if update_data.is_available is not None:
r.is_available = update_data.is_available
if update_data.location is not None:
r.location = update_data.location
try:
db.commit()
db.refresh(r)
return r
except SQLAlchemyError as e:
db.rollback()
raise HTTPException(status_code=500, detail=str(e))
说明:
- 在
consume_task_updates()
函数中,我们声明了一个名为task_updates
的队列来模拟下游反馈。当救援队、物资运输等执行方通过某种方式(APP、设备端)向此队列发送进度消息时,我们可在此后台线程中消费,并更新数据库中的任务状态。- 如果你只需要单向下行分发(仅指挥中心→执行方),也可以不写此消费者线程,或者将它替换成更轻量的做法。
publish_task_message()
用于将关键任务信息推送到 RabbitMQ,其他微服务或工作节点只需订阅相同exchange
/queue
即可实时获取任务变更信息,用于更新前端大屏、手机APP、或其他子系统。
四、如何运行
-
手动安装/运行
-
安装 Python 3.9+,并安装依赖:
pip install -r requirements.txt
-
保证你有一套本地/远程 PostgreSQL、RabbitMQ 环境,并在
.env
中正确配置连接信息。 -
运行服务:
uvicorn app.main:app --host 0.0.0.0 --port 8300
-
访问 http://localhost:8300/docs 打开自动生成的Swagger API文档进行调试。
-
-
Docker Compose 方式
-
运行:
docker-compose up -d --build
-
这会启动:
db
(PostgreSQL) - 端口5432
rabbitmq
(RabbitMQ) - 端口5672
(以及15672
供Web管理)dispatch_api
(本FastAPI服务) - 端口8300
-
访问 http://localhost:8300/docs 测试接口。
-
五、测试调用示例
-
创建一个应急事件
curl -X POST http://localhost:8300/incidents \ -H "Content-Type: application/json" \ -d '{ "incident_type": "flood", "severity_level": "orange", "location": "River-A2", "description": "River water level is rising fast" }'
-
在此事件下创建任务
curl -X POST http://localhost:8300/tasks \ -H "Content-Type: application/json" \ -d '{ "incident_id": 1, "task_name": "Evacuate nearby residents", "assigned_department": "CivilAffairs" }'
- 成功后,会自动向 RabbitMQ 发布
task_created
消息;其他订阅者会收到此任务信息。
- 成功后,会自动向 RabbitMQ 发布
-
更新任务状态
curl -X PUT http://localhost:8300/tasks/1/status \ -H "Content-Type: application/json" \ -d '{ "status": "in_progress" }'
- 成功后,会发布
task_status_updated
消息给MQ,显示该任务从pending -> in_progress
。
- 成功后,会发布
-
登记可用资源(如某救援队或车辆)
curl -X POST http://localhost:8300/resources \ -H "Content-Type: application/json" \ -d '{ "resource_name": "Rescue Team A", "resource_type": "team", "location": "Station-B1" }'
-
查看 RabbitMQ
- 打开浏览器访问 http://localhost:15672(用户名密码默认为 guest/guest),可查看
emergency_exchange
、task_queue
中的消息发布情况; - 如有下游系统订阅了
task_queue
,就能获取到task_created
/task_status_updated
的消息进行相应处理。
- 打开浏览器访问 http://localhost:15672(用户名密码默认为 guest/guest),可查看
六、关键逻辑
-
事件 (Incident)
- 表示一次实际发生的紧急情况,如洪水或地震;包含事件类型、严重度、地点、描述等信息。
- 通过
/incidents
系列接口进行增删改查。
-
任务 (Task)
- 每个事件下会有若干任务,如“搜救某区域” “紧急撤离” “修复堤坝” 等;
status
用于记录当前任务所处的阶段 (pending, in_progress, completed, canceled)。- 创建或更新时,会通过 RabbitMQ 广播消息,方便其他系统或团队监听并执行。
-
资源 (Resource)
- 登记救援队、车辆、船只、直升机、医疗物资等;也可设置
is_available
和location
等属性随时更新。 - 在更复杂的场景下,可实现资源自动/半自动分配给任务,并在更新资源状态时也可通知其他模块(如地图可视化、调度优化算法等)。
- 登记救援队、车辆、船只、直升机、医疗物资等;也可设置
-
消息驱动
- 本示例使用
publish_task_message()
将关键事件 (task_created, task_status_updated) 推送到 RabbitMQ; - 其他微服务或设备可以订阅
task_queue
实时获取调度指令,实现松耦合与实时性; - 同理,还演示了一个可选的消费者
consume_task_updates()
,若需要执行方回传进度,就可以在另一个队列task_updates
中收消息,更新数据库对应的status
。
- 本示例使用
七、补充扩展
-
权限管理
- 不同角色(指挥中心、政府部门、社会组织、志愿者)在调用 API 时可能有不同操作权限;
- 建议采用 OAuth2/JWT 或自定义 RBAC 体系进行安全管控。
-
多灾种耦合/多服务协同
- 本服务可与“实时监测与预警”服务联动:当监测到触发条件后,自动创建
Incident
并分派初始任务; - 可与“智能分析与辅助决策”服务联动:在创建任务或调度资源时,调用优化/预测模型,给出调度建议或路线规划。
- 本服务可与“实时监测与预警”服务联动:当监测到触发条件后,自动创建
-
更完善的资源管理
- 资源的数量、容量、属性及分配算法;
- 多个任务对同一资源的竞争与优先级调度;
- 实时位置追踪(GPS)并结合GIS地图。
-
日志与审计
- 关键操作(事件创建、任务分配、状态变更)需记录审计日志;
- 满足政府或行业对于应急管理流程合规性的要求。
-
多副本与分布式
- 在灾害高峰时请求量激增,可进行微服务多副本部署、RabbitMQ集群化、数据库读写分离等。
使用消息队列进行任务分发与进度同步的“应急指挥与调度模块”,实现其核心功能:事件管理、任务管理、资源管理,并通过 RabbitMQ 或者其他消息中间件将指令/状态改变实时广播给相关服务或客户端。
在真实环境中,结合安全鉴权、分布式部署、高可用、日志审计、可观测性等工程实践,才能形成一套可靠的多灾种应急管理指挥系统,助力更高效地统筹资源、指挥调度、应对自然灾害或突发事件。