参考自: Camelyon16数据集切块预处理
区别是这里做了批量处理
数据集目录格式:
**main.py**
# !/usr/bin/python3
# -*- coding: utf-8 -*-
# @Time : 2024/9/4 20:21
# @Author : 猫娜Lisa
# @File : camelyon16_get_patch.py
# @Software: PyCharm
import os
import json
import numpy as np
import xml.etree.ElementTree as ET
import openslide # 这个的下载有点子麻烦的哦
import cv2
import shutil
from PIL import Image
from skimage.color import rgb2hsv
from skimage.filters import threshold_otsu
from multiprocessing import Pool
from pathos.multiprocessing import ProcessingPool as paPool
from config import train_root, test_root
from config import root, wsi_dir
from config import xml_dir, json_dir
from config import tumor_npy_dir, tissue_npy_dir, no_tumor_npy_dir
from config import tumor_txt_dir, no_tumor_txt_dir, normal_txt_dir
from config import tumor_patch_dir, no_tumor_patch_dir, normal_patch_dir
from config import level, RGB_min, patch_number, patch_size, level_patch, num_process
from config import if_trans_xml_to_json, if_get_tumor_mask_npy, if_get_tissue_mask_npy, if_get_no_tumor_mask_npy, if_get_sample_txt
from config import (if_get_train_tumor_patches, if_get_train_no_tumor_patches, if_get_train_normal_patches,
if_get_test_tumor_patches, if_get_test_no_tumor_patches, if_get_test_normal_patches)
# 将xml标注转换为json格式
# 每个注释都是多边形列表,其中每个多边形都由其顶点表示。阳性多边形表示肿瘤区域,阴性多边形表示正常区域。在本阶段,将标注格式转换成更简单的 .json 格式。
# xml_dir, json_dir, wsi_names
def camelyon16xml2json(xml_dir, json_dir, wsi_names):
    """
    Convert CAMELYON16 xml annotations into a simpler json format, one slide at a time.

    For every name in wsi_names the file xml_dir + name + '.xml' is parsed and
    the file json_dir + name + '.json' is written. Annotations whose
    PartOfGroup is "Tumor", "_0" or "_1" are stored under the key 'positive';
    annotations whose PartOfGroup is "_2" are stored under 'negative'. Each
    entry is {'name': <Name attribute>, 'vertices': [[x, y], ...]} with the
    coordinates rounded to integers.

    Arguments:
        xml_dir: string, directory prefix of the input xml files; it is
            concatenated as-is, so it must end with a path separator
        json_dir: string, directory prefix of the output json files; same
            trailing-separator requirement
        wsi_names: iterable of slide names without extension
    """
    positive_groups = ('Tumor', '_0', '_1')
    negative_groups = ('_2',)

    def collect_polygons(tree_root, groups):
        # Groups are visited in the given order, annotations in document order.
        polygons = []
        for group in groups:
            query = './Annotations/Annotation[@PartOfGroup="' + group + '"]'
            for node in tree_root.findall(query):
                coords = node.findall('./Coordinates/Coordinate')
                xs = [float(coord.get('X')) for coord in coords]
                ys = [float(coord.get('Y')) for coord in coords]
                # Stack as rows (X, Y), round, then flip to one [x, y] pair per vertex.
                vertices = np.round([xs, ys]).astype(int).transpose().tolist()
                polygons.append({'name': node.attrib['Name'], 'vertices': vertices})
        return polygons

    for wsi_name in wsi_names:
        source_path = xml_dir + wsi_name + '.xml'
        target_path = json_dir + wsi_name + '.json'
        tree_root = ET.parse(source_path).getroot()
        json_dict = {
            'positive': collect_polygons(tree_root, positive_groups),
            'negative': collect_polygons(tree_root, negative_groups),
        }
        with open(target_path, 'w') as f:
            json.dump(json_dict, f, indent=1)
# 获得tumor区域的mask
# 本阶段利用json标注得到tumor区域的mask文件,格式为 .npy 。
def get_tumor_mask(wsi_dir, level, json_dir, tumor_npy_dir, wsi_names):
    """
    Rasterize the 'positive' polygons of each slide into a boolean tumor mask.

    For every name in wsi_names the slide wsi_dir + name + '.tif' is opened to
    read its dimensions and downsample factor at `level`, the polygons stored
    under 'positive' in json_dir + name + '.json' are scaled by that factor and
    filled, and the result is saved to tumor_npy_dir + name + '.npy'.
    The saved array is boolean and transposed, i.e. its shape is
    (width, height) of the chosen level.

    Arguments:
        wsi_dir: string, directory prefix of the .tif slides (must end with a
            path separator, it is concatenated as-is)
        level: int, pyramid level at which the mask is produced
        json_dir: string, directory prefix of the json annotations
        tumor_npy_dir: string, directory prefix of the output .npy masks
        wsi_names: iterable of slide names without extension
    """
    for wsi_name in wsi_names:
        wsi_path = wsi_dir + wsi_name + '.tif'
        json_path = json_dir + wsi_name + '.json'
        tumor_npy_path = tumor_npy_dir + wsi_name + '.npy'

        # Only the geometry is needed; close the slide handle right away
        # instead of leaking one handle per slide.
        with openslide.OpenSlide(wsi_path) as slide:
            w, h = slide.level_dimensions[level]
            factor = slide.level_downsamples[level]  # level-0 pixels per pixel at `level`

        with open(json_path) as f:
            tumor_polygons = json.load(f)['positive']

        # uint8 is enough to hold the 0/255 fill values.
        mask_tumor = np.zeros((h, w), dtype=np.uint8)
        for tumor_polygon in tumor_polygons:
            vertices = (np.array(tumor_polygon["vertices"]) / factor).astype(np.int32)
            if vertices.size == 0:
                # A polygon without vertices covers nothing; skip it.
                continue
            cv2.fillPoly(mask_tumor, [vertices], (255))

        # Binarize, then transpose to (width, height).
        mask_tumor = np.transpose(mask_tumor > 127)
        np.save(tumor_npy_path, mask_tumor)
# 获得tissue区域的mask
# 使用大津算法进行图像分割即可获得组织区域。RGB_min可以手动调整,确定最低阈值。可以将tissue_mask转化为二值图像保存下来看看效果.
def get_tissue_mask(wsi_dir, level, tissue_npy_dir, RGB_min, wsi_names):
    """
    Compute a boolean tissue mask for each slide with Otsu thresholding.

    For every name in wsi_names the whole image of wsi_dir + name + '.tif' at
    `level` is read as RGB and transposed to (width, height, 3). A pixel is
    kept as tissue when all of the following hold:
      * it is not above the Otsu threshold in all three RGB channels at once,
      * its HSV saturation is above the Otsu threshold of the saturation channel,
      * each of its R, G and B values is greater than RGB_min.
    The mask is saved to tissue_npy_dir + name + '.npy'.

    Arguments:
        wsi_dir: string, directory prefix of the .tif slides (must end with a
            path separator, it is concatenated as-is)
        level: int, pyramid level at which the mask is produced
        tissue_npy_dir: string, directory prefix of the output .npy masks
        RGB_min: lower bound that every RGB channel must exceed
        wsi_names: iterable of slide names without extension
    """
    for wsi_name in wsi_names:
        wsi_path = wsi_dir + wsi_name + '.tif'
        tissue_npy_path = tissue_npy_dir + wsi_name + '.npy'

        # Read the level image and release the slide handle immediately.
        with openslide.OpenSlide(wsi_path) as slide:
            region = slide.read_region((0, 0), level, slide.level_dimensions[level])
            img_RGB = np.transpose(np.array(region.convert('RGB')), axes=[1, 0, 2])

        img_HSV = rgb2hsv(img_RGB)
        red, green, blue = img_RGB[:, :, 0], img_RGB[:, :, 1], img_RGB[:, :, 2]
        saturation = img_HSV[:, :, 1]

        # Background = bright in every channel simultaneously.
        background_R = red > threshold_otsu(red)
        background_G = green > threshold_otsu(green)
        background_B = blue > threshold_otsu(blue)
        tissue_RGB = np.logical_not(background_R & background_G & background_B)

        tissue_S = saturation > threshold_otsu(saturation)

        # Reject very dark pixels.
        min_R = red > RGB_min
        min_G = green > RGB_min
        min_B = blue > RGB_min

        tissue_mask = tissue_S & tissue_RGB & min_R & min_G & min_B
        np.save(tissue_npy_path, tissue_mask)
        # To inspect the result, the mask can be written out as a binary image,
        # e.g. Image.fromarray(tissue_mask).save(...).
# 获得no_tumor区域的mask
# tissue区域包含了tumor和no_tumor,所以只需要通过tissue_mask和tumor_mask做一下逻辑运算即可得到no_tumor区域的mask。
def get_no_tumor_mask(tumor_npy_dir, tissue_npy_dir, no_tumor_npy_dir, wsi_names):
    """
    Derive the non-tumor tissue mask of each slide from its two existing masks.

    For every name in wsi_names the arrays tumor_npy_dir + name + '.npy' and
    tissue_npy_dir + name + '.npy' are loaded, combined element-wise as
    "tissue AND NOT tumor", and saved to no_tumor_npy_dir + name + '.npy'.

    Arguments:
        tumor_npy_dir: string, directory prefix of the tumor masks
        tissue_npy_dir: string, directory prefix of the tissue masks
        no_tumor_npy_dir: string, directory prefix of the output masks
        wsi_names: iterable of slide names without extension

    All directory prefixes are concatenated as-is and must end with a path
    separator.
    """
    suffix = '.npy'
    for wsi_name in wsi_names:
        file_name = wsi_name + suffix
        tissue = np.load(tissue_npy_dir + file_name)
        tumor = np.load(tumor_npy_dir + file_name)
        # Bitwise forms of `tissue & ~tumor`.
        remaining = np.bitwise_and(tissue, np.invert(tumor))
        np.save(no_tumor_npy_dir + file_name, remaining)
# 随机采样各组织(tumor、no_tumor)区域。
# 一张WSI就可以切出来成千上万块patch,但并不需要全部的,只需要在每张WSI中采样出一定数量就可以了。
# 采样原理比较简单,由于前面拿到的都是WSI 在level 6 下的mask,大概1k * 2k的分辨率,直接在低分辨率的mask中采样一些点,
# 得到采样点在level 6下的坐标,再乘以缩放倍数就能算出他们在level 0 下的坐标(patch的中心点坐标)。得到采样坐标txt文件。
def sample_from_mask(npy_dir, patch_number, level, txt_dir, wsi_names):
    """
    Randomly pick patch centres from each slide's mask and append them to a txt file.

    For every name in wsi_names the mask npy_dir + name + '.npy' is loaded and
    the indices of its non-zero entries are collected. If there are more than
    patch_number of them, patch_number distinct ones are drawn at random
    (sampling without replacement, so no centre is emitted twice); otherwise
    all of them are used. The indices are multiplied by 2 ** level and appended
    to txt_dir + name + '.txt', one "name,x,y" line per point.

    Arguments:
        npy_dir: string, directory prefix of the input masks (must end with a
            path separator, it is concatenated as-is)
        patch_number: int, maximum number of points to sample per slide
        level: int, pyramid level of the mask; the scale factor is 2 ** level,
            which assumes each level halves the resolution
        txt_dir: string, directory prefix of the output txt files
        wsi_names: iterable of slide names without extension
    """
    for wsi_name in wsi_names:
        npy_path = npy_dir + wsi_name + '.npy'
        txt_path = txt_dir + wsi_name + '.txt'
        mask = np.load(npy_path)

        # One (first-axis index, second-axis index) row per non-zero mask entry.
        centre_points = np.argwhere(mask)
        total = centre_points.shape[0]
        if total > patch_number:
            # replace=False: the original randint-based draw could repeat a
            # centre and therefore produce duplicate patches.
            chosen = np.random.choice(total, size=patch_number, replace=False)
            sampled_points = centre_points[chosen, :]
        else:
            sampled_points = centre_points  # not enough points: take them all

        # Scale mask coordinates up by the assumed downsample factor.
        sampled_points = (sampled_points * 2 ** level).astype(np.int32)

        mask_only_name = os.path.split(npy_path)[-1].split(".")[0]
        lines = ['{},{},{}\n'.format(mask_only_name, x, y) for x, y in sampled_points]
        with open(txt_path, "a") as f:
            f.writelines(lines)
# 得到patch数据集
# 根据采样点的坐标,在level 0 下切割WSI即可得到patch。需要对tumor和no_tumor分别操作,
# 得到两类patch。还需要对测试集切块,都是一样的流程。仅以训练集的tumor切块举例。
def process(opts):
    """
    Cut one patch out of a slide and save it as a png (worker for Pool.map).

    Arguments:
        opts: tuple (j, pid, x_center, y_center, wsi_path, patch_size,
            level_patch, patch_dir) where
            j is the running index of the patch,
            pid is the name prefix used in the output file name,
            x_center / y_center are the patch centre (int or numeric string),
            wsi_path is the slide to read,
            patch_size is the side length of the square patch in pixels,
            level_patch is the pyramid level to read from,
            patch_dir is the output directory.

    The patch is written to patch_dir/<pid>_<100000 + j>.png.
    """
    j, pid, x_center, y_center, wsi_path, patch_size, level_patch, patch_dir = opts
    # Shift from the centre to the top-left corner expected by read_region.
    x = int(int(x_center) - patch_size / 2)
    y = int(int(y_center) - patch_size / 2)
    # Close the slide after every task; workers handle many tasks and would
    # otherwise accumulate open handles.
    with openslide.OpenSlide(wsi_path) as slide:
        img = slide.read_region((x, y), level_patch, (patch_size, patch_size)).convert('RGB')
    img.save(os.path.join(patch_dir, pid + '_' + str(100000 + j) + '.png'))
# 得到patch数据集
def get_patches(txt_dir, wsi_dir, num_process, patch_size, level_patch, patch_dir, wsi_names):
    """
    Cut the sampled patches of every slide using a pool of worker processes.

    For every name in wsi_names the file txt_dir + name + '.txt' is read; each
    non-blank line must have the form "pid,x_center,y_center". One task per
    line is handed to `process`, which reads wsi_dir + name + '.tif' and writes
    the patch into patch_dir.

    Arguments:
        txt_dir: string, directory prefix of the sample txt files (must end
            with a path separator, it is concatenated as-is)
        wsi_dir: string, directory prefix of the .tif slides
        num_process: int, number of worker processes
        patch_size: int, side length of the square patches
        level_patch: int, pyramid level the patches are read from
        patch_dir: string, output directory of the patches
        wsi_names: iterable of slide names without extension
    """
    # A single pool serves all slides and is shut down when the block exits;
    # previously a new pool was created per slide and never closed.
    with Pool(processes=num_process) as pool:
        for wsi_name in wsi_names:
            txt_path = txt_dir + wsi_name + '.txt'
            wsi_path = wsi_dir + wsi_name + '.tif'
            opt_list = []
            with open(txt_path) as f:
                for j, line in enumerate(f):
                    line = line.strip('\n')
                    if not line.strip():
                        # Tolerate blank lines (e.g. a trailing newline).
                        continue
                    # pid is the file name without extension, e.g. tumor_001
                    pid, x_center, y_center = line.split(',')
                    opt_list.append((j, pid, x_center, y_center, wsi_path,
                                     patch_size, level_patch, patch_dir))
            # map() blocks until every patch of this slide has been written.
            pool.map(process, opt_list)
def _reset_dir(path):
    """Make `path` an empty directory, creating it (and its parents) if it is missing."""
    if os.path.isdir(path):
        shutil.rmtree(path)
    os.makedirs(path, exist_ok=True)


def camelyon16_process(wsi_names):
    """
    Run the preprocessing pipeline for the given slides, driven by the config flags.

    Steps, each guarded by its flag from config:
      1. if_trans_xml_to_json: xml annotations -> json
      2. if_get_tumor_mask_npy: tumor masks
      3. if_get_tissue_mask_npy: tissue masks
      4. if_get_no_tumor_mask_npy: non-tumor masks
      5. if_get_sample_txt: sample patch centres from the selected mask
      6. always: cut the patches
    The output directory of every step that runs is emptied first (and created
    if it does not exist yet).

    The mask / txt / patch directories used by steps 5 and 6 are chosen by the
    first matching group of flags: tumor, then no_tumor, then normal.

    Arguments:
        wsi_names: iterable of slide names without extension

    Raises:
        AssertionError: if none of the if_get_*_patches flags is set.
    """
    # Convert xml annotations to json.
    if if_trans_xml_to_json:
        _reset_dir(json_dir)
        camelyon16xml2json(xml_dir, json_dir, wsi_names)
    # Tumor masks.
    if if_get_tumor_mask_npy:
        _reset_dir(tumor_npy_dir)
        get_tumor_mask(wsi_dir, level, json_dir, tumor_npy_dir, wsi_names)
    # Tissue masks.
    if if_get_tissue_mask_npy:
        _reset_dir(tissue_npy_dir)
        get_tissue_mask(wsi_dir, level, tissue_npy_dir, RGB_min, wsi_names)
    # Non-tumor masks.
    if if_get_no_tumor_mask_npy:
        _reset_dir(no_tumor_npy_dir)
        get_no_tumor_mask(tumor_npy_dir, tissue_npy_dir, no_tumor_npy_dir, wsi_names)

    # Select which mask is sampled and where the results go.
    if if_get_train_tumor_patches or if_get_test_tumor_patches:
        npy_dir = tumor_npy_dir
        patch_dir = tumor_patch_dir
        txt_dir = tumor_txt_dir
    elif if_get_train_no_tumor_patches or if_get_test_no_tumor_patches:
        npy_dir = no_tumor_npy_dir
        patch_dir = no_tumor_patch_dir
        txt_dir = no_tumor_txt_dir
    elif if_get_train_normal_patches or if_get_test_normal_patches:
        npy_dir = tissue_npy_dir
        patch_dir = normal_patch_dir
        txt_dir = normal_txt_dir
    else:
        # Raised explicitly so the check also fires under `python -O`,
        # where a bare `assert` would be removed.
        raise AssertionError('no if_get_*_patches flag is enabled in config')

    # Randomly sample patch centres from the selected mask.
    if if_get_sample_txt:
        _reset_dir(txt_dir)
        sample_from_mask(npy_dir, patch_number, level, txt_dir, wsi_names)

    # Cut the patches.
    _reset_dir(patch_dir)
    get_patches(txt_dir, wsi_dir, num_process, patch_size, level_patch, patch_dir, wsi_names)
def _list_stems(directory, extension):
    """Return the sorted names (without extension) of the files in `directory` ending in `extension`."""
    stems = []
    for entry in os.listdir(directory):
        # splitext keeps dots inside the name, so 'a.b.tif' yields 'a.b'
        # and the path rebuilt from the stem still points at the real file.
        stem, ext = os.path.splitext(entry)
        if ext == extension:
            stems.append(stem)
    return sorted(stems)


if __name__ == '__main__':
    # Collect the slide names to process from the .tif files in wsi_dir:
    #   * any train flag set          -> every slide
    #   * test tumor / no_tumor flag  -> only slides that have an xml annotation
    #   * otherwise                   -> only slides without an xml annotation
    wsi_names = _list_stems(wsi_dir, '.tif')
    any_train = (if_get_train_tumor_patches or if_get_train_no_tumor_patches
                 or if_get_train_normal_patches)
    if not any_train:
        annotated = set(_list_stems(xml_dir, '.xml'))
        if if_get_test_tumor_patches or if_get_test_no_tumor_patches:
            wsi_names = [name for name in wsi_names if name in annotated]
        else:
            wsi_names = [name for name in wsi_names if name not in annotated]
    print(wsi_names)
    camelyon16_process(wsi_names)
**config.py**
# !/usr/bin/python3
# -*- coding: utf-8 -*-
# @Time : 2024/9/5 20:18
# @Author : 猫娜Lisa
# @File : config.py
# @Software: PyCharm
# Root directories of the train and test splits ('xxx' is a placeholder).
train_root = 'xxx\\CAMELYON16\\train\\'
test_root = 'xxx\\CAMELYON16\\test\\'
# --- train split ---
root = train_root
# tumor slides
wsi_dir = train_root + 'tumor\\'
# normal slides (alternative setting)
# wsi_dir = train_root + 'normal\\'
# --- test split (alternative setting) ---
# root = test_root
# wsi_dir = test_root + 'image\\'
# Annotation directories: input xml and converted json.
xml_dir = root + 'util_annotations\\lesion_annotations\\'
json_dir = root + 'util_annotations\\json_annotations\\'
# Mask (.npy) output directories.
tumor_npy_dir = root + 'util_tumor\\tumor_npy\\'
no_tumor_npy_dir = root + 'util_no_tumor\\no_tumor_npy\\'
tissue_npy_dir = root + 'util_tissue_npy\\' # output path of the mask files
# Sampled-coordinate (.txt) output directories.
tumor_txt_dir = root + 'util_tumor\\tumor_txt\\'
no_tumor_txt_dir = root + 'util_no_tumor\\no_tumor_txt\\'
normal_txt_dir = root + 'util_normal\\normal_txt\\'
# Patch output directories (no trailing separator; they are joined with os.path.join).
tumor_patch_dir = root + 'train_patch\\tumor' # patch output directory
no_tumor_patch_dir = root + 'train_patch\\no_tumor' # patch output directory
normal_patch_dir = root + 'train_patch\\normal' # patch output directory
level = 6 # at which WSI level to obtain the mask
RGB_min = 50 # min value for RGB channel
patch_number = 10 # number of sample points per slide (author's note: 1000)
patch_size = 224 # side length of a patch (author's note: default is 256*256)
level_patch = 0 # WSI level the patches are cut from; level 0 by default
num_process = 2 # number of worker processes; multiprocessing makes cutting much faster (author's note: 16)
# Switches for the individual preprocessing steps.
if_trans_xml_to_json = True
if_get_tumor_mask_npy = True
if_get_tissue_mask_npy = True
if_get_no_tumor_mask_npy = True
if_get_sample_txt = True
# Switches selecting which kind of patches to produce (the first enabled group wins).
if_get_train_tumor_patches = True
if_get_train_no_tumor_patches = False
if_get_train_normal_patches = False
if_get_test_tumor_patches = False
if_get_test_no_tumor_patches = False
if_get_test_normal_patches = False