本文介绍使用Python的多线程技术,提高happybase模块和gdal模块的效率,从tif格式的影像文件中读取数据,并将其存储到HBase数据库中。主要步骤包括:
- 准备工作:安装Python环境,安装happybase模块和gdal模块,安装HBase数据库,并准备tif影像文件。
- 读取tif影像数据:使用readTif函数读取tif影像数据集,并获取其宽度、高度、波段数、数据数组、仿射变换参数和投影信息。遍历tif影像文件所在的文件夹,获取tif影像文件的日期和分块信息。
- 写入HBase数据库:创建一个happybase连接对象,并获取或创建一个happybase表对象。遍历每个分块,使用readTif函数读取每个分块的每个日期的每个波段的数据,并将其存储到一个三维的numpy数组中。遍历每个像素,将其对应的每个波段的每个日期的数据组合成一个字典,作为HBase表中的列值。使用分块编号、行号和列号拼接成一个字符串,作为HBase表中的行键。使用put方法将行键和列值写入HBase表中。关闭happybase连接对象。
- 使用多线程技术:导入threading模块,创建一个信号量对象,用于限制线程的最大数量。遍历每个分块,使用信号量对象的acquire方法获取一个信号量,然后创建一个新的线程对象,并指定目标函数为load函数,以及传递分块编号、分块列表和日期列表作为参数。然后调用start方法启动该线程。在load函数中,在完成数据的读取和写入后,使用信号量对象的release方法释放一个信号量,并关闭happybase连接对象。记录程序运行的开始时间和结束时间,并计算程序运行的总时间。
一、环境准备
- 安装Python环境,本文使用的是Anaconda3。
- 安装happybase模块,可以使用pip或conda命令。例如:
pip install happybase
- 安装gdal模块,可以使用pip或conda命令。例如:
conda install gdal
- 启动分布式集群,hadoop以及hbase;启动thrift服务,可以使用hbase-daemon.sh脚本。例如:
hbase-daemon.sh start thrift
就可以使用happybase模块连接到thrift服务,并操作HBase数据库了
- 准备tif格式的影像文件,并放在一个文件夹中。本文使用的是Sentinel-2卫星的10个波段的影像数据,分为多个日期和多个分块。
二、定义读取tif影像数据的函数
- 导入需要的模块,包括time、happybase、gdal、numpy、pandas、os、tqdm和threading。例如:
import time
import happybase
from osgeo import gdal
import numpy as np
import pandas as pd
import os
from tqdm import tqdm
import threading
- 定义一个函数readTif,用于读取tif格式的影像数据集,并返回其宽度、高度、波段数、数据数组、仿射变换参数和投影信息。例如:
# 读取tif数据集
def readTif(fileName, xoff=0, yoff=0, data_width=0, data_height=0):
dataset = gdal.Open(fileName)
num_bands = dataset.RasterCount
# print(num_bands)
if dataset == None:
print(fileName + "文件无法打开")
# 栅格矩阵的列数
width = dataset.RasterXSize
# 栅格矩阵的行数
height = dataset.RasterYSize
# 波段数
bands = dataset.RasterCount
# 获取数据
if (data_width == 0 and data_height == 0):
data_width = width
data_height = height
data = dataset.ReadAsArray(xoff, yoff, data_width, data_height)
# 获取仿射矩阵信息
geotrans = dataset.GetGeoTransform()
# 获取投影信息
proj = dataset.GetProjection()
return width, height, bands, data, geotrans, proj
- 获取tif影像文件所在的文件夹路径,并遍历该文件夹下所有以.tif为后缀名的文件。例如:
# 分块影像所在文件夹,不能有中文
tifDir = r"E:\pyimg\tif2csv\S2SR10mallband3tile"
tifs = [i for i in os.listdir(tifDir) if i.endswith(".tif")]
print("有 %s 个tif文件" % len(tifs))
- 获取tif影像文件的日期和分块信息,并去重排序。例如:
# 获取目标文件数量,前缀相同的
bandlist=['B2','B3','B4','B5','B6','B7','B8','B8A','B11','B12']
datelist1 = []
fenkuailist1 = []
for i in tifs:
datelist1.append(i[:-26])
fenkuailist1.append(i[-25:-4])
datelist = list(set(datelist1))
datelist.sort(key=datelist1.index)
fenkuailist = list(set(fenkuailist1))
fenkuailist.sort(key=fenkuailist1.index)
print("有 %s 个日期" % len(datelist))
print("datelist" , datelist)
print("每个日期 %s 个块" % len(fenkuailist))
print("fenkuailist" , fenkuailist)
三、创建happybase连接和表对象
- 创建一个happybase连接对象,并指定HBase数据库的IP地址。例如:
connection = happybase.Connection('192.168.1.100')
# # before first use:
connection.open()
- 获取或创建一个happybase表对象,并指定表名和列族名。例如:
table = connection.table('rawdata')
四、定义写入HBase数据库的函数
-
定义一个函数load,用于读取和写入一个分块的数据。该函数接受分块编号、分块列表和日期列表作为参数。该函数的主要步骤如下:
- 打印当前分块的编号。
- 初始化一个三维的numpy数组,用于存储该分块的所有波段和所有日期的数据。
- 遍历每个日期,使用readTif函数读取该分块的每个日期的每个波段的数据,并将其存储到numpy数组中。
- 打印写入中的提示。
- 遍历每个像素,将其对应的每个波段的每个日期的数据组合成一个字典,作为HBase表中的列值。使用分块编号、行号和列号拼接成一个字符串,作为HBase表中的行键。使用put方法将行键和列值写入HBase表中。
- 关闭happybase连接对象。
def load(kuai,fenkuailist,datelist):
connection = happybase.Connection('192.168.1.100')
# # before first use:
connection.open()
table = connection.table('rawdata')
print("(%d/%d)块编号:"%(kuai+1,len(fenkuailist)),fenkuailist[kuai])
# 初始化立方体
img_file = tifDir + "\\" + datelist[0] + "-" + fenkuailist[kuai] + ".tif"
im_width, im_height, im_bands, im_data, kuai_im_geotrans, kuai_im_proj = readTif(img_file)
tmpttt = np.empty((im_bands, im_width * im_height, len(datelist)))
# print("波段 %s 个" % im_bands)
# print("行列数", im_width, im_height)
for shijian in range(len(datelist)):
# 图像
img_file = tifDir + "\\" + datelist[shijian] + "-" + fenkuailist[kuai] + ".tif"
# print(img_file)
im_width, im_height, im_bands, im_data, im_geotrans, im_proj = readTif(img_file)
kuai_im_geotrans = im_geotrans
kuai_im_proj=im_proj
for j in range(im_bands):
tmpttt[j, :, shijian] = im_data[j].flatten(order='C')
print("写入中...")
for index in tqdm(range(im_width * im_height)):
dt={}
for ban in range(im_bands):
d1=zip(map(lambda x:"f1:"+x+bandlist[ban],datelist),tmpttt[ban, index, :].astype(str))
# Converting zip object to dict using dict() contructor.
dt.update(d1)
key=str(kuai%3)+fenkuailist[kuai][6:10]+fenkuailist[kuai][-4:]+str(index)
table.put(key, dt) # 提交数据,0001代表行键,写入的数据要使用字典形式表示
connection.close() # 关闭传输
五、使用多线程技术
- 导入threading模块,该模块提供了多线程的支持,可以创建和管理线程,以及实现线程间的同步和通信。
- 使用time模块的perf_counter函数,记录程序运行的开始时间。例如:
# #计时开始
start = time.perf_counter()
- 使用threading模块的Semaphore类,创建一个信号量对象,用于限制线程的最大数量。信号量对象有一个内部计数器,每当一个线程调用acquire方法时,计数器减一,每当一个线程调用release方法时,计数器加一。如果计数器为零,那么acquire方法将阻塞,直到其他线程调用release方法。例如:
sem=threading.Semaphore(5) #限制线程的最大数量为5
- 遍历每个分块,使用sem对象的acquire方法获取一个信号量,然后使用threading模块的Thread类,创建一个新的线程对象,并指定目标函数为load函数,以及传递分块编号、分块列表和日期列表作为参数。然后调用start方法启动该线程。例如:
for kuai in range(30,72):
sem.acquire()
threading.Thread(target = load, args = (kuai,fenkuailist,datelist)).start()
- 在load函数中,在完成数据的读取和写入后,使用sem对象的release方法释放一个信号量,并关闭happybase连接对象。例如:
def load(kuai,fenkuailist,datelist):
connection = happybase.Connection('192.168.1.100')
# # before first use:
connection.open()
table = connection.table('rawdata')
print("(%d/%d)块编号:"%(kuai+1,len(fenkuailist)),fenkuailist[kuai])
# 初始化立方体
img_file = tifDir + "\\" + datelist[0] + "-" + fenkuailist[kuai] + ".tif"
im_width, im_height, im_bands, im_data, kuai_im_geotrans, kuai_im_proj = readTif(img_file)
tmpttt = np.empty((im_bands, im_width * im_height, len(datelist)))
# print("波段 %s 个" % im_bands)
# print("行列数", im_width, im_height)
for shijian in range(len(datelist)):
# 图像
img_file = tifDir + "\\" + datelist[shijian] + "-" + fenkuailist[kuai] + ".tif"
# print(img_file)
im_width, im_height, im_bands, im_data, im_geotrans, im_proj = readTif(img_file)
kuai_im_geotrans = im_geotrans
kuai_im_proj=im_proj
for j in range(im_bands):
tmpttt[j, :, shijian] = im_data[j].flatten(order='C')
print("写入中...")
for index in tqdm(range(im_width * im_height)):
dt={}
for ban in range(im_bands):
d1=zip(map(lambda x:"f1:"+x+bandlist[ban],datelist),tmpttt[ban, index, :].astype(str))
# Converting zip object to dict using dict() contructor.
dt.update(d1)
key=str(kuai%3)+fenkuailist[kuai][6:10]+fenkuailist[kuai][-4:]+str(index)
table.put(key, dt) # 提交数据,0001代表行键,写入的数据要使用字典形式表示
connection.close() # 关闭传输
sem.release()
- 使用time模块的perf_counter函数,记录程序运行的结束时间,并计算程序运行的总时间。例如:
# #计时结束
delta = time.perf_counter()-start
print("程序运行的时间是:{}秒".format(delta))