在快速发展的物流与自主系统领域,优化无人机在三维空间中的飞行路径至关重要。无论是在城市环境中导航还是在复杂地形中穿行,确保高效、安全且节能的航线规划能够显著提升运营效率。本文将深入探讨一种创新方法,结合强化学习(Reinforcement Learning, RL)与遗传算法(Genetic Algorithms, GA),在自定义的3D环境中优化无人机飞行路径。我们将详细解析实现细节、RL与GA的协同作用,以及如何通过Plotly实现令人惊叹的3D可视化效果。
项目概述
本项目旨在通过结合强化学习(RL)与遗传算法(GA),在自定义的3D环境中优化无人机的飞行路径。具体来说:
- 环境定义(
low_altitude_env.py
):创建一个模拟无人机在3D网格中飞行的环境,考虑风速、风向、动态障碍物、电池容量等因素。 - 训练与优化(
dqn_ga.py
):使用PPO算法训练无人机的决策模型,并利用遗传算法进行多目标优化,平衡飞行时间、能耗与风险。 - 可视化展示(
main.py
):通过Plotly实现交互式3D可视化,直观展示无人机的飞行路径与环境中的动态障碍物。
这种模块化的设计不仅确保了代码的清晰与可维护性,还为后续的功能扩展提供了便利。
自定义3D环境:LowAltitudeLogisticsEnv
项目的核心是**LowAltitudeLogisticsEnv
**,一个基于OpenAI Gym构建的自定义环境。该环境模拟无人机在3D网格中的飞行,考虑多种环境因素和约束条件。
主要功能
- 状态表示:无人机的状态由12维向量表示,包括位置
(x, y, z)
、速度(vx, vy, vz)
、风向与风速(wind_dir, wind_speed)
、目标位置(goal_x, goal_y, goal_z)
以及剩余电量battery
。 - 动作空间:支持连续动作空间
[ax, ay, az]
(加速度)或离散动作空间(预定义的方向)。 - 动态障碍物:环境中障碍物根据自身速度向量动态移动,增加飞行路径规划的复杂性。
- 奖励机制:通过最小化时间、能耗与风险来奖励无人机,同时为到达目标给予额外奖励,为电量耗尽或高风险行为给予惩罚。
环境实现
以下是low_altitude_env.py
的完整实现:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
文件: low_altitude_env.py
主要功能:
1. 定义升级版低空物流环境 LowAltitudeLogisticsEnv
2. 支持离散或连续动作(可通过 continuous_action 切换)
3. 模拟飞行高度限制、风向风速、动态障碍物、电量消耗等
"""
import gym
import numpy as np
import random
from gym import spaces
class LowAltitudeLogisticsEnv(gym.Env):
"""
升级版低空物流环境:
- 状态 (state): [x, y, z, vx, vy, vz, wind_dir, wind_speed, goal_x, goal_y, goal_z, battery]
- 动作 (action):
* 连续空间 [ax, ay, az] 在 [-1,1] 范围内(若 continuous_action=True)
* 或 8离散方向/动作(若 continuous_action=False)
- 动态障碍物:obstacle_list 中包含 (ox, oy, oz, ovx, ovy, ovz),每步更新位置
- 更多约束:如飞行高度限制 [0, max_alt]
"""
def __init__(self,
grid_size=10,
max_alt=10.0,
max_episode_steps=50,
alpha=1.0,
beta=0.1,
gamma=0.5,
obstacle_list=None,
battery_capacity=100.0,
continuous_action=False):
super(LowAltitudeLogisticsEnv, self).__init__()
self.grid_size = grid_size # x-y 范围
self.max_alt = max_alt # z 高度范围
self.max_episode_steps = max_episode_steps
self.alpha = alpha
self.beta = beta
self.gamma = gamma
self.battery_capacity = battery_capacity
self.continuous_action = continuous_action # 是否使用连续动作
# 初始化障碍物:每个障碍物为 (ox, oy, oz, ovx, ovy, ovz),静态障碍物的速度为0
if obstacle_list is None:
self.obstacle_list = [(grid_size/2, grid_size/2, 2.0, 0, 0, 0)]
else:
self.obstacle_list = obstacle_list
# 动作空间定义
if self.continuous_action:
# 连续动作空间: [ax, ay, az],范围 [-1, 1]
self.action_space = spaces.Box(low=-1.0, high=1.0, shape=(3,), dtype=np.float32)
else:
# 离散动作空间 (示例:8个方向)
self.action_space = spaces.Discrete(8)
# 状态空间定义
low_state = np.array([
0, 0, 0, # x, y, z
-5, -5, -5, # vx, vy, vz
-180, # wind_dir
0, # wind_speed
0, 0, 0, # goal_x, goal_y, goal_z
0 # battery
], dtype=np.float32)
high_state = np.array([
grid_size, grid_size, max_alt,
5, 5, 5,
180,
20,
grid_size, grid_size, max_alt,
battery_capacity
], dtype=np.float32)
self.observation_space = spaces.Box(low=low_state, high=high_state, dtype=np.float32)
self.steps = 0
self.state = None
self.reset()
def step(self, action):
"""
环境交互逻辑:
1. 根据动作更新 (vx, vy, vz)
2. 结合风速风向,计算新的 (x, y, z)
3. 判断与障碍物距离、计算风险
4. 计算奖励 + 检查 done
"""
x, y, z, vx, vy, vz, wind_dir, wind_speed, goal_x, goal_y, goal_z, battery = self.state
# 1) 动作处理
if self.continuous_action:
# action = [ax, ay, az]
ax, ay, az = action
new_vx = vx + ax * 0.1
new_vy = vy + ay * 0.1
new_vz = vz + az * 0.1
else:
# 离散动作(示例:8个方向)
directions = [
(1, 0, 0), # +x
(-1, 0, 0), # -x
(0, 1, 0), # +y
(0, -1, 0), # -y
(0, 0, 1), # +z
(0, 0, -1), # -z
(1, 1, 0), # x+y
(-1, 1, 0) # -x+y
]
dx, dy, dz = directions[action]
new_vx, new_vy, new_vz = dx, dy, dz
# 2) 风的影响
wind_rad = np.radians(wind_dir)
wind_fx = wind_speed * np.cos(wind_rad) * 0.1
wind_fy = wind_speed * np.sin(wind_rad) * 0.1
# wind_fz 可自行拓展,目前示例只考虑水平方向
# 3) 更新坐标
new_x = x + new_vx + wind_fx
new_y = y + new_vy + wind_fy
new_z = z + new_vz
# 4) 约束
new_x = np.clip(new_x, 0, self.grid_size)
new_y = np.clip(new_y, 0, self.grid_size)
new_z = np.clip(new_z, 0, self.max_alt)
# 5) 计算时间&能耗
move_dist = np.sqrt((new_x - x)**2 + (new_y - y)**2 + (new_z - z)**2)
time_cost = move_dist
energy_cost = move_dist * (1 + 0.1 * wind_speed)
new_battery = max(battery - energy_cost, 0)
# 6) 更新障碍物 (动态)
updated_obstacles = []
risk = 0.0
for obs in self.obstacle_list:
ox, oy, oz, ovx, ovy, ovz = obs
updated_ox = ox + ovx * 0.1
updated_oy = oy + ovy * 0.1
updated_oz = oz + ovz * 0.1
updated_obstacles.append((updated_ox, updated_oy, updated_oz, ovx, ovy, ovz))
# 计算与障碍物距离
dist_obs = np.sqrt((new_x - updated_ox)**2 + (new_y - updated_oy)**2 + (new_z - updated_oz)**2)
if dist_obs < 1.0: # 若过近
risk += 2.0
self.obstacle_list = updated_obstacles
# 风速过大 -> risk
if wind_speed > 5:
risk += 1.0
# 7) 到目标的距离变化
dist_to_goal_before = np.sqrt((goal_x - x)**2 + (goal_y - y)**2 + (goal_z - z)**2)
dist_to_goal_after = np.sqrt((goal_x - new_x)**2 + (goal_y - new_y)**2 + (goal_z - new_z)**2)
# 8) 奖励
reward = -(self.alpha * time_cost + self.beta * energy_cost + self.gamma * risk)
# 若更接近目标 -> 小正奖励
if dist_to_goal_after < dist_to_goal_before:
reward += 0.1
# 电量耗尽 -> 大惩罚
if new_battery <= 0:
reward -= 5.0
# 随机改变风
if random.random() < 0.05:
wind_dir = random.randint(-180, 180)
if random.random() < 0.05:
wind_speed = random.uniform(0, 10)
# 9) 更新状态
self.state = np.array([
new_x, new_y, new_z,
new_vx, new_vy, new_vz,
wind_dir, wind_speed,
goal_x, goal_y, goal_z,
new_battery
], dtype=np.float32)
self.steps += 1
# 10) done 判断
done = False
if dist_to_goal_after < 1.0:
reward += 20.0
done = True
if new_battery <= 0:
done = True
if self.steps >= self.max_episode_steps:
done = True
info = {
"dist_to_goal": dist_to_goal_after,
"remaining_battery": new_battery,
"risk": risk,
"energy_step": energy_cost # 添加能耗信息,供GA使用
}
return self.state, reward, done, info
def reset(self):
"""
重置环境状态:
1. 随机初始位置/目标/风
2. 若需要,也可在这里重置障碍物位置
"""
self.steps = 0
x = np.random.uniform(0, self.grid_size)
y = np.random.uniform(0, self.grid_size)
z = np.random.uniform(0, self.max_alt)
vx, vy, vz = 0, 0, 0
goal_x = np.random.uniform(0, self.grid_size)
goal_y = np.random.uniform(0, self.grid_size)
goal_z = np.random.uniform(0, self.max_alt)
wind_dir = np.random.randint(-180, 181)
wind_speed = np.random.uniform(0, 5)
battery = self.battery_capacity
self.state = np.array([
x, y, z,
vx, vy, vz,
wind_dir, wind_speed,
goal_x, goal_y, goal_z,
battery
], dtype=np.float32)
# 若每次 reset 都重置障碍物位置,可在此处理 self.obstacle_list
return self.state
def render(self, mode='human'):
"""
可视化操作(例如在gym自带Viewer中渲染),暂未实现。
可在外部使用Plotly或Matplotlib进行可视化。
"""
pass
强化学习:使用PPO训练无人机
为了使无人机能够在复杂的3D环境中自主导航,我们采用了**Proximal Policy Optimization (PPO)**算法进行训练。PPO是一种先进的策略优化方法,因其稳定性和效率被广泛应用于强化学习任务中。
PPO训练流程
- 环境初始化:通过
DummyVecEnv
包装自定义环境,确保与稳定基线3(Stable Baselines3)库兼容。 - 模型创建:基于多层感知机策略(MlpPolicy)构建PPO模型,设定学习率、批量大小等超参数。
- 回调函数:利用
EvalCallback
在训练过程中定期评估模型性能,并保存最佳模型。 - 模型训练:通过
model.learn()
方法在设定的总步数内进行训练。 - 模型保存:训练完成后,将模型保存至指定路径,便于后续加载与测试。
训练与测试实现
以下是dqn_ga.py
中PPO训练与测试的关键部分:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
文件: dqn_ga.py
主要升级或修改点:
1. 采用单进程训练 (DummyVecEnv) 而非多进程 (SubprocVecEnv)
2. 在环境中添加 'energy_step' 信息,便于遗传算法使用
3. 简化并优化训练与测试逻辑
"""
import time
import random
import numpy as np
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.callbacks import EvalCallback
from deap import base, creator, tools
# ----------------------------------------------------------------
# 1) PPO 训练逻辑
# ----------------------------------------------------------------
def make_env(env_class, **env_kwargs):
"""
用于创建单个 env 实例给 DummyVecEnv
"""
def _init():
env = env_class(**env_kwargs)
return env
return _init
def train_ppo(env_class, env_kwargs=None, total_timesteps=20000, save_path='ppo_model.zip'):
"""
使用 PPO 进行训练 (单进程示例)
:param env_class: 你的环境类, e.g. LowAltitudeLogisticsEnv
:param env_kwargs: 传给环境的参数dict
:param total_timesteps: 训练总步数
:param save_path: 模型保存路径
"""
if env_kwargs is None:
env_kwargs = {}
# 单个环境
vec_env = DummyVecEnv([make_env(env_class, **env_kwargs)])
# 创建 PPO 模型
model = PPO(
"MlpPolicy",
vec_env,
verbose=1,
learning_rate=3e-4,
n_steps=2048, # 若只有1个env, 这就相当于每2048步做一次更新
batch_size=64,
gamma=0.99,
gae_lambda=0.95,
ent_coef=0.0,
clip_range=0.2
)
# 回调:自动评估并保存最佳模型
eval_env = DummyVecEnv([make_env(env_class, **env_kwargs)])
eval_callback = EvalCallback(eval_env,
best_model_save_path='./logs/',
log_path='./logs/',
eval_freq=1000, # 每1000步评估一次
deterministic=True,
render=False)
print("[INFO] 开始训练 PPO 模型...")
start_time = time.time()
model.learn(total_timesteps=total_timesteps, callback=eval_callback)
end_time = time.time()
print(f"[INFO] 训练完成,耗时 {end_time - start_time:.2f} 秒。")
model.save(save_path)
print(f"[INFO] PPO 模型已保存至 {save_path}")
vec_env.close()
eval_env.close()
return model
def test_model(env, model, num_episodes=5):
"""
测试通用模型 (PPO / DQN / SAC ...)
:param env: 单个环境实例
:param model: 训练好的模型
:param num_episodes: 测试回合数
"""
total_rewards = []
for ep in range(num_episodes):
obs = env.reset()
done = False
ep_reward = 0.0
while not done:
action, _ = model.predict(obs, deterministic=True)
obs, reward, done, info = env.step(action)
ep_reward += reward
total_rewards.append(ep_reward)
print(f"回合 {ep+1}, 奖励 = {ep_reward:.2f}")
return total_rewards
训练注意事项
- 环境配置:确保环境中的
info
字典包含energy_step
信息,以便遗传算法准确获取每步能耗。 - 超参数调整:根据实际训练效果,适当调整学习率、批量大小等超参数,以获得更优的训练表现。
- 模型评估:利用
EvalCallback
定期评估模型性能,并保存表现最好的模型,避免过拟合或训练不稳定。
多目标优化:遗传算法与环境联动
在优化无人机飞行路径时,往往需要在多个目标之间进行权衡,例如最小化飞行时间、能耗与风险。**遗传算法(GA)**以其强大的搜索与优化能力,成为解决多目标优化问题的理想选择。
遗传算法流程
- 个体与种群初始化:每个个体代表一条飞行路径,由多个关键节点组成。
- 适应度评估:在环境中模拟飞行路径,计算对应的时间、能耗与风险。
- 选择、交叉与变异:通过选择机制保留优秀个体,利用交叉与变异操作生成新个体,促进种群多样性。
- 进化迭代:重复适应度评估与遗传操作,逐步优化种群适应度。
实现细节
以下是dqn_ga.py
中遗传算法的关键部分:
# ----------------------------------------------------------------
# 2) 多目标遗传算法 (GA) 示例: 与环境真正联动
# ----------------------------------------------------------------
# 定义多目标适应度, 需要最小化 time, energy, risk => weights = (-1, -1, -1)
creator.create("FitnessMulti", base.Fitness, weights=(-1.0, -1.0, -1.0))
creator.create("Individual", list, fitness=creator.FitnessMulti)
toolbox = base.Toolbox()
def random_node_3d(grid_size=10, max_alt=10):
"""
随机生成一个三维节点 (x, y, z)
"""
x = random.uniform(0, grid_size)
y = random.uniform(0, grid_size)
z = random.uniform(0, max_alt)
return (x, y, z)
def init_individual_3d():
"""
初始化一个个体: 假设每条路径由5个关键节点组成
你可根据需求调整节点数量
"""
return tools.initRepeat(
creator.Individual,
lambda: random_node_3d(10, 10), # 也可用 config 传入 grid_size, max_alt
5
)
toolbox.register("individual", init_individual_3d)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)
def multi_objective_fitness(individual, env_class, env_kwargs):
"""
在环境中模拟从 "起点" -> individual[0] -> individual[1] -> ... -> "最后一个节点"
计算 time, energy, risk.
"""
# 创建临时环境
env = env_class(**env_kwargs)
obs = env.reset()
total_time = 0.0
total_energy = 0.0
total_risk = 0.0
current_pos = obs[0:3] # x, y, z
for waypoint in individual:
# 简化: 人为地让无人机朝 waypoint 飞, 每步检查距离
steps = 0
done = False
while not done and steps < 50: # 最多飞50步去接近该 waypoint
direction = np.array(waypoint) - np.array(current_pos)
distance = np.linalg.norm(direction)
if distance < 0.5: # 到该节点
break
direction = direction / (distance + 1e-8)
# 假设你的环境是连续动作, action = [dx, dy, dz] * 0.5
action = direction * 0.5
# 与环境交互
obs, reward, done, info = env.step(action)
current_pos = obs[0:3]
# 按你需要统计的指标累计
# time => 每执行一步 step 就算1
total_time += 1.0
# energy 可从 step() 的 info 或其他方式获得,这里仅示例
total_energy += info.get('energy_step', 0.1)
total_risk += info.get('risk', 0.0)
steps += 1
if done:
# 可能是电量耗尽等原因
break
# 返回 (time, energy, risk)
return (total_time, total_energy, total_risk)
# ========== 让 deap 调用 multi_objective_fitness ==========
def evaluate_individual(ind):
return multi_objective_fitness(ind, GLOBAL_ENV_CLASS, GLOBAL_ENV_KWARGS)
toolbox.register("evaluate", evaluate_individual)
toolbox.register("mate", tools.cxTwoPoint)
toolbox.register("mutate", tools.mutShuffleIndexes, indpb=0.2)
toolbox.register("select", tools.selNSGA2)
def run_ga(pop_size=10, ngen=10, env_class=None, env_kwargs=None):
"""
运行多目标遗传算法 (示例)
:param pop_size: 种群规模
:param ngen: 迭代代数
:param env_class: 你的环境类
:param env_kwargs: 传给环境的参数dict
"""
# 注册全局变量, 供 evaluate_individual 调用
global GLOBAL_ENV_CLASS, GLOBAL_ENV_KWARGS
GLOBAL_ENV_CLASS = env_class
GLOBAL_ENV_KWARGS = env_kwargs if env_kwargs else {}
pop = toolbox.population(n=pop_size)
invalid_ind = [ind for ind in pop if not ind.fitness.valid]
fitnesses = map(toolbox.evaluate, invalid_ind)
for ind, fit in zip(invalid_ind, fitnesses):
ind.fitness.values = fit
for gen in range(ngen):
# 选择
offspring = toolbox.select(pop, len(pop))
offspring = list(map(toolbox.clone, offspring))
# 交叉、变异
for child1, child2 in zip(offspring[::2], offspring[1::2]):
if random.random() < 0.9:
toolbox.mate(child1, child2)
toolbox.mutate(child1)
toolbox.mutate(child2)
del child1.fitness.values
del child2.fitness.values
invalid_ind = [ind for ind in offspring if not ind.fitness.valid]
fitnesses = map(toolbox.evaluate, invalid_ind)
for ind, fit in zip(invalid_ind, fitnesses):
ind.fitness.values = fit
# 新一代
pop = toolbox.select(pop + offspring, pop_size)
return pop
适应度评估
在multi_objective_fitness
函数中,我们通过在环境中模拟飞行路径,计算每条路径的飞行时间、能耗与风险。遗传算法将基于这些指标,进化出最优的飞行路径。
遗传算法运行
以下是如何运行遗传算法的示例:
# 创建环境配置
env_kwargs = {
"grid_size": 10,
"max_alt": 5.0,
"max_episode_steps": 50,
"alpha": 1.0,
"beta": 0.1,
"gamma": 0.5,
"obstacle_list": [
(5, 5, 3, 0.1, 0, 0),
(2, 7, 2, 0, 0.1, 0)
],
"battery_capacity": 50.0,
"continuous_action": True
}
# 运行GA
final_population = run_ga(
pop_size=20,
ngen=15,
env_class=LowAltitudeLogisticsEnv,
env_kwargs=env_kwargs
)
# 打印前几名个体及其适应度
for i, ind in enumerate(final_population[:5]):
print(f"个体 {i+1}: {ind}, 适应度={ind.fitness.values}")
炫酷的3D可视化:Plotly应用
为了更直观地展示无人机的飞行路径与环境中的动态障碍物,我们使用Plotly库实现了交互式的3D可视化效果。通过可旋转、缩放的3D图形,用户可以全面了解无人机在不同飞行路径下的表现。
可视化实现
以下是main.py
中实现3D可视化的关键部分:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
文件: main.py
示例功能:
1. 创建 LowAltitudeLogisticsEnv 环境
2. 执行若干步 (使用随机动作或自定义动作)
3. 使用 Plotly 绘制 3D 轨迹和障碍物分布,实现可交互、炫酷的可视化
"""
import plotly.graph_objects as go
import numpy as np
from low_altitude_env import LowAltitudeLogisticsEnv
from dqn_ga import train_ppo, test_model, run_ga
from stable_baselines3 import PPO
def visualize_3d_path_plotly(env, model=None, max_steps=100, title="3D Flight Simulation"):
"""
使用 Plotly 进行交互式3D可视化:
- env: 传入的环境实例 (需已初始化)
- model: 若为 None,则随机动作;否则使用 model.predict(obs)
- max_steps: 最大步数
- title: 图标题
"""
path_x, path_y, path_z = [], [], []
obstacles_history = [] # 若想逐步保存障碍物位置
obs = env.reset()
path_x.append(obs[0])
path_y.append(obs[1])
path_z.append(obs[2])
for step_i in range(max_steps):
if model is not None:
# 若有训练好的模型,可用 model.predict(...)
action, _ = model.predict(obs)
else:
# 无模型时用随机动作演示
if env.continuous_action:
action = np.random.uniform(-1, 1, size=(3,))
else:
action = env.action_space.sample()
obs, reward, done, info = env.step(action)
path_x.append(obs[0])
path_y.append(obs[1])
path_z.append(obs[2])
# 保存当前步障碍物位置(用于后面可视化)
obstacles_history.append(env.obstacle_list)
if done:
break
# ========== Plotly 绘图部分 ==========
# 1) 无人机飞行轨迹(线 + 点)
trace_path = go.Scatter3d(
x=path_x,
y=path_y,
z=path_z,
mode='lines+markers',
line=dict(color='blue', width=4),
marker=dict(size=4, color='blue', symbol='circle'),
name='无人机路径'
)
# 2) 起点 & 终点
start_marker = go.Scatter3d(
x=[path_x[0]], y=[path_y[0]], z=[path_z[0]],
mode='markers',
marker=dict(size=6, color='green', symbol='diamond'),
name='起点'
)
# obs里 goal_x, goal_y, goal_z 分别在下标 [8], [9], [10]
goal_marker = go.Scatter3d(
x=[obs[8]], y=[obs[9]], z=[obs[10]],
mode='markers',
marker=dict(size=6, color='red', symbol='diamond'),
name='目标点'
)
# 3) 障碍物:仅示例最终一步的障碍物位置
final_obstacles = obstacles_history[-1] if len(obstacles_history) > 0 else env.obstacle_list
obs_x = [o[0] for o in final_obstacles]
obs_y = [o[1] for o in final_obstacles]
obs_z = [o[2] for o in final_obstacles]
obstacle_marker = go.Scatter3d(
x=obs_x,
y=obs_y,
z=obs_z,
mode='markers',
marker=dict(size=5, color='orange', symbol='x'),
name='障碍物'
)
# 4) 布局
layout = go.Layout(
title=title,
scene=dict(
xaxis_title='X',
yaxis_title='Y',
zaxis_title='Z',
aspectmode='cube'
),
width=800,
height=700,
showlegend=True
)
fig = go.Figure(data=[trace_path, start_marker, goal_marker, obstacle_marker], layout=layout)
fig.show() # 在浏览器窗口中显示交互式3D图
# 若在Jupyter Notebook中,可使用:
# from plotly.offline import iplot
# iplot(fig)
# 或将图表保存为HTML:
# fig.write_html("3d_simulation.html")
def main():
"""
主函数示例:
1. 创建连续动作环境
2. 使用PPO训练无人机
3. 测试并可视化
4. 运行遗传算法进行路径优化
"""
# 环境配置
env_kwargs = {
"grid_size": 10,
"max_alt": 5.0,
"max_episode_steps": 50,
"alpha": 1.0,
"beta": 0.1,
"gamma": 0.5,
"obstacle_list": [
(5, 5, 3, 0.1, 0, 0),
(2, 7, 2, 0, 0.1, 0)
],
"battery_capacity": 50.0,
"continuous_action": True
}
# 创建环境实例
env = LowAltitudeLogisticsEnv(**env_kwargs)
# 1. 训练 PPO 模型
model_path = "ppo_low_altitude_model.zip"
model = train_ppo(
env_class=LowAltitudeLogisticsEnv,
env_kwargs=env_kwargs,
total_timesteps=50000,
save_path=model_path
)
# 2. 测试训练好的模型
print("\n[INFO] 测试训练好的PPO模型...")
test_rewards = test_model(env, model, num_episodes=3)
# 3. 使用Plotly可视化单次飞行轨迹
print("\n[INFO] 可视化单次飞行轨迹...")
visualize_3d_path_plotly(env, model=model, max_steps=100, title="Plotly 3D飞行模拟")
# 4. 运行遗传算法进行路径优化
print("\n[INFO] 运行多目标遗传算法进行路径优化...")
final_population = run_ga(
pop_size=20,
ngen=15,
env_class=LowAltitudeLogisticsEnv,
env_kwargs=env_kwargs
)
print("遗传算法最终种群(前5名):")
for i, ind in enumerate(final_population[:5]):
print(f" 个体 {i+1}: {ind}, 适应度={ind.fitness.values}")
# 可视化遗传算法的Pareto前沿
visualize_pareto(final_population)
print("\n[INFO] 所有任务完成!")
def visualize_pareto(population):
"""
可视化多目标优化的Pareto前沿
"""
import plotly.express as px
# 提取适应度值
time_vals = [ind.fitness.values[0] for ind in population]
energy_vals = [ind.fitness.values[1] for ind in population]
risk_vals = [ind.fitness.values[2] for ind in population]
# 创建DataFrame
import pandas as pd
df = pd.DataFrame({
'Time': time_vals,
'Energy': energy_vals,
'Risk': risk_vals
})
# 使用Plotly绘制3D散点图
fig = px.scatter_3d(df, x='Time', y='Energy', z='Risk',
title="遗传算法Pareto前沿",
labels={'Time': '时间', 'Energy': '能耗', 'Risk': '风险'},
opacity=0.7)
fig.show()
if __name__ == "__main__":
main()
适应度评估细节
在multi_objective_fitness
函数中,我们通过模拟无人机飞行路径,逐步计算每条路径的总飞行时间、能耗与风险。这些指标将作为遗传算法的适应度值,指导种群的进化方向。
运行遗传算法
通过run_ga
函数,我们设定种群规模(pop_size
)和迭代代数(ngen
),并传入环境类与配置参数。最终,遗传算法将输出优化后的路径种群,并可视化其Pareto前沿。
炫酷的3D可视化:Plotly应用
为了更直观地展示无人机的飞行路径与环境中的动态障碍物,我们采用了Plotly库实现了交互式的3D可视化效果。通过可旋转、缩放的3D图形,用户可以全面了解无人机在不同飞行路径下的表现。
可视化实现
以下是main.py
中实现3D可视化的关键部分:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
文件: main.py
示例功能:
1. 创建 LowAltitudeLogisticsEnv 环境
2. 执行若干步 (使用随机动作或自定义动作)
3. 使用 Plotly 绘制 3D 轨迹和障碍物分布,实现可交互、炫酷的可视化
"""
import plotly.graph_objects as go
import numpy as np
from low_altitude_env import LowAltitudeLogisticsEnv
from dqn_ga import train_ppo, test_model, run_ga
from stable_baselines3 import PPO
def visualize_3d_path_plotly(env, model=None, max_steps=100, title="3D Flight Simulation"):
"""
使用 Plotly 进行交互式3D可视化:
- env: 传入的环境实例 (需已初始化)
- model: 若为 None,则随机动作;否则使用 model.predict(obs)
- max_steps: 最大步数
- title: 图标题
"""
path_x, path_y, path_z = [], [], []
obstacles_history = [] # 若想逐步保存障碍物位置
obs = env.reset()
path_x.append(obs[0])
path_y.append(obs[1])
path_z.append(obs[2])
for step_i in range(max_steps):
if model is not None:
# 若有训练好的模型,可用 model.predict(...)
action, _ = model.predict(obs)
else:
# 无模型时用随机动作演示
if env.continuous_action:
action = np.random.uniform(-1, 1, size=(3,))
else:
action = env.action_space.sample()
obs, reward, done, info = env.step(action)
path_x.append(obs[0])
path_y.append(obs[1])
path_z.append(obs[2])
# 保存当前步障碍物位置(用于后面可视化)
obstacles_history.append(env.obstacle_list)
if done:
break
# ========== Plotly 绘图部分 ==========
# 1) 无人机飞行轨迹(线 + 点)
trace_path = go.Scatter3d(
x=path_x,
y=path_y,
z=path_z,
mode='lines+markers',
line=dict(color='blue', width=4),
marker=dict(size=4, color='blue', symbol='circle'),
name='无人机路径'
)
# 2) 起点 & 终点
start_marker = go.Scatter3d(
x=[path_x[0]], y=[path_y[0]], z=[path_z[0]],
mode='markers',
marker=dict(size=6, color='green', symbol='diamond'),
name='起点'
)
# obs里 goal_x, goal_y, goal_z 分别在下标 [8], [9], [10]
goal_marker = go.Scatter3d(
x=[obs[8]], y=[obs[9]], z=[obs[10]],
mode='markers',
marker=dict(size=6, color='red', symbol='diamond'),
name='目标点'
)
# 3) 障碍物:仅示例最终一步的障碍物位置
final_obstacles = obstacles_history[-1] if len(obstacles_history) > 0 else env.obstacle_list
obs_x = [o[0] for o in final_obstacles]
obs_y = [o[1] for o in final_obstacles]
obs_z = [o[2] for o in final_obstacles]
obstacle_marker = go.Scatter3d(
x=obs_x,
y=obs_y,
z=obs_z,
mode='markers',
marker=dict(size=5, color='orange', symbol='x'),
name='障碍物'
)
# 4) 布局
layout = go.Layout(
title=title,
scene=dict(
xaxis_title='X',
yaxis_title='Y',
zaxis_title='Z',
aspectmode='cube'
),
width=800,
height=700,
showlegend=True
)
fig = go.Figure(data=[trace_path, start_marker, goal_marker, obstacle_marker], layout=layout)
fig.show() # 在浏览器窗口中显示交互式3D图
# 若在Jupyter Notebook中,可使用:
# from plotly.offline import iplot
# iplot(fig)
# 或将图表保存为HTML:
# fig.write_html("3d_simulation.html")
def main():
"""
主函数示例:
1. 创建连续动作环境
2. 使用PPO训练无人机
3. 测试并可视化
4. 运行遗传算法进行路径优化
"""
# 环境配置
env_kwargs = {
"grid_size": 10,
"max_alt": 5.0,
"max_episode_steps": 50,
"alpha": 1.0,
"beta": 0.1,
"gamma": 0.5,
"obstacle_list": [
(5, 5, 3, 0.1, 0, 0),
(2, 7, 2, 0, 0.1, 0)
],
"battery_capacity": 50.0,
"continuous_action": True
}
# 创建环境实例
env = LowAltitudeLogisticsEnv(**env_kwargs)
# 1. 训练 PPO 模型
model_path = "ppo_low_altitude_model.zip"
model = train_ppo(
env_class=LowAltitudeLogisticsEnv,
env_kwargs=env_kwargs,
total_timesteps=50000,
save_path=model_path
)
# 2. 测试训练好的模型
print("\n[INFO] 测试训练好的PPO模型...")
test_rewards = test_model(env, model, num_episodes=3)
# 3. 使用Plotly可视化单次飞行轨迹
print("\n[INFO] 可视化单次飞行轨迹...")
visualize_3d_path_plotly(env, model=model, max_steps=100, title="Plotly 3D飞行模拟")
# 4. 运行遗传算法进行路径优化
print("\n[INFO] 运行多目标遗传算法进行路径优化...")
final_population = run_ga(
pop_size=20,
ngen=15,
env_class=LowAltitudeLogisticsEnv,
env_kwargs=env_kwargs
)
print("遗传算法最终种群(前5名):")
for i, ind in enumerate(final_population[:5]):
print(f" 个体 {i+1}: {ind}, 适应度={ind.fitness.values}")
# 可视化遗传算法的Pareto前沿
visualize_pareto(final_population)
print("\n[INFO] 所有任务完成!")
def visualize_pareto(population):
"""
可视化多目标优化的Pareto前沿
"""
import plotly.express as px
# 提取适应度值
time_vals = [ind.fitness.values[0] for ind in population]
energy_vals = [ind.fitness.values[1] for ind in population]
risk_vals = [ind.fitness.values[2] for ind in population]
# 创建DataFrame
import pandas as pd
df = pd.DataFrame({
'Time': time_vals,
'Energy': energy_vals,
'Risk': risk_vals
})
# 使用Plotly绘制3D散点图
fig = px.scatter_3d(df, x='Time', y='Energy', z='Risk',
title="遗传算法Pareto前沿",
labels={'Time': '时间', 'Energy': '能耗', 'Risk': '风险'},
opacity=0.7)
fig.show()
if __name__ == "__main__":
main()
可视化说明
- 无人机路径:通过蓝色线条和标记展示无人机的飞行轨迹。
- 起点与目标点:分别用绿色和红色的菱形标记表示,直观展示无人机的起始与终点位置。
- 障碍物:用橙色的“×”符号标记,显示环境中动态障碍物的最终位置。
- 交互功能:用户可以通过鼠标旋转、缩放图形,全面观察无人机的飞行路径与障碍物分布。
遗传算法Pareto前沿可视化
通过Plotly的3D散点图,我们可以清晰地看到遗传算法在多目标优化下的Pareto前沿,直观展示不同个体在时间、能耗与风险之间的权衡关系。