需要源码和数据集请点赞关注收藏后评论区留言私信~~~
一、视觉问题简介
视觉问答(VQA)是一种同时设计计算机视觉和自然语言处理的学习任务。简单来说,VQA就是对给定的图片进行问答,一个VQA系统以一张图片和一个关于这张图片形式自由,开放式的自然语言问题作为输入,生成一条自然语言答案作为输出,视觉问题系统综合运用到了目前的计算机视觉和自然语言处理的技术,并设计模型设计,实验,以及可视化。
VQA问题的一种典型模型是联合嵌入模型,这种方法首先学习视觉与自然语言的两个不同模态特征在一个共同的特征空间的嵌入表示,然后根据这种嵌入表示产生回答。
二、数据集的准备
1:下载数据
这里使用VQA2.0数据集进行训练和验证,VQA2.0是一个公认有难度,并且语言验证得到了有效控制的数据集
本次使用到的图片为MSCOCO数据集中train2014子集和val2014子集,图片可以在官网下载
数据集网址
本次用到的图像特征是由目标检测网络Faster-RCNN检测并生成的,可评论区留言私信博主要
2:安装依赖
确保安装好PyTorch,然后在程序目录下运行pip install -r requirements.txt安装其他依赖项
三、关键模块简介
1:FCnet模块
FCnet即一系列的全连接层,各个层的输入输出大小在模块构建时给出,这个模块默认使其中的全连接层具有bias,并以ReLU作为激活函数 并使用weight normalization
2:SimpleClassifier模块
它的作用是:在视觉问答系统的末端,根据融合的特征得到最终答案
3:问题嵌入模块
在联合嵌入模型中,需要使用RNN将输入的问题编码成向量,LSTM和GRU使两种代表性的RNN,由于实践中GRU与LSTM表现相近且占用显存较少,所以这里选用GRU
4:词嵌入
要获得问题句子的嵌入表示,首先应该获得词嵌入表示,每一个词需要用一个唯一的数字表示
baseline代码如下
import torch
import torch.nn as nn
from lib.module import topdown_attention
from lib.module.language_model import WordEmbedding, QuestionEmbedding
from lib.module.classifier import SimpleClassifier
from lib.module.fc import FCNet
class Baseline(nn.Module):
def __init__(self, w_emb, q_emb, v_att, q_net, v_net, classifer, need_internals=False):
super(Baseline, self).__init__()
self.need_internals = need_internals
self.w_emb = w_emb
self.q_emb = q_emb
self.v_att = v_att
self.q_net = q_net
self.v_net = v_net
self.classifier = classifer
def forward(self, q_tokens, ent_features):
w_emb = self.w_emb(q_tokens)
q_emb = self.q_emb(w_emb)
att = self.v_att(q_emb, ent_features) # [ B, n_ent, 1 ]
v_emb = (att * ent_features).sum(1) # [ B, hid_dim ]
internals = [att.squeeze()] if self.need_internals else None
q_repr = self.q_net(q_emb)
v_repr = self.v_net(v_emb)
joint_repr = q_repr * v_repr
logits = self.classifier(joint_repr)
return logits, internals
@classmethod
def build_from_config(cls, cfg, dataset, need_internals):
w_emb = WordEmbedding(dataset.word_dict.n_tokens, cfg.lm.word_emb_dim, 0.0)
q_emb = QuestionEmbedding(cfg.lm.word_emb_dim, cfg.hid_dim, cfg.lm.n_layers, cfg.lm.bidirectional, cfg.lm.dropout, cfg.lm.rnn_type)
q_dim = cfg.hid_dim
att_cls = topdown_attention.classes[cfg.topdown_att.type]
v_att = att_cls(1, q_dim, cfg.ent_dim, cfg.topdown_att.hid_dim, cfg.topdown_att.dropout)
q_net = FCNet([q_dim, cfg.hid_dim])
v_net = FCNet([cfg.ent_dim, cfg.hid_dim])
classifier = SimpleClassifier(cfg.hid_dim, cfg.mlp.hid_dim, dataset.ans_dict.n_tokens, cfg.mlp.dropout)
return cls(w_emb, q_emb, v_att, q_net, v_net, classifier, need_internals)
数据集目录如下
四、结果可视化
读取了之前训练好的模型之后,使用数据为配置文件中的val,程序运行完成后结果可视化如下
机器对于给出的图片会输出对于的问答结果
五、代码
部分代码如下
训练类
import os
import time
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim.lr_scheduler import LambdaLR
from torch.nn.utils import clip_grad_norm_
from bisect import bisect
from tqdm import tqdm
def bce_with_logits(logits, labels):
assert logits.dim() == 2
loss = F.binary_cross_entropy_with_logits(logits, labels)
loss *= labels.size(1) # multiply by number of QAs
return loss
def sce_with_logits(logits, labels):
assert logits.dim() == 2
loss = F.cross_entropy(logits, labels.nonzero()[:, 1])
loss *= labels.size(1)
return loss
def compute_score_with_logits(logits, labels):
with torch.no_grad():
logits = torch.max(logits, 1)[1] # argmax
one_hots = torch.zeros(*labels.size()).cuda()
one_hots.scatter_(1, logits.view(-1, 1), 1)
scores = (one_hots * labels)
return scores
def lr_schedule_func_builder(cfg):
def func(step_idx):
if step_idx <= cfg.train.warmup_steps:
alpha = float(step_idx) / float(cfg.train.warmup_steps)
return cfg.train.warmup_factor * (1. - alpha) + alpha
else:
idx = bisect(cfg.train.lr_steps, step_idx)
return pow(cfg.train.lr_ratio, idx)
return func
def train(model, cfg, train_loader, val_loader, n_epochs, val_freq, out_dir):
os.makedirs(out_dir, exist_ok=True)
optim = torch.optim.Adamax(model.parameters(), **cfg.train.optim)
n_train_batches = len(train_loader)
train_score = 0.0
loss_fn = bce_with_logits if cfg.model.loss == "logistic" else sce_with_logits
for epoch in range(n_epochs):
epoch_loss = 0.0
tic_0 = time.time()
for i, data in enumerate(train_loader):
tic_1 = time.time()
q_tokens = data[2].cuda()
a_targets = data[3].cuda()
v_features = [_.cuda() for _ in data[4:]]
tic_2 = time.time()
optim.zero_grad()
logits, _ = model(q_tokens, *v_features)
loss = loss_fn(logits, a_targets)
tic_3 = time.time()
loss.backward()
if cfg.train.clip_grad: clip_grad_norm_(model.parameters(), cfg.train.max_grad_norm)
optim.step()
tic_4 = time.time()
batch_score = compute_score_with_logits(logits, a_targets).sum()
epoch_loss += float(loss.data.item() * logits.size(0))
train_score += float(batch_score)
del loss
logstr = "epoch %2d batch %4d/%4d | ^ %4dms | => %4dms | <= %4dms" % \
(epoch + 1, i + 1, n_train_batches, 1000*(tic_2-tic_0), 1000*(tic_3-tic_2), 1000*(tic_4-tic_3))
print("%-80s" % logstr, end="\r")
tic_0 = time.time()
epoch_loss /= len(train_loader.dataset)
train_score = 100 * train_score / len(train_loader.dataset)
logstr = "epoch %2d | train_loss: %5.2f train_score: %5.2f" % (epoch + 1, epoch_loss, train_score)
if (epoch + 1) % val_freq == 0:
model.eval()
val_score, upper_bound = validate(model, val_loader)
model.train()
logstr += " | val_score: %5.2f (%5.2f)" % (100 * val_score, 100 * upper_bound)
print("%-80s" % logstr)
model_path = os.path.join(out_dir, 'model_%d.pth' % (epoch + 1))
torch.save(model.state_dict(), model_path)
def validate(model, loader):
score = 0
upper_bound = 0
n_qas = 0
with torch.no_grad():
for i, data in enumerate(loader):
q_tokens = data[2].cuda()
a_targets = data[3].cuda()
v_features = [_.cuda() for _ in data[4:]]
logits, _ = model(q_tokens, *v_features)
batch_score = compute_score_with_logits(logits, a_targets)
score += batch_score.sum()
upper_bound += (a_targets.max(1)[0]).sum()
n_qas += logits.size(0)
logstr = "val batch %5d/%5d" % (i + 1, len(loader))
print("%-80s" % logstr, end='\r')
score = score / n_qas
upper_bound = upper_bound / n_qas
return score, upper_bound
infer类
import os
import time
import json
import torch
import cv2
import shutil
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
from tqdm import tqdm
colors = [ (175, 84, 65), (68, 194, 246), (136, 147, 65), (92, 192, 151) ]
def attention_map(im, boxes, atts, p=0.8, bgc=1.0, compress=0.85, box_color=(65, 81, 226)):
height, width, channel = im.shape
im = im / 255.0
att_map = np.zeros([height, width])
boxes = boxes.astype(np.int)
for box, att in zip(boxes, atts):
x1, y1, x2, y2 = box
roi = att_map[y1:y2, x1:x2]
roi[roi < att] = att
att_map /= att_map.max()
att_map = att_map ** p
att_map = att_map * compress + (1-compress)
att_map = cv2.resize(att_map, (int(width/16), int(height/16)))
att_map = cv2.resize(att_map, (width, height))
att_map = np.expand_dims(att_map, axis=2)
bg = np.ones_like(att_map) * bgc
att_im = im * att_map + bg * (1-att_map)
att_im = (att_im * 255).astype(np.uint8)
center = np.argmax(atts)
x1, y1, x2, y2 = boxes[center]
cv2.rectangle(att_im, (x1, y1), (x2, y2), box_color, 5)
return att_im
def infer_visualize(model, args, cfg, ans_dict, loader):
_, ckpt = os.path.split(args.checkpoint)
ckpt, _ = os.path.splitext(ckpt)
out_dir = os.path.join(args.out_dir, "%s_%s_%s_visualization" % (args.cfg_name, ckpt, args.data))
os.makedirs(out_dir, exist_ok=True)
model.eval()
questions_path = cfg.data[args.data].composition[0].q_jsons[0]
questions = json.load(open(questions_path))
pbar = tqdm(total=args.n_batches * loader.batch_size)
with torch.no_grad():
for i, data in enumerate(loader):
if i == args.n_batches: break
question_ids = data[0]
image_ids = data[1]
q_tokens = data[2].cuda()
obj_featuers = data[4].cuda()
batch_boxes = data[5].numpy()
logits, internals = model(q_tokens, obj_featuers)
topdown_atts = internals[0]
topdown_atts = topdown_atts.data.cpu().numpy()
_, predictions = logits.max(dim=1)
for idx in range(len(question_ids)):
question_id = question_ids[idx]
image_id = image_ids[idx]
boxes = batch_boxes[idx]
answer = ans_dict.idx2ans[predictions[idx]]
q_entry = questions[question_id]
topdown_att = topdown_atts[idx]
question = q_entry["question"]
gts = list(q_entry["answers"].items())
gts = sorted(gts, reverse=True, key=lambda x: x[1])
gt = gts[0][0]
q_out_dir = os.path.join(out_dir, question_id)
os.makedirs(q_out_dir, exist_ok=True)
q_str = question + "\n" + "gt: %s\n" % gt + "answer: %s\n" % answer
with open(os.path.join(q_out_dir, "qa.txt"), "w") as f: f.write(q_str)
image_path = os.path.join(args.images_dir, "%s.jpg" % image_id)
shutil.copy(image_path, os.path.join(q_out_dir, "original.jpg"))
im = cv2.imread(image_path)
att_map = attention_map(im.copy(), boxes, topdown_att)
cv2.imwrite(os.path.join(q_out_dir, "topdown_att.jpg"), att_map)
pbar.update(1)
创作不易 觉得有帮助请点赞关注收藏~~~