背景
因为最近三年用的 Go 版本是 1.16,但最新的版本升级到了 1.23,很多依赖的三方包最新文件都已经升级,使用了泛型以及 GO 新版本的特性,导致我只能适配 Go1.16 的三方包旧版本,但这种问题发生的频率多了后,自然就感觉到了麻烦和落后,所以打算升级 Go 版本。
但升级到哪个版本?我的考虑的重心有一点在于主流 Go 服务在用哪个版本。
而社区环境,最佳的选择当然是 Github。
所以,首先目标是统计 Github 上 Go 项目中使用的版本分布情况。
统计目标
Github 上,Go 项目数量有一百六十多万:
抓取全部仓库数据进行统计,有点不现实。
所以我根据 starts 数量分析,最后确定抓取 stars 数量大于 200 的仓库数据:
总共只有一万多一点,所以主流和有价值的仓库占比,确实很少,大部分都是私人仓库而已。
数据抓取方式
Github 有开放 API,通过在 GitHub 账号重心生成 Token,就可以调用。
API 包括仓库搜索、仓库详情信息获取等,详细可以查阅 官方 API 文档。
使用 GITHUB_TOKEN 鉴权,调用 API 存在部分限制,比如:
- GitHub Search API 每次搜索,分页每页最多 100,每次查询结果分页,超过 1000 个结果就会报错 422。
- GitHub API 的速率限制为 5000 次请求/小时。
所以,在使用 API 时,结合统计目标的总数量,要考虑到上述两个限制。
数据抓取脚本
import requests
import base64
from collections import defaultdict
import time
import os
import csv
import configparser
from concurrent.futures import ThreadPoolExecutor, as_completed
# 读取配置文件
config = configparser.ConfigParser()
config.read('config.ini')
GITHUB_TOKEN = config['GITHUB']['TOKEN']
# 设置请求头,包含认证信息
headers = {}
if GITHUB_TOKEN:
headers = {
'Authorization': f'token {GITHUB_TOKEN}'
}
# 检查 API 速率限制
def check_rate_limit(headers):
remaining = int(headers.get('X-RateLimit-Remaining', 0))
reset_time = int(headers.get('X-RateLimit-Reset', time.time()))
if remaining == 0:
sleep_time = reset_time - time.time() + 1 # 等待到重置时间
print(f"API 请求达到速率限制,等待 {sleep_time} 秒...")
time.sleep(sleep_time)
else:
print(f"剩余请求次数: {remaining}")
# 构建 GitHub API 查询 URL
def build_search_url(query_params, page=1, per_page=100):
base_url = f"https://api.github.com/search/repositories"
query = '+'.join(query_params) # 将查询条件组装成字符串
url = f"{base_url}?q={query}&sort=stars&order=desc&per_page={per_page}&page={page}"
return url
# 获取 Go 仓库信息,增加重试机制以避免数据丢失
def search_go_repos(query_params, page=1, per_page=100, max_retries=3):
url = build_search_url(query_params, page=page, per_page=per_page)
retries = 0
while retries < max_retries:
try:
response = requests.get(url, headers=headers, timeout=10) # 设置超时为 10 秒
# 检查速率限制
check_rate_limit(response.headers)
if response.status_code == 200:
return response.json()['items']
else:
print(f"API 请求失败,状态码: {response.status_code}")
return []
except requests.Timeout:
retries += 1
print(f"请求超时: {url},重试第 {retries} 次")
time.sleep(2) # 重试前等待 2 秒
except requests.RequestException as e:
print(f"请求发生错误: {e}")
retries += 1
time.sleep(2) # 重试前等待 2 秒
print(f"请求失败超过最大重试次数,跳过该页数据: {url}")
return []
# 获取仓库的 go.mod 文件内容,并提取 Go 版本号
def get_go_version(owner, repo):
url = f"https://api.github.com/repos/{owner}/{repo}/contents/go.mod"
response = requests.get(url, headers=headers)
# 检查速率限制
check_rate_limit(response.headers)
if response.status_code == 200:
content = response.json().get('content')
if content:
decoded_content = base64.b64decode(content).decode('utf-8')
for line in decoded_content.splitlines():
if line.startswith('go '):
return line.split()[1]
return None
# 使用并发来加速处理仓库详细信息
def process_repos_with_concurrency(repos, version_counts, repos_data, max_workers=5):
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_repo = {executor.submit(get_go_version, repo['owner']['login'], repo['name']): repo for repo in repos}
for future in as_completed(future_to_repo):
repo = future_to_repo[future]
owner = repo['owner']['login']
repo_name = repo['name']
repo_url = repo['html_url']
stars = repo['stargazers_count']
created_at = repo['created_at']
updated_at = repo['pushed_at']
try:
version = future.result()
if version:
version_counts[version] += 1
repos_data.append([repo_name, repo_url, stars, version, created_at, updated_at])
else:
repos_data.append([repo_name, repo_url, stars, '未检测到版本号', created_at, updated_at])
except Exception as e:
print(f"处理仓库 {repo_name} 时出错: {e}")
# 统计 Go 版本分布并导出仓库信息到 CSV
def collect_and_save_repo_data(query_params, start_page=1, max_pages=1, per_page=10, max_workers=5):
version_counts = defaultdict(int)
repos_data = []
all_repos = [] # 用于收集所有返回的仓库
for page in range(start_page, start_page + max_pages):
print(f"正在处理第 {page}/{start_page + max_pages - 1} 页的数据...")
repos = search_go_repos(query_params, page, per_page)
if not repos:
break
all_repos.extend(repos) # 将所有查询结果收集到一起
# 并发处理仓库 go.mod 文件
process_repos_with_concurrency(repos, version_counts, repos_data, max_workers)
# 每处理一页数据就将数据写入 CSV 文件
write_to_csv("go_repos_info.csv", repos_data)
repos_data = [] # 清空 repos_data 以便处理下一页数据
return version_counts, all_repos # 返回版本统计和所有仓库信息
# 判断文件是否存在,追加数据或创建新文件
def write_to_csv(filename, repos_data):
file_exists = os.path.isfile(filename) # 判断文件是否存在
mode = 'a' if file_exists else 'w' # 如果文件存在则以追加模式打开,否则以写模式创建
with open(filename, mode=mode, newline='', encoding='utf-8') as file:
writer = csv.writer(file)
# 如果文件不存在,写入表头
if not file_exists:
writer.writerow(['仓库名', '仓库地址', 'Star 数量', 'Go 版本号', '创建时间', '最近更新时间'])
# 写入仓库数据
writer.writerows(repos_data)
# 自动调整 star 范围并分页查询
def paginate_through_stars(start_star=131084, min_star=300, per_page=100, max_workers=5):
current_star = start_star
next_star = None
while current_star > min_star:
query_params = [f'language:Go', f'stars:{min_star}..{current_star}']
# 获取总仓库数和总页数
total_repos, total_pages = get_total_repos_and_pages(query_params, per_page)
print(f"正在查询 stars: <{current_star} 的范围,找到 {total_repos} 个仓库")
if total_repos == 0:
print(f"在 stars: <{current_star} 范围内没有找到仓库,程序终止。")
break
# 设置查询的最大页数
max_pages = min(total_pages, 10) # 一次最多查询 10 页
version_counts, all_repos = collect_and_save_repo_data(query_params, start_page=1, max_pages=max_pages, per_page=per_page, max_workers=max_workers)
# 获取所有仓库中最小的 star 数,确保正确的 star 数排序
if all_repos:
sorted_repos = sorted(all_repos, key=lambda repo: repo['stargazers_count'], reverse=False)
next_star = sorted_repos[0]['stargazers_count'] - 1 # 获取最小 star 数,且减1。(stars:300..4175 查询的是 star 数 大于等于 300 且小于等于 4175 的仓库,所以要减1,避免重复)
print(f"调整下一个查询的 star 范围为 stars: <{next_star}")
current_star = next_star # 更新查询范围
else:
print("无法找到下一个 star 范围,程序结束。")
break
# 输出统计结果
def print_version_stats(version_counts):
print("\nGo 版本分布统计结果:")
for version, count in sorted(version_counts.items(), key=lambda x: x[0]):
print(f"Go 版本: {version}, 使用次数: {count}")
# 获取总仓库数量和页数
def get_total_repos_and_pages(query_params, per_page=100):
url = build_search_url(query_params, page=1, per_page=per_page)
response = requests.get(url, headers=headers)
# 检查速率限制
check_rate_limit(response.headers)
if response.status_code == 200:
total_count = response.json()['total_count']
total_pages = (total_count // per_page) + 1
return total_count, total_pages
else:
print(f"无法获取总仓库数量,状态码: {response.status_code}")
return 0, 0
# 获取当前 GITHUB TOKEN 的剩余次数和恢复时间
def get_rate_limit():
url = "https://api.github.com/rate_limit"
response = requests.get(url, headers=headers)
if response.status_code == 200:
data = response.json()
remaining = data['rate']['remaining']
reset_time = data['rate']['reset'] # 重置时间(UNIX 时间戳)
reset_time_human = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(reset_time)) # 将重置时间转换为可读格式
print(f"当前剩余请求次数: {remaining}")
print(f"请求次数将在 {reset_time_human} 重置")
# 返回剩余次数和重置时间,供其他地方使用
return remaining, reset_time_human
else:
print(f"无法获取速率限制信息,状态码: {response.status_code}")
return None, None
if __name__ == "__main__":
# 设置起始 Star 数和最小 Star 数
# START_STAR = 141084 # 最大的 Star 数
START_STAR = 475 # 最大的 Star 数
MIN_STAR = 200 # 最小的 Star 数
PER_PAGE = 100
MAX_WORKERS = 10
# 自动分页查询并处理数据
paginate_through_stars(START_STAR, MIN_STAR, PER_PAGE, MAX_WORKERS)
# 检查当前 TOKEN 状态
# get_rate_limit()
这个脚本通过控制 stars 条件、分页 条件,查询所有 Go 仓库数据,获取 Go 仓库下 go.mod 中的版本定义。
并且加入了并发处理,保证查询速度,同时考虑到 Token 的限制,所以还加了一个 TOKEN 请求速度检查和等待机制。
直接在 python 环境下运行就可以,实践过的。
Go 分布统计结果
得到的数据表如下图所示:
对数据做了处理后,统计结果如下:
一目了然,目前社区中主流的版本为 1.22、1.21。
所以,我也打算把 Go 版本先升级到 1.22。