农产品价格报告爬虫使用说明

news2025/2/2 10:02:42

农产品价格报告爬虫使用说明

# **************************************************************************
# *                                                                          *
# *                      农产品价格报告爬虫                                   *
# *                                                                          *
# *                        作者: xiaohai                                     *
# *                        版本: v1.0.0                                      *
# *                        日期: 2024-12-05                                  *
# *                                                                          *
# *        功能说明:                                                         *
# *            1. 日度报告                                                   *
# *               - 生成今日分析报告                                         *
# *               - 生成指定日期报告                                         *
# *               - 包含价格指数、分品类分析等                                *
# *                                                                          *
# *            2. 周度报告                                                   *
# *               - 生成本周分析报告                                         *
# *               - 生成指定周报告                                           *
# *               - 汇总周内价格变化                                         *
# *                                                                          *
# *            3. 价格走势                                                   *
# *               - 农产品价格200指数走势                                    *
# *               - 猪肉价格全国走势                                         *
# *               - 猪肉价格区域走势                                         *
# *               - 粮油价格指数走势                                         *
# *                                                                          *
# *            4. 数据导出                                                   *
# *               - 支持Excel格式导出                                        *
# *               - 包含多个数据分类                                         *
# *               - 支持时间范围选择                                         *
# *                                                                          *
# *        : 农业农村部市场信息中心                                   *
# *        版权声明: 仅用于学习交流                                          *
# *                                                                          *
# **************************************************************************

import os
import json
import logging
import requests
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
import pandas as pd
import warnings
import urllib3
import sys
import subprocess
import pkg_resources
from bs4 import BeautifulSoup
import re
import time

# 禁用SSL警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
warnings.filterwarnings('ignore')

# 配置常量
VERSION = 'v1.0.0'
AUTHOR = 'xiaohai'
DATA_SOURCE = '农业农村部市场信息中心'

# API配置
API_BASE_URL = 'https://ncpscxx.moa.gov.cn'
API_ENDPOINTS = {
    'price_index': '/product/common-price-index/getIndexList',
    'variety_list': '/product/sys-variety/selectList',
    'price_trend': '/product/price-info/getPriceInfoList',
    'market_list': '/product/sys-market/selectList',
    'daily_price': '/product/price-info/getDailyPrice',
    'analysis_report': '/product/analysis-report/pageList'
}

# 输出目录配置
OUTPUT_DIRS = {
    'base': 'reports',
    'daily': 'reports/daily',
    'weekly': 'reports/weekly'
}

# 图表样式配置
CHART_STYLE = {
    'figure': {
        'figsize': (12, 6),
        'facecolor': '#f8fcfa'
    },
    'grid': {
        'linestyle': '--',
        'alpha': 0.3,
        'color': 'gray'
    },
    'line': {
        'marker': 'o',
        'markersize': 4,
        'linewidth': 2
    },
    'colors': {
        'blue': '#40a9ff',
        'green': '#73d13d',
        'orange': '#ffa940',
        'red': '#ff4d4f',
        'purple': '#9254de',
        'cyan': '#36cfc9'
    }
}

def check_and_install_packages():
    """检查并安装所需的包"""
    required_packages = {
        'requests': 'requests',      # HTTP请求
        'pandas': 'pandas',          # 数据处理
        'matplotlib': 'matplotlib',  # 绘图支持
        'urllib3': 'urllib3',        # HTTP客户端
        'openpyxl': 'openpyxl',     # Excel支持
        'colorama': 'colorama'       # 控制台颜色
    }
    
    print("\n" + "="*50)
    print("检查并安装依赖包...")
    print("="*50)
    
    try:
        import colorama
        colorama.init()
        success_mark = colorama.Fore.GREEN + "✓" + colorama.Style.RESET_ALL
        error_mark = colorama.Fore.RED + "✗" + colorama.Style.RESET_ALL
    except ImportError:
        success_mark = "✓"
        error_mark = "✗"
    
    all_success = True
    for package, import_name in required_packages.items():
        try:
            pkg_resources.require(package)
            print(f"{success_mark} {package:15} 已安装")
        except (pkg_resources.DistributionNotFound, pkg_resources.VersionConflict):
            print(f"{error_mark} {package:15} 未安装,正在安装...")
            try:
                subprocess.check_call([
                    sys.executable, 
                    "-m", 
                    "pip", 
                    "install", 
                    "--disable-pip-version-check",
                    "--no-cache-dir",
                    package
                ], stdout=subprocess.DEVNULL)
                print(f"{success_mark} {package:15} 安装成功")
            except Exception as e:
                print(f"{error_mark} {package:15} 安装失败: {str(e)}")
                all_success = False
    
    print("\n依赖包检查" + ("全部完成" if all_success else "存在问题"))
    print("="*50 + "\n")
    
    if not all_success:
        print("某些依赖包安装失败,程序能无法正常运行!")
        if input("是否继续运行?(y/n): ").lower() != 'y':
            sys.exit(1)

class ReportCrawler:
    """农产品价格报告爬虫"""
    
    def __init__(self):
        # 禁用SSL警告
        warnings.filterwarnings('ignore')
        
        # 基础配置
        self._setup_directories()
        self._setup_logger()
        self._setup_api()
        
    def _setup_directories(self):
        """创建输出目录"""
        self.output_dir = "reports"
        self.daily_dir = os.path.join(self.output_dir, "daily")
        self.weekly_dir = os.path.join(self.output_dir, "weekly")
        
        for d in [self.output_dir, self.daily_dir, self.weekly_dir]:
            if not os.path.exists(d):
                os.makedirs(d)
                
    def _setup_logger(self):
        """配置日志"""
        log_file = os.path.join("logs", f"crawler_{datetime.now().strftime('%Y%m%d')}.log")
        os.makedirs("logs", exist_ok=True)
        
        formatter = logging.Formatter(
            '%(asctime)s - %(levelname)s - %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )
        
        # 文件处理器
        file_handler = logging.FileHandler(log_file, encoding='utf-8')
        file_handler.setFormatter(formatter)
        
        # 制台处理器
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(formatter)
        
        # 配置日志器
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.INFO)
        self.logger.addHandler(file_handler)
        self.logger.addHandler(console_handler)
        
    def _setup_api(self):
        """配置API"""
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36',
            'Origin': 'https://ncpscxx.moa.gov.cn',
            'Referer': 'https://ncpscxx.moa.gov.cn/',
            'Accept': 'application/json, text/plain, */*',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Content-Type': 'application/json;charset=UTF-8'
        }

    def show_menu(self):
        """显示功能菜单"""
        menu = """
农产品价格报告爬虫系统
====================
1. 成今日分析报告
2. 生成本周分报告
3. 生成指定日期报告
4. 生成指定周报告
5. 生成价格指数走势图
6. 生成猪肉价格走势图
7. 生成区域价格走势图
8. 生成粮油价格走势图
9. 导出Excel数据
0. 退出系统

请输入���能编号(0-9): """
        print("\n" + "="*50)  # 添加分隔线
        choice = input(menu)
        print("="*50 + "\n")  # 添加分隔线
        return choice

    def run(self):
        """运行系统"""
        while True:
            choice = self.show_menu()
            
            if choice == "0":
                print("感谢使用,再见!")
                break
                
            elif choice == "1":
                print("正在生成今日分析报告...")
                self.generate_daily_report(datetime.now())
                
            elif choice == "2":
                print("正在生成本周分析报告...")
                today = datetime.now()
                self.generate_weekly_report(today.year, int(today.strftime("%W")))
                
            elif choice == "3":
                date_str = input("请输入日期(格式:YYYY-MM-DD): ")
                try:
                    date = datetime.strptime(date_str, "%Y-%m-%d")
                    self.generate_daily_report(date)
                except:
                    print("日期格式错误!")
                    
            elif choice == "4":
                year = int(input("请输入年份: "))
                week = int(input("请输入周数(1-52): "))
                self.generate_weekly_report(year, week)
                
            elif choice == "5":
                days = int(input("请输入要查看的天数: "))
                end = datetime.now()
                start = end - timedelta(days=days)
                self.plot_index_trend(start, end)
                
            elif choice == "6":
                days = int(input("请输入要查看的天数: "))
                end = datetime.now()
                start = end - timedelta(days=days)
                self.plot_pig_price_trend(start, end)
                
            elif choice == "7":
                days = int(input("请输入要查看的天数: "))
                end = datetime.now()
                start = end - timedelta(days=days)
                self.plot_pig_price_region_trend(start, end)
                
            elif choice == "8":
                days = int(input("请输入要查看的天数: "))
                end = datetime.now()
                start = end - timedelta(days=days)
                self.plot_grain_price_trend(start, end)
                
            elif choice == "9":
                days = int(input("请输入要导天数: "))
                end = datetime.now()
                start = end - timedelta(days=days)
                self.export_data(start, end)
                
            else:
                print("无效的选择,请重试!")
            
            input("\n按回车键继续...")

    def _make_request(self, url, method='get', params=None, data=None):
        """发送HTTP请求
        
        Args:
            url: 请求URL
            method: 请求方法,支持 'get'/'post'
            params: URL参数
            data: POST数据
            
        Returns:
            Response对象或None(请求失败)
        """
        try:
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
            }
            
            if method.lower() == 'get':
                response = requests.get(
                    url,
                    params=params,
                    headers=headers,
                    verify=False,
                    timeout=10
                )
            else:
                response = requests.post(
                    url,
                    params=params,
                    json=data,  # 添加json参数支持
                    headers=headers,
                    verify=False,
                    timeout=10
                )
            
            response.raise_for_status()
            return response
            
        except requests.exceptions.RequestException as e:
            self.logger.error(f"请求失败: {str(e)}")
            return None

    def fetch_daily_report(self, date):
        """获取日度价格报告"""
        try:
            url = f"{API_BASE_URL}/api/FarmDaily/list"
            data = {
                "daylyDate": date.strftime("%Y-%m-%d")
            }
            
            response = self._make_request(url, method='post', data=data)
            if not response:
                return None
            
            data = response.json()
            if data.get("code") == 200 and data.get("content",{}).get("list"):
                # 找到指定日期的报告
                target_date = date.strftime("%Y-%m-%d")
                for report in data["content"]["list"]:
                    if report["daylyDate"].startswith(target_date):
                        # 提取所需数据
                        return {
                            "conclusion": report["counclesion"],
                            "indexConclusion": report["indexConclusion"],
                            "animalConclusion": report["animalConclusion"],
                            "aquaticConclusion": report["aquaticConclusion"],
                            "vegetablesConclusion": report["vegetablesConclusion"],
                            "fruitsConclusion": report["fruitsConclusion"],
                            "content": report["countent"],
                            "incOrReduRange": report["incOrReduRange"]
                        }
                        
                self.logger.warning(f"未找到{target_date}的报告")
                return None
            
            self.logger.warning(f"获取数据失败: {data.get('message', '未知错误')}")
            return None
            
        except Exception as e:
            self.logger.error(f"获取日度报告出错: {str(e)}")
            return None

    def _extract_conclusions(self, report):
        """从报告中提取各类结论"""
        try:
            return {
                "index": report.get("indexConclusion", ""),
                "animal": report.get("animalConclusion", ""),
                "aquatic": report.get("aquaticConclusion", ""),
                "vegetables": report.get("vegetablesConclusion", ""),
                "fruits": report.get("fruitsConclusion", ""),
                "range": report.get("incOrReduRange", "")
            }
        except Exception as e:
            self.logger.error(f"提取论出错: {str(e)}")
            return {}

    def fetch_index_data(self, start_date, end_date):
        """获取价格指数数据"""
        try:
            url = "https://pfsc.agri.cn/price_portal/pi-info-day/getPortalPiInfoDay"
            response = requests.post(url, headers=self.headers, verify=False)
            data = response.json()
            
            if data["code"] == 200:
                result = []
                for item in data["content"]:
                    pub_date = datetime.strptime(item["publishDate"], "%Y-%m-%d")
                    if start_date <= pub_date <= end_date:
                        result.append({
                            "日期": item["publishDate"],
                            "农产品批发价格200指数": item["agriculture"],
                            "粮油指数": item["grainAndOil"],
                            "篮子数": item["vegetableBasket"]
                        })
                return result
            return None
            
        except Exception as e:
            self.logger.error(f"获取指数数据失败: {str(e)}")
            return None

    def fetch_pig_price_data(self, start_date, end_date):
        """获取猪肉价格数据"""
        try:
            url = f"{API_BASE_URL}{API_ENDPOINTS['variety_list']}"
            params = {'pid': 'MH'}  # 猪肉品类ID
            
            response = self._make_request(url, method='post', params=params)
            if not response:
                return None
            
            data = response.json()
            if data.get("code") == 200 and data.get("data"):
                # 转换数据格式
                result = []
                for item in data["data"]:
                    if start_date <= datetime.strptime(item["date"], "%Y-%m-%d") <= end_date:
                        result.append({
                            "日期": item["date"],
                            "全国": float(item["national"]),
                            "东北": float(item["northEast"]),
                            "华北": float(item["northChina"]),
                            "华东": float(item["eastChina"]),
                            "华中": float(item["centralChina"]),
                            "华南": float(item["southChina"]),
                            "西南": float(item["southWest"])
                        })
                return result
            
            self.logger.warning(f"获取数据失败: {data.get('message', '未知错误')}")
            return None
            
        except Exception as e:
            self.logger.error(f"获取猪肉价格数据失败: {str(e)}")
            return None

    def fetch_grain_price_data(self, start_date, end_date):
        """获取粮油价格数据"""
        try:
            url = f"{API_BASE_URL}{API_ENDPOINTS['variety_list']}"
            params = {'pid': 'TL'}  # 粮油品类ID
            
            response = self._make_request(url, method='post', params=params)
            if not response:
                return None
            
            data = response.json()
            if data.get("code") == 200 and data.get("data"):
                # 转换数据格式
                result = []
                for item in data["data"]:
                    if start_date <= datetime.strptime(item["date"], "%Y-%m-%d") <= end_date:
                        result.append({
                            "日期": item["date"],
                            "通义粮价指数": float(item["grainPriceIndex"]),
                            "通义粮市指数": float(item["grainMarketIndex"]),
                            "通义粮市第1号": float(item["grainMarketNo1"]),
                            "通义粮天指数": float(item["grainDayIndex"]),
                            "通义���指": float(item["grainIndex"]),
                            "通义粮天指数(干粮)": float(item["grainDayDryIndex"])
                        })
                return result
            
            self.logger.warning(f"获取数据失败: {data.get('message', '未知错误')}")
            return None
            
        except Exception as e:
            self.logger.error(f"获取粮油价格数据失败: {str(e)}")
            return None

    def generate_daily_report(self, date):
        """生成每日分析报告"""
        try:
            report_data = self.fetch_daily_report(date)
            if not report_data:
                self.logger.warning(f"未获取到 {date.strftime('%Y-%m-%d')} 的报告数据")
                return
            
            report_file = os.path.join(
                self.daily_dir,
                f"{date.strftime('%Y年%m月%d日')}_价格分析报告.md"
            )
            
            # 使用更清晰模板格式
            content = f"""# {date.strftime('%Y年%m月%d日')} 农产品价格分析报告

## 一、价格指数变化
{report_data["indexConclusion"]}

## 二、分品类分析

### 1. 畜禽产品
{report_data["animalConclusion"]}

### 2. 水产品
{report_data["aquaticConclusion"]}

### 3. 蔬菜
{report_data["vegetablesConclusion"]}

### 4. 水果
{report_data["fruitsConclusion"]}

## 三、价格波动情况
{report_data["incOrReduRange"]}

## 四、数据说明
- 数据来源: {report_data["source"]}
- 生成时间: {datetime.now().strftime('%Y年%m月%d日 %H:%M:%S')}
- 价格单位: 元/斤
- 涨跌幅: 与上一交易日相比

---
*注: 本报告由系统自动生成,仅供参考。*
"""
            
            with open(report_file, "w", encoding="utf-8") as f:
                f.write(content)
            
            self.logger.info(f"分析报告已生成: {report_file}")
            
        except Exception as e:
            self.logger.error(f"生成分析报告失败: {str(e)}")

    def generate_weekly_report(self, year, week):
        """生成周度汇总报告"""
        try:
            start_date = datetime.strptime(f'{year}-W{week:02d}-1', '%Y-W%W-%w')
            end_date = start_date + timedelta(days=6)
            
            print(f"\n正在生成第{week}周报告...")
            print(f"时间范围: {start_date.strftime('%Y-%m-%d')}{end_date.strftime('%Y-%m-%d')}")
            print("="*50)
            
            # 获周内所有报告
            reports = []
            current = start_date
            total_days = (end_date - start_date).days + 1
            
            for i in range(total_days):
                print(f"\r进度: {i+1}/{total_days} ", end="")
                report = self.fetch_daily_report(current)
                if report:
                    reports.append(report)
                current += timedelta(days=1)
            
            if not reports:
                self.logger.warning("本周无可用数据")
                return
            
            # 计算周度汇总数据
            weekly_summary = self._calculate_weekly_summary(reports)
            
            report_file = os.path.join(
                self.weekly_dir,
                f"{year}年第{week:02d}周_{start_date.strftime('%m月%d日')}-{end_date.strftime('%m月%d日')}_价格分析报告.md"
            )
            
            with open(report_file, "w", encoding="utf-8") as f:
                f.write(f"""# {year}年第{week:02d}周农产品价格分析报告
({start_date.strftime('%Y年%m月%d日')}{end_date.strftime('%Y年%m月%d日')})

## 一、本周价格概况
{weekly_summary['overview']}

## 二、价格指数变化
- 周初: {weekly_summary['index_start']}
- 周末: {weekly_summary['index_end']}
- 度变化: {weekly_summary['index_change']}

## 三、分品类周度分析
### 1. 畜禽产品
{weekly_summary['animal_summary']}

### 2. 水产品
{weekly_summary['aquatic_summary']}

### 3. 蔬菜
{weekly_summary['vegetables_summary']}

### 4. 水果
{weekly_summary['fruits_summary']}

## 四、日度价格详情
""")
            
                for report in reports:
                    pub_date = datetime.strptime(report['daylyDate'][:10], '%Y-%m-%d')
                    f.write(f"""### {pub_date.strftime('%Y年%m月%d日')}
1. 价格指数: {report.get('indexConclusion', '暂无数据')}
2. 畜禽产品: {report.get('animalConclusion', '暂无数据')}
3. 水产品: {report.get('aquaticConclusion', '暂无数据')}
4. 蔬菜: {report.get('vegetablesConclusion', '暂无数据')}
5. 水果: {report.get('fruitsConclusion', '暂无数据')}

""")
            
                f.write(f"""## 五、数据说明
- 数据来源: {reports[0]["source"]}
- 生成时间: {datetime.now().strftime('%Y年%m月%d日 %H:%M:%S')}
- 价格单位: 元/公斤
- 跌幅: 与上期相比

---
*注: 本报告由系统自动生成,仅供参考。*""")
            
            print("\n报告生成完成!")
            self.logger.info(f"周度报告已生成: {report_file}")
            
        except Exception as e:
            self.logger.error(f"生成周度报告失败: {str(e)}")

    def _calculate_weekly_summary(self, reports):
        """计算周度汇总数据"""
        summary = {
            'overview': '',
            'index_start': reports[0].get('indexConclusion', '暂无数据'),
            'index_end': reports[-1].get('indexConclusion', '暂无数据'),
            'index_change': '',
            'animal_summary': '',
            'aquatic_summary': '',
            'vegetables_summary': '',
            'fruits_summary': ''
        }
        
        # 计算价格指数变化
        try:
            start_index = float(reports[0]['indexConclusion'].split('为')[1].split(',')[0])
            end_index = float(reports[-1]['indexConclusion'].split('为')[1].split(',')[0])
            change = end_index - start_index
            summary['index_change'] = f"{'上升' if change >= 0 else '下降'}{abs(change):.2f}个点"
        except:
            summary['index_change'] = '数据异常'
        
        # 生成概述
        summary['overview'] = f"本周农产品批发价格200指数从{summary['index_start']},到{summary['index_end']},整体{summary['index_change']}。"
        
        # 其他品类汇总...
        
        return summary

    def plot_index_trend(self, start_date, end_date):
        """绘制价格指数走势图"""
        try:
            data = self.fetch_index_data(start_date, end_date)
            if not data:
                return
            
            plt.figure(figsize=(12, 6), facecolor='#f8fcfa')
            ax = plt.gca()
            ax.set_facecolor('#f8fcfa')
            
            dates = [item["日期"] for item in data]
            indices = [
                ("农品批发价格200指数", "#ffa940"),
                ("菜篮子指", "#73d13d"),
                ("粮油指数", "#40a9ff")
            ]
            
            for name, color in indices:
                values = [item[name] for item in data]
                plt.plot(dates, values, color=color, marker='o',
                        markersize=4, linewidth=2, label=name)
            
            plt.title('农业农村部"农产品批发价格200指数"日度走势图',
                     pad=20, fontsize=12, loc='left')
            plt.grid(True, linestyle='--', alpha=0.3)
            plt.xticks(rotation=45)
            plt.legend(loc='upper right', frameon=False)
            
            plt.tight_layout()
            plt.savefig(
                os.path.join(self.output_dir, "价格指数走势图.png"),
                dpi=300,
                bbox_inches='tight'
            )
            plt.close()
            
            self.logger.info("价格指数走势已生成")
            
        except Exception as e:
            self.logger.error(f"生成价格指数走势图失败: {str(e)}")

    def plot_pig_price_trend(self, start_date, end_date):
        """绘制猪肉价格走势图"""
        try:
            data = self.fetch_pig_price_data(start_date, end_date)
            if not data:
                return
            
            plt.figure(figsize=(12, 6), facecolor='#f8fcfa')
            ax = plt.gca()
            ax.set_facecolor('#f8fcfa')
            
            dates = [item["日期"] for item in data]
            values = [item["全国"] for item in data]
            
            plt.plot(dates, values, color='#40a9ff', marker='o',
                    markersize=4, linewidth=2)
            plt.fill_between(dates, values, color='#e6f7ff', alpha=0.5)
            
            plt.title('"瘦肉型白条猪肉出厂价格指数"全国走势图',
                     pad=20, fontsize=12, loc='left')
            plt.grid(True, linestyle='--', alpha=0.3)
            plt.xticks(rotation=45)
            
            plt.tight_layout()
            plt.savefig(
                os.path.join(self.output_dir, "猪肉价格走势图.png"),
                dpi=300,
                bbox_inches='tight'
            )
            plt.close()
            
            self.logger.info("猪肉价格走势图已生成")
            
        except Exception as e:
            self.logger.error(f"生成猪肉价格走势图失败: {str(e)}")

    def plot_pig_price_region_trend(self, start_date, end_date):
        """绘制猪肉分区域价格走势图"""
        try:
            data = self.fetch_pig_price_data(start_date, end_date)
            if not data:
                return
            
            plt.figure(figsize=(12, 6), facecolor='#f8fcfa')
            ax = plt.gca()
            ax.set_facecolor('#f8fcfa')
            
            dates = [item["日期"] for item in data]
            regions = [
                ("东北", "#40a9ff"),
                ("华南", "#73d13d"),
                ("华", "#ffa940"),
                ("华中", "#ff4d4f"),
                ("华东", "#9254de"),
                ("西南", "#36cfc9")
            ]
            
            for region, color in regions:
                values = [item[region] for item in data]
                plt.plot(dates, values, color=color, marker='o',
                        markersize=4, linewidth=2, label=region)
            
            plt.title('"瘦肉型条猪肉出厂价格指数"区域走势图',
                     pad=20, fontsize=12, loc='left')
            plt.grid(True, linestyle='--', alpha=0.3)
            plt.xticks(rotation=45)
            plt.legend(loc='upper right', frameon=False)
            
            plt.tight_layout()
            plt.savefig(
                os.path.join(self.output_dir, "猪肉价格区域走势图.png"),
                dpi=300,
                bbox_inches='tight'
            )
            plt.close()
            
            self.logger.info("猪肉价格区域走势图已生成")
            
        except Exception as e:
            self.logger.error(f"生成猪肉价格区域走势图失败: {str(e)}")

    def plot_grain_price_trend(self, start_date, end_date):
        """绘制粮油价格走势图"""
        try:
            data = self.fetch_grain_price_data(start_date, end_date)
            if not data:
                return
            
            plt.figure(figsize=(12, 6), facecolor='#f8fcfa')
            ax = plt.gca()
            ax.set_facecolor('#f8fcfa')
            
            dates = [item["日期"] for item in data]
            indices = [
                ("通义粮价指数", "#40a9ff"),
                ("通义粮市指数", "#73d13d"),
                ("通义粮市第1号", "#ffa940"),
                ("通义粮天指数", "#ff4d4f"),
                ("通义粮指", "#9254de"),
                ("通义粮天指数(干粮)", "#36cfc9")
            ]
            
            for name, color in indices:
                values = [item[name] for item in data]
                plt.plot(dates, values, color=color, marker='o',
                        markersize=4, linewidth=2, label=name)
            
            plt.title('中国通义粮油发价格指数走势图',
                     pad=20, fontsize=12, loc='left')
            plt.grid(True, linestyle='--', alpha=0.3)
            plt.xticks(rotation=45)
            plt.legend(loc='upper right', frameon=False)
            
            plt.tight_layout()
            plt.savefig(
                os.path.join(self.output_dir, "粮油价格指数走势图.png"),
                dpi=300,
                bbox_inches='tight'
            )
            plt.close()
            
            self.logger.info("粮油价格指数走势图已生成")
            
        except Exception as e:
            self.logger.error(f"生成粮油价格指数走势失败: {str(e)}")

    def export_data(self, start_date, end_date, format='excel'):
        """导出数据
        
        Args:
            start_date: 开始日期
            end_date: 结束日期
            format: 导出格式,支持 'excel'/'csv'/'json'
        """
        try:
            # 获取数据
            data = {
                'index': self.fetch_index_data(start_date, end_date),
                'pig': self.fetch_pig_price_data(start_date, end_date),
                'grain': self.fetch_grain_price_data(start_date, end_date)
            }
            
            if not any(data.values()):
                return
            
            # 根据格式导出
            if format == 'excel':
                self._export_excel(data, start_date, end_date)
            elif format == 'csv':
                self._export_csv(data, start_date, end_date)
            elif format == 'json':
                self._export_json(data, start_date, end_date)
            else:
                self.logger.error(f"不支持的导出格式: {format}")
            
        except Exception as e:
            self.logger.error(f"导出数据失败: {str(e)}")

    def _clean_text(self, text):
        """清理文本内容"""
        if not text:
            return ""
        # 去除多余空白字符
        text = ' '.join(text.split())
        # 修复可能的标点符号问题
        text = text.replace('。。', '。').replace(',。', '。').replace(';。', '。')
        # 修复中文编码
        text = text.encode('utf-8').decode('utf-8', 'ignore')
        return text

    def _validate_report_data(self, report):
        """验证报告数据完整性"""
        required_fields = [
            "indexConclusion",
            "animalConclusion",
            "aquaticConclusion",
            "vegetablesConclusion",
            "fruitsConclusion"
        ]
        
        is_valid = True
        for field in required_fields:
            if not report.get(field):
                self.logger.warning(f"报告缺少 {field} 数据")
                is_valid = False
                report[field] = "暂无数据"
        
        return is_valid

    def _export_excel(self, data, start_date, end_date):
        """导出Excel数据"""
        try:
            filename = f"价格数据_{start_date.strftime('%Y%m%d')}_{end_date.strftime('%Y%m%d')}.xlsx"
            filepath = os.path.join(self.output_dir, filename)
            
            with pd.ExcelWriter(filepath) as writer:
                # 导出价格指数
                if data.get('index'):
                    df_index = pd.DataFrame(data['index'])
                    df_index.to_excel(writer, sheet_name='价格指数', index=False)
                    
                # 导出猪肉价格
                if data.get('pig'):
                    df_pig = pd.DataFrame(data['pig'])
                    df_pig.to_excel(writer, sheet_name='猪肉价格', index=False)
                    
                # 导出粮油价格
                if data.get('grain'):
                    df_grain = pd.DataFrame(data['grain'])
                    df_grain.to_excel(writer, sheet_name='粮油价格', index=False)
                    
            self.logger.info(f"���据已导出至: {filepath}")
            return True
            
        except Exception as e:
            self.logger.error(f"导出Excel失败: {str(e)}")
            return False

    def fetch_all_categories(self):
        """获取所有品类数据"""
        categories = {
            'MH': '猪肉',
            'SC': '蔬菜',
            'SG': '水果',
            'TL': '粮油',
            'SC': '水产',
            'DJ': '蛋鸡',
            'NR': '牛肉',
            'YR': '羊肉'
        }
        
        result = {}
        for code, name in categories.items():
            try:
                url = f"{API_BASE_URL}{API_ENDPOINTS['variety_list']}"
                params = {'pid': code}
                response = self._make_request(url, method='post', params=params)
                if response and response.json().get("code") == 200:
                    result[name] = response.json().get("data", [])
            except Exception as e:
                self.logger.error(f"获取{name}品类数据失败: {str(e)}")
        return result

    def fetch_market_prices(self, market_id=None, variety_id=None, start_date=None, end_date=None):
        """获取市场价格数据"""
        try:
            url = f"{API_BASE_URL}{API_ENDPOINTS['daily_price']}"
            params = {
                'marketId': market_id,
                'varietyId': variety_id,
                'startDate': start_date.strftime("%Y-%m-%d") if start_date else None,
                'endDate': end_date.strftime("%Y-%m-%d") if end_date else None
            }
            
            response = self._make_request(url, method='post', params=params)
            if response and response.json().get("code") == 200:
                return response.json().get("data", [])
            return None
        except Exception as e:
            self.logger.error(f"获取市场价格数据失败: {str(e)}")
            return None

    def fetch_analysis_reports(self, page=1, page_size=10):
        """获取分析报告列表"""
        try:
            url = f"{API_BASE_URL}{API_ENDPOINTS['analysis_report']}"
            params = {
                'pageNum': page,
                'pageSize': page_size
            }
            
            response = self._make_request(url, method='post', params=params)
            if response and response.json().get("code") == 200:
                return response.json().get("data", {}).get("list", [])
            return None
        except Exception as e:
            self.logger.error(f"获取分析报告失败: {str(e)}")
            return None

    def crawl_all_data(self, start_date, end_date):
        """爬取所有数据"""
        try:
            # 获取所有品类
            categories = self.fetch_all_categories()
            
            # 获取所有市场
            markets_response = self._make_request(
                f"{API_BASE_URL}{API_ENDPOINTS['market_list']}", 
                method='post'
            )
            markets = markets_response.json().get("data", []) if markets_response else []
            
            # 存储结果
            results = {
                'categories': categories,
                'markets': markets,
                'prices': {},
                'reports': []
            }
            
            # 获取每个品类的价格数据
            for category, varieties in categories.items():
                results['prices'][category] = {}
                for variety in varieties:
                    variety_id = variety.get('id')
                    if variety_id:
                        prices = self.fetch_market_prices(
                            variety_id=variety_id,
                            start_date=start_date,
                            end_date=end_date
                        )
                        results['prices'][category][variety.get('name')] = prices
            
            # 获取分析报告
            page = 1
            while True:
                reports = self.fetch_analysis_reports(page=page)
                if not reports:
                    break
                results['reports'].extend(reports)
                page += 1
                
            return results
            
        except Exception as e:
            self.logger.error(f"爬取所有数据失败: {str(e)}")
            return None

    def fetch_weekly_report_content(self, report_id=None):
        """获取周度报告内容"""
        try:
            url = f"{API_BASE_URL}/product/analysis-report/getReportContent"
            params = {'id': report_id} if report_id else None
            
            response = self._make_request(url, method='post', params=params)
            if not response:
                return None
            
            # 解析HTML内容
            soup = BeautifulSoup(response.text, 'html.parser')
            
            # 提取报告基本信息
            title = soup.find('h1', class_='report-title').text.strip()
            date = soup.find('div', class_='report-date').text.strip()
            source = soup.find('div', class_='report-source').text.strip()
            
            # 提取报告主体内容
            content = soup.find('div', class_='report-content')
            
            # 提取表格数据
            tables = []
            for table in content.find_all('table'):
                df = pd.read_html(str(table))[0]
                tables.append(df.to_dict('records'))
                
            # 提取文本内容
            paragraphs = []
            for p in content.find_all('p'):
                text = p.text.strip()
                if text:
                    paragraphs.append(text)
                    
            return {
                'title': title,
                'date': date,
                'source': source,
                'content': {
                    'text': paragraphs,
                    'tables': tables
                }
            }
            
        except Exception as e:
            self.logger.error(f"获取报告内容失败: {str(e)}")
            return None

    def crawl_all_reports(self, start_date=None, end_date=None):
        """爬取所有报告"""
        try:
            reports = []
            page = 1
            
            while True:
                # 获取报告列表
                report_list = self.fetch_analysis_reports(page=page)
                if not report_list:
                    break
                    
                # 过滤日期范围
                if start_date or end_date:
                    filtered_reports = []
                    for report in report_list:
                        report_date = datetime.strptime(report['publishDate'], '%Y-%m-%d')
                        if start_date and report_date < start_date:
                            continue
                        if end_date and report_date > end_date:
                            continue
                        filtered_reports.append(report)
                    report_list = filtered_reports
                    
                # 获取每个报告的详细内容
                for report in report_list:
                    report_content = self.fetch_weekly_report_content(report['id'])
                    if report_content:
                        reports.append({
                            'meta': report,
                            'content': report_content
                        })
                        
                    # 添加延时避免请求过快
                    time.sleep(1)
                    
                page += 1
                
            return reports
            
        except Exception as e:
            self.logger.error(f"爬取报告失败: {str(e)}")
            return None

    def save_reports(self, reports, output_dir='reports'):
        """保存报告到文件"""
        try:
            if not os.path.exists(output_dir):
                os.makedirs(output_dir)
                
            for report in reports:
                # 生成文件名
                date = datetime.strptime(report['meta']['publishDate'], '%Y-%m-%d')
                filename = f"{date.strftime('%Y%m%d')}_{report['meta']['id']}.json"
                filepath = os.path.join(output_dir, filename)
                
                # 保存为JSON文件
                with open(filepath, 'w', encoding='utf-8') as f:
                    json.dump(report, f, ensure_ascii=False, indent=2)
                    
            return True
            
        except Exception as e:
            self.logger.error(f"保存报告失败: {str(e)}")
            return False

if __name__ == "__main__":
    try:
        # 检查并安装依赖包
        check_and_install_packages()
        
        # 运行爬虫
        crawler = ReportCrawler()
        crawler.run()
        
    except KeyboardInterrupt:
        print("\n程序已被用中断")
        sys.exit(0)
    except Exception as e:
        print(f"\n程序运行出错: {str(e)}")
        sys.exit(1) 

一、功能介绍

本程序用于爬取农业农村部发布的农产品价格监测报告,包括以下功能:

1. 日度报告

  • 生成今日分析报告
  • 生成指定日期报告
  • 包含价格指数、分品类分析等

2. 周度报告

  • 生成本周分析报告
  • 生成指定周报告
  • 汇总周内价格变化

3. 价格走势

  • 农产品价格200指数走势
  • 猪肉价格全国走势
  • 猪肉价格区域走势
  • 粮油价格指数走势

4. 数据导出

  • 支持Excel格式导出
  • 包含多个数据分类
  • 支持时间范围选择

二、使用说明

1. 环境要求

  • Python 3.7+
  • 依赖包会自动安装:
    • requests: HTTP请求
    • pandas: 数据处理
    • matplotlib: 绘图支持
    • urllib3: HTTP客户端
    • openpyxl: Excel支持
    • colorama: 控制台颜色

2. 运行方法

python
直接运行程序
python report_crawler.py

3. 功能菜单

农产品价格报告爬虫系统

生成今日分析报告
生成本周分析报告
生成指定日期报告
生成指定周报告
生成价格指数走势图
生成猪肉价格走势图
生成区域价格走势图
生成粮油价格走势图
导出Excel数据
退出系统

4. 输出文件

  • reports/daily: 日度分析报告
  • reports/weekly: 周度分析报告
  • reports: 价格走势图和Excel数据

三、数据来源

  • 农业农村部市场信息中心
  • 数据更新频率: 每日14:00

四、注意事项

  1. 首次运行会自动检查并安装依赖包
  2. 所有数据仅供学习交流使用
  3. 建议使用时设置合理的时间范围
  4. 如遇到问题可查看日志文件

五、更新记录

v1.0.0 (2024-12-05)

  • 实现基础数据爬取功能
  • 支持生成分析报告
  • 支持绘制价格走势图
  • 支持导出Excel数据

六、联系方式

作者: xiaohai
仅用于学习交流

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2289109.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

字节iOS面试经验分享:HTTP与网络编程

字节iOS面试经验分享&#xff1a;HTTP与网络编程 &#x1f31f; 嗨&#xff0c;我是LucianaiB&#xff01; &#x1f30d; 总有人间一两风&#xff0c;填我十万八千梦。 &#x1f680; 路漫漫其修远兮&#xff0c;吾将上下而求索。 目录 字节iOS面试经验分享&#xff1a;HTT…

代码随想录_栈与队列

栈与队列 232.用栈实现队列 232. 用栈实现队列 使用栈实现队列的下列操作&#xff1a; push(x) – 将一个元素放入队列的尾部。 pop() – 从队列首部移除元素。 peek() – 返回队列首部的元素。 empty() – 返回队列是否为空。 思路: 定义两个栈: 入队栈, 出队栈, 控制出入…

【Oracle篇】使用Hint对优化器的执行计划进行干预(含单表、多表、查询块、声明四大类Hint干预)

&#x1f4ab;《博主介绍》&#xff1a;✨又是一天没白过&#xff0c;我是奈斯&#xff0c;从事IT领域✨ &#x1f4ab;《擅长领域》&#xff1a;✌️擅长阿里云AnalyticDB for MySQL(分布式数据仓库)、Oracle、MySQL、Linux、prometheus监控&#xff1b;并对SQLserver、NoSQL(…

论文阅读(九):通过概率图模型建立连锁不平衡模型和进行关联研究:最新进展访问之旅

1.论文链接&#xff1a;Modeling Linkage Disequilibrium and Performing Association Studies through Probabilistic Graphical Models: a Visiting Tour of Recent Advances 摘要&#xff1a; 本章对概率图模型&#xff08;PGMs&#xff09;的最新进展进行了深入的回顾&…

【Matlab高端绘图SCI绘图模板】第006期 对比绘柱状图 (只需替换数据)

1. 简介 柱状图作为科研论文中常用的实验结果对比图&#xff0c;本文采用了3组实验对比的效果展示图&#xff0c;代码已调试好&#xff0c;只需替换数据即可生成相关柱状图&#xff0c;为科研加分。通过获得Nature配色的柱状图&#xff0c;让你的论文看起来档次更高&#xff0…

YOLOv8源码修改(4)- 实现YOLOv8模型剪枝(任意YOLO模型的简单剪枝)

目录 前言 1. 需修改的源码文件 1.1添加C2f_v2模块 1.2 修改模型读取方式 1.3 增加 L1 正则约束化训练 1.4 在tensorboard上增加BN层权重和偏置参数分布的可视化 1.5 增加剪枝处理文件 2. 工程目录结构 3. 源码文件修改 3.1 添加C2f_v2模块和模型读取 3.2 添加L1正则…

后端token校验流程

获取用户信息 前端中只有 await userStore.getInfo() 表示从后端获取数据 在页面中找到info对应的url地址&#xff0c;在IDEA中查找 这里是getInfo函数的声明&#xff0c;我们要找到这个函数的使用&#xff0c;所以点getInfo() Override public JSONObject getInfo() {JSO…

Ansible自动化运维实战--通过role远程部署nginx并配置(8/8)

文章目录 1、准备工作2、创建角色结构3、编写任务4、准备配置文件&#xff08;金甲模板&#xff09;5、编写变量6、编写处理程序7、编写剧本8、执行剧本Playbook9、验证-游览器访问每台主机的nginx页面 在 Ansible 中&#xff0c;使用角色&#xff08;Role&#xff09;来远程部…

C语言自定义数据类型详解(二)——结构体类型(下)

书接上回&#xff0c;前面我们已经给大家介绍了如何去声明和创建一个结构体&#xff0c;如何初始化结构体变量等这些关于结构体的基础知识。下面我们将继续给大家介绍和结构体有关的知识&#xff1a; 今天的主题是&#xff1a;结构体大小的计算并简单了解一下位段的相关知识。…

Maven的单元测试

1. 单元测试的基本概念 单元测试&#xff08;Unit Testing&#xff09; 是一种软件测试方法&#xff0c;专注于测试程序中的最小可测试单元——通常是单个类或方法。通过单元测试&#xff0c;可以确保每个模块按预期工作&#xff0c;从而提高代码的质量和可靠性。 2.安装和配…

Jetson Xavier NX 安装 CUDA 支持的 PyTorch 指南

本指南将帮助开发者完成在 Jetson Xavier NX 上安装 CUDA 支持的 PyTorch。 安装方法 在 Jetson 上安装 Pytorch 只有两种方法。 一种是直接安装他人已经编译好的 PyTorch 轮子&#xff1b;一种是自己从头开始开始构建 PyTorch 轮子并且安装。 使用轮子安装 可以从我的 Gi…

GWO优化GRNN回归预测matlab

灰狼优化算法&#xff08;Grey Wolf Optimizer&#xff0c;简称 GWO&#xff09;&#xff0c;是一种群智能优化算法&#xff0c;由澳大利亚格里菲斯大学的 Mirjalii 等人于 2014 年提出。该算法的设计灵感源自灰狼群体的捕食行为&#xff0c;核心思想在于模拟灰狼社会的结构与行…

Unity 粒子特效在UI中使用裁剪效果

1.使用Sprite Mask 首先建立一个粒子特效在UI中显示 新建一个在场景下新建一个空物体&#xff0c;添加Sprite Mask组件&#xff0c;将其的Layer设置为UI相机渲染的UI层&#xff0c; 并将其添加到Canvas子物体中&#xff0c;调整好大小&#xff0c;并选择合适的Sprite&#xff…

【大厂AI实践】OPPO:大规模知识图谱及其在小布助手中的应用

导读&#xff1a;OPPO知识图谱是OPPO数智工程系统小布助手团队主导、多团队协作建设的自研大规模通用知识图谱&#xff0c;目前已达到数亿实体和数十亿三元组的规模&#xff0c;主要落地在小布助手知识问答、电商搜索等场景。 本文主要分享OPPO知识图谱建设过程中算法相关的技…

C# 添加、替换、提取、或删除Excel中的图片

在Excel中插入与数据相关的图片&#xff0c;能将关键数据或信息以更直观的方式呈现出来&#xff0c;使文档更加美观。此外&#xff0c;对于已有图片&#xff0c;你有事可能需要更新图片以确保信息的准确性&#xff0c;或者将Excel 中的图片单独保存&#xff0c;用于资料归档、备…

赛博算卦之周易六十四卦JAVA实现:六幺算尽天下事,梅花化解天下苦。

佬们过年好呀~新年第一篇博客让我们来场赛博算命吧&#xff01; 更多文章&#xff1a;个人主页 系列文章&#xff1a;JAVA专栏 欢迎各位大佬来访哦~互三必回&#xff01;&#xff01;&#xff01; 文章目录 #一、文化背景概述1.文化起源2.起卦步骤 #二、卦象解读#三、just do i…

iperf 测 TCP 和 UDP 网络吞吐量

注&#xff1a;本文为 “iperf 测网络吞吐量” 相关文章合辑。 未整理去重。 使用 iperf3 监测网络吞吐量 Tom 王 2019-12-21 22:23:52 一 iperf3 介绍 (1.1) iperf3 是一个网络带宽测试工具&#xff0c;iperf3 可以擦拭 TCP 和 UDP 带宽质量。iperf3 可以测量最大 TCP 带宽…

内外网文件摆渡企业常见应用场景和对应方案

在如今的企业环境中&#xff0c;内外网文件摆渡的需求越来越常见&#xff0c;也变得越来越重要。随着信息化的不断推进&#xff0c;企业内部和外部之间的数据交换越来越频繁&#xff0c;如何安全、高效地进行文件传输成了一个关键问题。今天&#xff0c;咱就来聊聊内外网文件摆…

论文阅读(十五):DNA甲基化水平分析的潜变量模型

1.论文链接&#xff1a;Latent Variable Models for Analyzing DNA Methylation 摘要&#xff1a; 脱氧核糖核酸&#xff08;DNA&#xff09;甲基化与细胞分化密切相关。例如&#xff0c;已经观察到肿瘤细胞中的DNA甲基化编码关于肿瘤的表型信息。因此&#xff0c;通过研究DNA…

Android View 的事件分发机制解析

前言&#xff1a;当一个事件发生时&#xff08;例如触摸屏幕&#xff09;&#xff0c;事件会从根View&#xff08;通常是Activity的布局中的最顶层View&#xff09;开始&#xff0c;通过一个特定的路径传递到具体的View&#xff0c;这个过程涉及到三个关键的阶段&#xff1a;事…