随着地理信息系统(GIS)技术的发展,地理位置数据在城市规划、商业分析、旅游推荐等多个领域发挥着越来越重要的作用。POI(Point of Interest,兴趣点)数据作为地理信息的重要组成部分,提供了丰富的地点信息,如餐厅、酒店、景点等,对于研究和应用具有重要意义,本篇文章旨在通过Python编程语言,结合高德地图API,实现对指定区域内的POI数据的高效获取和处理。
参考文章:Python使用高德API批量下载POI数据 (qq.com)
先讲一下方法思路,一共四个步骤;
方法思路
- 通过高德拾取坐标,生成矩形,打印出来坐标
- 获取POI数据——通过调用高德地图API
- GeoJSON转成csv
- 坐标转换——高德坐标系(GCJ-02) to WGS84
本篇文章就以上海比较著名的上海城隍庙广场为例,讲一讲,为什么选择一个比较小的范围内,还是因为高德的POI API 获取限额的原因,高德把这部分业务高度商业化了,所以给到个人开发者的额度就非常少,当然如果有企业开发者账号,或者通过购买额度的方式,就没什么限制~
第一步,通过高德拾取坐标:坐标拾取器 | 高德地图API (amap.com),先拾取要获取POI范围的坐标中心点,生成边长为500m的矩形,这里需要更改范围的话,自行调整即可,把这个side_length_meters = 500,调整成需要的范围即可,因为这里的地理坐标使用的是高德坐标系(GCJ-02),所以生成的范围也是高德坐标系下的结果,打印出来坐标;
完整代码#运行环境Python 3.11
import math
def get_rectangle_corners_gps(center_lng, center_lat, side_length_meters=500):
# 地球平均半径,单位为米
earth_radius = 6371000
# 每度纬度对应的米数
meters_per_degree_lat = math.pi * earth_radius / 180
# 每度经度对应的米数(在给定纬度处)
meters_per_degree_lng = meters_per_degree_lat * math.cos(math.radians(center_lat))
# 计算纬度和经度的变化量
delta_lat = side_length_meters / meters_per_degree_lat / 2
delta_lng = side_length_meters / meters_per_degree_lng / 2
# 计算四个角的坐标
top_left = (center_lng - delta_lng, center_lat + delta_lat)
top_right = (center_lng + delta_lng, center_lat + delta_lat)
bottom_left = (center_lng - delta_lng, center_lat - delta_lat)
bottom_right = (center_lng + delta_lng, center_lat - delta_lat)
return [top_left, top_right, bottom_left, bottom_right]
# 使用示例
center_lng, center_lat = 121.508092, 31.098532
side_length_meters = 500
# 包含四个角的经纬度坐标的列表,顺序为左上角、右上角、左下角、右下角
corners = get_rectangle_corners_gps(center_lng, center_lat, side_length_meters)
print("矩形四个角的坐标为:", corners)
然后把得到的坐标放到下面的代码部分进行替换,另外,这里的key需要填一下,有多个的话用逗号隔开,关键词也需要选择一下,KEYWORD = "餐厅",这里生成网格的大小也可以根据实际进行调整,关键词可以改成"酒店"、"公司"、"公园"等等,可以参考高德地图 API POI 分类对照表:poi-type-list.pdf (xdc.at);
KEYS = ["你的key"] # 改成自己的key
SAVE_DIR = r"D:\data\gaode" # 数据保存目录
GRID_SIZE = 0.001 # 网格大小为0.001度(大约为100m)
KEYWORD = 餐厅" # 查询关键字,可以修改为其他关键词
polygon_coords = [
[121.49064782905262, 31.227263304014794],
[121.49590617094739, 31.227263304014794],
[121.49064782905262, 31.222766695985204],
[121.49590617094739, 31.222766695985204]
] # 查询的多边形区域坐标,顺序为左上角、右上角、左下角、右下角
完整代码#运行环境Python 3.11
import requests
import json
import os
import math
from concurrent.futures import ThreadPoolExecutor
# 全局变量
KEYS = ["你的key"] # 高德地图API的Key列表,可以添加多个以应对请求限制
SAVE_DIR = r"D:\data\gaode" # 数据保存目录
GRID_SIZE = 0.001 # 网格大小为0.001度(大约为100m)
KEYWORD = "餐厅" # 查询关键字,可以修改为其他关键词
polygon_coords = [
[121.49064782905262, 31.227263304014794],
[121.49590617094739, 31.227263304014794],
[121.49064782905262, 31.222766695985204],
[121.49590617094739, 31.222766695985204]
] # 查询的多边形区域坐标
NUM_THREADS = 4 # 线程数量,可以根据需求调整
def file_exists(polygon, keyword, page):
"""检查文件是否已经存在"""
lng_min, lat_min = polygon[0][0], polygon[2][1]
filename = f"{SAVE_DIR}/poi_{lng_min}_{lat_min}_{keyword}_page{page}.geojson"
return os.path.exists(filename)
def save_poi_data(polygon, keyword, page, data):
"""保存POI数据到GeoJSON文件"""
if not os.path.exists(SAVE_DIR):
os.makedirs(SAVE_DIR) # 如果目录不存在则创建
lng_min, lat_min = polygon[0][0], polygon[2][1]
filename = f"{SAVE_DIR}/poi_{lng_min}_{lat_min}_{keyword}_page{page}.geojson"
with open(filename, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=4) # 保存数据为GeoJSON格式
return True
def fetch_poi_data(polygon, key, keyword):
"""获取POI数据,支持分页"""
lng_min, lat_min = polygon[0][0], polygon[2][1]
page = 1
while True:
if file_exists(polygon, keyword, page):
print(f"文件已存在,跳过:网格: {polygon},页数: {page}")
page += 1
continue
polygon_str = f"{lng_min},{lat_min}|{polygon[1][0]},{polygon[1][1]}"
api_url = f"https://restapi.amap.com/v3/place/polygon?polygon={polygon_str}&keywords={keyword}&key={key}&page={page}"
try:
response = requests.get(api_url, timeout=10)
response.raise_for_status() # 检查HTTP响应状态码
data = response.json()
infocode = data.get("infocode")
if infocode == "10000" and data.get("pois"):
save_poi_data(polygon, keyword, page, data)
print(f"下载成功!网格: {polygon_str},页数: {page}")
page += 1
if len(data.get("pois")) < 20: # 当POI数据少于20时,说明已经是最后一页
break
elif infocode in ["10001", "10003", "10004"]:
print(f"Key 出现问题,infocode: {infocode},切换到下一个Key进行重试...")
return False # 返回False以便切换Key进行重试
else:
print(f"请求失败或无数据,infocode: {infocode},信息: {data.get('info')}")
break
except Exception as e:
print(f"请求异常,跳过此Key:{str(e)}")
break
return True # 下载成功或完成所有页数时返回True
def generate_grids(polygon_coords):
"""生成网格"""
min_lng = min([coord[0] for coord in polygon_coords]) # 获取多边形区域的最小经度
max_lng = max([coord[0] for coord in polygon_coords]) # 获取多边形区域的最大经度
min_lat = min([coord[1] for coord in polygon_coords]) # 获取多边形区域的最小纬度
max_lat = max([coord[1] for coord in polygon_coords]) # 获取多边形区域的最大纬度
grids = []
lng_steps = math.ceil((max_lng - min_lng) / GRID_SIZE) # 计算经度方向上的网格数量
lat_steps = math.ceil((max_lat - min_lat) / GRID_SIZE) # 计算纬度方向上的网格数量
for i in range(lng_steps):
for j in range(lat_steps):
grid_min_lng = min_lng + i * GRID_SIZE # 计算当前网格的最小经度
grid_max_lng = min(grid_min_lng + GRID_SIZE, max_lng) # 计算当前网格的最大经度
grid_min_lat = min_lat + j * GRID_SIZE # 计算当前网格的最小纬度
grid_max_lat = min(grid_min_lat + GRID_SIZE, max_lat) # 计算当前网格的最大纬度
grid_polygon = [
[grid_min_lng, grid_max_lat],
[grid_max_lng, grid_max_lat],
[grid_max_lng, grid_min_lat],
[grid_min_lng, grid_min_lat]
]
grids.append(grid_polygon)
return grids
def download_poi_for_grid(polygon, key, keyword):
"""下载单个网格的POI数据"""
success = fetch_poi_data(polygon, key, keyword)
if not success:
return False # 返回False以便线程外层处理Key切换
return True
def main():
grids = generate_grids(polygon_coords)
with ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
for idx, grid in enumerate(grids):
key_idx = 0
while key_idx < len(KEYS):
key = KEYS[key_idx]
future = executor.submit(download_poi_for_grid, grid, key, KEYWORD)
if future.result():
break # 如果下载成功或完成,则跳出循环
key_idx += 1
if key_idx == len(KEYS):
print(f"所有Key均不可用,跳过此网格: {grid}")
if __name__ == "__main__":
main()
运行成功的话会打印出下面的结果;
接下来就是把得到的GeoJSON转成csv,这里会生成多个GeoJSON文件的原因是:多边形搜索接口中每个边界范围能够获取到的POI数量是有限制的,超过该限制的POI数据则不会返回,这里设置的是每个多边形内POI数量不超过20个以防止不返回结果,也就是20条POI数据会生成一个GeoJSON文件,我们需要把多个GeoJSON提取需要的标签,并统一合并成一个csv,我们先打开一个GeoJSON看一下POI标签内容;
标签包括name、type、address、等等这里就列举部分,具体标签解释可以参考官方文档:搜索POI 2.0-高级 API 文档-开发指南-Web服务 API | 高德地图API (amap.com)
接下来,我们获取一些关键的标签,这里仅获取了id、name、location、address、adname、cityname、type这些标签,有需要其他标签的,可以自行添加,获取这些标签的同时把这些GeoJSON合并成一个csv;
完整代码#运行环境Python 3.11
import json
import csv
from glob import glob
def merge_pois_from_files(file_pattern, output_file):
all_pois = []
# 获取所有匹配给定模式的文件路径
file_paths = sorted(glob(file_pattern))
for input_file_path in file_paths:
try:
# 打开文件并读取内容,指定编码(根据你的文件实际编码,这里使用 utf-8 作为示例)
with open(input_file_path, 'r', encoding='utf-8') as f:
# 加载 JSON 数据
data = json.load(f)
# 检查是否存在 'pois' 键
if 'pois' in data:
# 遍历 pois 列表
for poi in data['pois']:
# 提取所需字段
id = poi.get('id', '未找到')
name = poi.get('name', '未找到')
location = poi.get('location', '未找到')
address = poi.get('address', '未找到')
adname = poi.get('adname', '未找到')
cityname = poi.get('cityname', '未找到')
type = poi.get('type', '未找到')
# 将提取的信息添加到列表中
all_pois.append({
'id': id,
'name': name,
'location': location,
'address': address,
'adname': adname,
'cityname': cityname,
'type': type
})
else:
# 如果文件中不包含 'pois' 键,打印错误信息
print(f"{input_file_path} does not contain 'pois' key.")
except FileNotFoundError:
# 捕获文件未找到的错误
print(f"File not found: {input_file_path}")
except json.JSONDecodeError:
# 捕获 JSON 解析错误
print(f"Invalid JSON format in {input_file_path}.")
except Exception as e:
# 捕获其他未预料的错误
print(f"Unexpected error occurred while processing {input_file_path}: {e}")
# 写入 CSV 文件
write_to_csv(all_pois, output_file)
def write_to_csv(pois_list, output_file_path):
# 定义 CSV 文件的字段名
fieldnames = ['id', 'name', 'location', 'address', 'adname', 'cityname', 'type']
# 写入 CSV 文件
with open(output_file_path, 'w', newline='', encoding='utf-8') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
# 写入表头
writer.writeheader()
# 写入数据
for poi in pois_list:
writer.writerow(poi)
if __name__ == '__main__':
# 输入文件模式
input_file_pattern = r'D:\data\gaode\poi_121.49064782905262,31.222766695985204_餐厅*page*.geojson'
# 输出文件路径
output_file_path = 'D:\data\gaode\pois.csv'
# 合并所有文件中的 POIs 并写入 CSV 文件
merge_pois_from_files(input_file_pattern, output_file_path)
我们把csv的坐标列手动分列一下,并把坐标从高德坐标系(GCJ-02)转到WGS84,批量转换工具:地图坐标系批量转换 - 免费在线工具 (latlongconverter.online);
并把csv导入arcgis/arcgispro进行展示,可以看到周边餐饮POI的分布情况;
先说两个高德对POI获取的限制,多边形搜索接口中每个边界范围能够获取到的POI数量是有限制的,超过该限制的POI数据则不会返回。所以有学者用四叉树索引的概念,将大的多边形不断四分,直到所有小多边形均满足数量限制为止,但是所谓道高一尺魔高一丈,现在对个人开发者的多边形搜索限额也降到了100条/天,意味着你是个人开发者且只使用免费额度的话,你一天可以获取的POI极其有限,而且一次仅可以获取一个类型的POI,并且会出现POI接口返回数据是不完全的情况,这个导致对额度的需求几何倍增加,可以参考这篇文章:2024年5月最新高德poi数据采集科普 (qq.com);
结论:利用Python和高德地图API批量获取POI数据路径上可以实现,现阶段基本上都是利用高德开放的api接口:多边形搜索,把需要获取的多边形切分为无数个满足返回上限的小多边形的方式来遍历POI数据,但是高德POI数据本身的商业程度很高,并且通过限制接口及配额两种方法来增加POI数据免费获取的难度,以至于获取城市量级的POI可行性基本上不高,需要购买额度,或者购买企业账号。
文章仅用于分享个人学习成果与个人存档之用,分享知识,如有侵权,请联系作者进行删除。所有信息均基于作者的个人理解和经验,不代表任何官方立场或权威解读。