多目标跟踪中用到的求解线性分配问题(Linear Assignment Problem,LAP)Python
flyfish
如果想看 C++版本的,请点这里。
线性分配问题(LAP,Linear Assignment Problem)是一个经典的优化问题,其目标是在若干任务和若干工人之间进行分配,以最小化总成本。成本可以是时间、金钱等。
LAPJV算法(Linear Assignment Problem Jonker-Volgenant algorithm)是解决线性分配问题的一种具体方法。这是一种基于Jonker和Volgenant提出的优化算法,旨在高效地求解LAP。
我们有一个公司,当前有5个任务需要完成,同时公司有5名工人可供选择。每个工人完成每个任务的成本(例如时间、金钱等)不同。公司的目标是找到一种分配方案,使得所有任务都能被一个工人完成,并且总成本最小。
具体而言,成本矩阵如下:
任务1 | 任务2 | 任务3 | 任务4 | 任务5 | |
---|---|---|---|---|---|
工人1 | 9 | 11 | 14 | 11 | 7 |
工人2 | 6 | 15 | 13 | 13 | 10 |
工人3 | 12 | 13 | 6 | 8 | 8 |
工人4 | 11 | 9 | 10 | 12 | 9 |
工人5 | 7 | 12 | 14 | 10 | 14 |
该矩阵中的每个值表示分配工人完成任务的成本。公司希望最小化这些成本的总和。
用Python代码来理解,使用lap库来验证写的对不对
依赖库
conda install -c conda-forge lap
代码实现
import numpy as np
import matplotlib.pyplot as plt
import lap
class LAPJV:
def __init__(self, cost_matrix):
self.cost_matrix = np.array(cost_matrix)
self.n = self.cost_matrix.shape[0]
self.x = np.full(self.n, -1, dtype=int)
self.y = np.full(self.n, -1, dtype=int)
self.v = np.zeros(self.n)
self.free_rows = []
def solve(self):
self.column_reduction()
self.augment()
def column_reduction(self):
for j in range(self.n):
min_cost = np.min(self.cost_matrix[:, j])
self.cost_matrix[:, j] -= min_cost
self.v[j] = min_cost
for j in range(self.n):
i = np.argmin(self.cost_matrix[:, j])
if self.x[i] == -1:
self.x[i] = j
self.y[j] = i
else:
self.y[j] = -1
for i in range(self.n):
if self.x[i] == -1:
self.free_rows.append(i)
elif self.y[self.x[i]] != i:
self.x[i] = -1
self.free_rows.append(i)
def augment(self):
while self.free_rows:
self.find_augmenting_path()
def find_augmenting_path(self):
d = np.full(self.n, np.inf)
pred = np.full(self.n, -1, dtype=int)
cols = np.arange(self.n)
i = self.free_rows.pop(0)
for j in range(self.n):
d[j] = self.cost_matrix[i, j] - self.v[j]
pred[j] = i
u = np.zeros(self.n)
visited = np.zeros(self.n, dtype=bool)
final_j = -1
while final_j == -1:
min_d = np.min(d[~visited])
for j in cols[~visited]:
if d[j] == min_d:
if self.y[j] == -1:
final_j = j
break
visited[j] = True
i = self.y[j]
u[j] = min_d
for k in cols[~visited]:
new_d = self.cost_matrix[i, k] - self.v[k] - u[j]
if new_d < d[k]:
d[k] = new_d
pred[k] = j
while final_j != -1:
i = pred[final_j]
j = self.x[i]
self.y[final_j] = i
self.x[i] = final_j
final_j = j
def get_assignment(self):
return self.x
def plot_assignment(cost_matrix, assignment):
n = len(cost_matrix)
fig, ax = plt.subplots()
im = ax.imshow(cost_matrix, cmap='viridis')
for i in range(n):
ax.plot(assignment[i], i, 'ro')
ax.set_xticks(np.arange(n))
ax.set_yticks(np.arange(n))
ax.set_xticklabels([f'Job {j+1}' for j in range(n)])
ax.set_yticklabels([f'Worker {i+1}' for i in range(n)])
plt.title('Job Assignment')
plt.colorbar(im)
plt.show()
# 更复杂的示例成本矩阵
cost_matrix = [
[9, 11, 14, 11, 7],
[6, 15, 13, 13, 10],
[12, 13, 6, 8, 8],
[11, 9, 10, 12, 9],
[7, 12, 14, 10, 14]
]
# 创建并解决LAPJV问题
lap_solver = LAPJV(cost_matrix)
lap_solver.solve()
assignment = lap_solver.get_assignment()
# 使用lap库验证结果
cost_matrix_np = np.array(cost_matrix)
row_ind, col_ind, _ = lap.lapjv(cost_matrix_np)
# 打印分配结果
print("Custom LAPJV Assignment:")
for i, j in enumerate(assignment):
print(f"Worker {i+1} is assigned to Job {j+1}")
print("\nLAP Library Assignment:")
for i, j in enumerate(col_ind):
print(f"Worker {i+1} is assigned to Job {j+1}")
# 绘制分配结果
plot_assignment(cost_matrix, assignment)
Custom LAPJV Assignment:
Worker 1 is assigned to Job 5
Worker 2 is assigned to Job 1
Worker 3 is assigned to Job 3
Worker 4 is assigned to Job 2
Worker 5 is assigned to Job 4
LAP Library Assignment:
Worker 1 is assigned to Job 5
Worker 2 is assigned to Job 1
Worker 3 is assigned to Job 3
Worker 4 is assigned to Job 2
Worker 5 is assigned to Job 4
如何使用~来选择未访问的列:
import numpy as np
# 示例数据
visited = np.array([True, False, True, False])
cols = np.array([0, 1, 2, 3])
# 选择未访问的列
unvisited_cols = cols[~visited]
print("Visited:", visited)
print("Cols:", cols)
print("Unvisited Cols:", unvisited_cols)
Visited: [ True False True False]
Cols: [0 1 2 3]
Unvisited Cols: [1 3]