Python 爬虫技术常用于从网页上抓取数据,并将这些数据存储起来以供进一步分析或使用。数据的存储方式多种多样,常见的包括文件存储和数据库存储。下面我将通过一个简单的示例来介绍如何使用 Python 爬取数据,并将其存储为 CSV 和 JSON 文件格式,以及如何将数据存储到 SQL 和 NoSQL 数据库中。
示例场景:
假设我们要爬取一个网站上的产品信息,每个产品包含以下字段:product_name
, price
, rating
, description
。
步骤 1: 安装所需库
首先需要安装一些必要的 Python 库:
requests
用于发起 HTTP 请求BeautifulSoup
用于解析 HTML 文档pandas
用于数据处理sqlite3
或pymysql
(MySQL) 用于 SQL 数据库操作pymongo
用于 MongoDB (NoSQL)
可以通过 pip 安装:
pip install requests beautifulsoup4 pandas sqlite3 pymysql pymongo
步骤 2: 编写爬虫代码
2.1 爬取数据
import requests
from bs4 import BeautifulSoup
def fetch_data(url):
response = requests.get(url)
soup = BeautifulSoup(response.text, 'html.parser')
products = []
for product in soup.find_all('div', class_='product'):
name = product.find('h2').text.strip()
price = product.find('span', class_='price').text.strip()
rating = product.find('span', class_='rating').text.strip()
description = product.find('p', class_='description').text.strip()
products.append({
'product_name': name,
'price': price,
'rating': rating,
'description': description
})
return products
url = 'http://example.com/products'
products = fetch_data(url)
print(products)
2.2 存储数据到 CSV 文件
import pandas as pd
def save_to_csv(data, filename):
df = pd.DataFrame(data)
df.to_csv(filename, index=False)
save_to_csv(products, 'products.csv')
2.3 存储数据到 JSON 文件
import json
def save_to_json(data, filename):
with open(filename, 'w') as f:
json.dump(data, f, indent=4)
save_to_json(products, 'products.json')
步骤 3: 存储数据到数据库
3.1 存储数据到 SQL 数据库 (SQLite)
import sqlite3
def save_to_sqlite(data):
conn = sqlite3.connect('products.db')
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS products
(product_name TEXT, price TEXT, rating TEXT, description TEXT)''')
placeholders = ', '.join(['?'] * len(data[0]))
columns = ', '.join(data[0].keys())
sql = f'INSERT INTO products ({columns}) VALUES ({placeholders})'
c.executemany(sql, [list(item.values()) for item in data])
conn.commit()
conn.close()
save_to_sqlite(products)
3.2 存储数据到 NoSQL 数据库 (MongoDB)
from pymongo import MongoClient
def save_to_mongodb(data):
client = MongoClient('mongodb://localhost:27017/')
db = client['products_db']
collection = db['products']
collection.insert_many(data)
client.close()
save_to_mongodb(products)
以上示例展示了如何使用 Python 抓取数据并将其存储到不同的数据存储方式中。你可以根据具体的需求选择合适的存储方式。如果需要在生产环境中使用,请确保遵守相关法律法规和网站的服务条款。
当然可以!我们可以进一步完善上面的代码,使其更加完整和实用。接下来,我们将添加异常处理、日志记录等功能,并提供完整的代码示例。
完整的代码示例
1. 导入必要的库
import requests
from bs4 import BeautifulSoup
import pandas as pd
import json
import sqlite3
from pymongo import MongoClient
import logging
2. 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
3. 网页数据抓取函数
def fetch_data(url):
try:
response = requests.get(url)
response.raise_for_status() # 检查响应状态码是否正常
soup = BeautifulSoup(response.text, 'html.parser')
products = []
for product in soup.find_all('div', class_='product'):
name = product.find('h2').text.strip()
price = product.find('span', class_='price').text.strip()
rating = product.find('span', class_='rating').text.strip()
description = product.find('p', class_='description').text.strip()
products.append({
'product_name': name,
'price': price,
'rating': rating,
'description': description
})
logging.info(f"Fetched {len(products)} products.")
return products
except Exception as e:
logging.error(f"Error fetching data: {e}")
return []
url = 'http://example.com/products'
products = fetch_data(url)
4. CSV 文件存储
def save_to_csv(data, filename):
try:
df = pd.DataFrame(data)
df.to_csv(filename, index=False)
logging.info(f"Data saved to {filename}.")
except Exception as e:
logging.error(f"Error saving to CSV: {e}")
save_to_csv(products, 'products.csv')
5. JSON 文件存储
def save_to_json(data, filename):
try:
with open(filename, 'w') as f:
json.dump(data, f, indent=4)
logging.info(f"Data saved to {filename}.")
except Exception as e:
logging.error(f"Error saving to JSON: {e}")
save_to_json(products, 'products.json')
6. SQLite 数据库存储
def save_to_sqlite(data):
try:
conn = sqlite3.connect('products.db')
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS products
(product_name TEXT, price TEXT, rating TEXT, description TEXT)''')
placeholders = ', '.join(['?'] * len(data[0]))
columns = ', '.join(data[0].keys())
sql = f'INSERT INTO products ({columns}) VALUES ({placeholders})'
c.executemany(sql, [list(item.values()) for item in data])
conn.commit()
conn.close()
logging.info("Data saved to SQLite database.")
except Exception as e:
logging.error(f"Error saving to SQLite: {e}")
save_to_sqlite(products)
7. MongoDB 数据库存储
def save_to_mongodb(data):
try:
client = MongoClient('mongodb://localhost:27017/')
db = client['products_db']
collection = db['products']
collection.insert_many(data)
client.close()
logging.info("Data saved to MongoDB database.")
except Exception as e:
logging.error(f"Error saving to MongoDB: {e}")
save_to_mongodb(products)
完整的脚本
将上述所有部分组合成一个完整的脚本如下所示:
import requests
from bs4 import BeautifulSoup
import pandas as pd
import json
import sqlite3
from pymongo import MongoClient
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def fetch_data(url):
try:
response = requests.get(url)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
products = []
for product in soup.find_all('div', class_='product'):
name = product.find('h2').text.strip()
price = product.find('span', class_='price').text.strip()
rating = product.find('span', class_='rating').text.strip()
description = product.find('p', class_='description').text.strip()
products.append({
'product_name': name,
'price': price,
'rating': rating,
'description': description
})
logging.info(f"Fetched {len(products)} products.")
return products
except Exception as e:
logging.error(f"Error fetching data: {e}")
return []
def save_to_csv(data, filename):
try:
df = pd.DataFrame(data)
df.to_csv(filename, index=False)
logging.info(f"Data saved to {filename}.")
except Exception as e:
logging.error(f"Error saving to CSV: {e}")
def save_to_json(data, filename):
try:
with open(filename, 'w') as f:
json.dump(data, f, indent=4)
logging.info(f"Data saved to {filename}.")
except Exception as e:
logging.error(f"Error saving to JSON: {e}")
def save_to_sqlite(data):
try:
conn = sqlite3.connect('products.db')
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS products
(product_name TEXT, price TEXT, rating TEXT, description TEXT)''')
placeholders = ', '.join(['?'] * len(data[0]))
columns = ', '.join(data[0].keys())
sql = f'INSERT INTO products ({columns}) VALUES ({placeholders})'
c.executemany(sql, [list(item.values()) for item in data])
conn.commit()
conn.close()
logging.info("Data saved to SQLite database.")
except Exception as e:
logging.error(f"Error saving to SQLite: {e}")
def save_to_mongodb(data):
try:
client = MongoClient('mongodb://localhost:27017/')
db = client['products_db']
collection = db['products']
collection.insert_many(data)
client.close()
logging.info("Data saved to MongoDB database.")
except Exception as e:
logging.error(f"Error saving to MongoDB: {e}")
if __name__ == '__main__':
url = 'http://example.com/products'
products = fetch_data(url)
save_to_csv(products, 'products.csv')
save_to_json(products, 'products.json')
save_to_sqlite(products)
save_to_mongodb(products)
这个脚本包含了基本的数据抓取、存储到文件和数据库的功能,并且具有错误处理和日志记录机制,可以作为实际项目的基础。请注意,在实际部署时还需要考虑更详细的错误处理、性能优化等细节问题。
好的,我们可以在现有的基础上进一步完善代码,增加更多的功能和健壮性。以下是针对现有代码的一些改进点:
- 参数化 URL: 允许用户指定要抓取的 URL。
- 异常处理: 增加更详细的异常处理。
- 日志文件: 将日志输出到文件而不是控制台。
- 命令行参数: 使用命令行参数来控制程序行为。
- 数据验证: 对抓取的数据进行简单的验证。
- 数据库连接池: 使用连接池提高数据库操作效率。
下面是根据以上改进点修改后的代码:
import requests
from bs4 import BeautifulSoup
import pandas as pd
import json
import sqlite3
from pymongo import MongoClient
import logging
from argparse import ArgumentParser
from contextlib import closing
from urllib.parse import urlparse
# 设置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# 创建命令行参数解析器
parser = ArgumentParser(description="Web scraper and data storage tool.")
parser.add_argument("url", help="The URL of the web page to scrape.")
parser.add_argument("--output-csv", help="Save data to a CSV file.", default=None)
parser.add_argument("--output-json", help="Save data to a JSON file.", default=None)
parser.add_argument("--output-sqlite", help="Save data to an SQLite database.", action="store_true")
parser.add_argument("--output-mongodb", help="Save data to a MongoDB database.", action="store_true")
# 网页数据抓取函数
def fetch_data(url):
try:
response = requests.get(url)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
products = []
for product in soup.find_all('div', class_='product'):
name = product.find('h2').text.strip()
price = product.find('span', class_='price').text.strip()
rating = product.find('span', class_='rating').text.strip()
description = product.find('p', class_='description').text.strip()
products.append({
'product_name': name,
'price': price,
'rating': rating,
'description': description
})
logger.info(f"Fetched {len(products)} products.")
return products
except requests.exceptions.HTTPError as errh:
logger.error(f"HTTP Error: {errh}")
except requests.exceptions.ConnectionError as errc:
logger.error(f"Error Connecting: {errc}")
except requests.exceptions.Timeout as errt:
logger.error(f"Timeout Error: {errt}")
except requests.exceptions.RequestException as err:
logger.error(f"OOps: Something Else: {err}")
except Exception as e:
logger.error(f"Error fetching data: {e}")
return []
# CSV 文件存储
def save_to_csv(data, filename):
try:
df = pd.DataFrame(data)
df.to_csv(filename, index=False)
logger.info(f"Data saved to {filename}.")
except Exception as e:
logger.error(f"Error saving to CSV: {e}")
# JSON 文件存储
def save_to_json(data, filename):
try:
with open(filename, 'w') as f:
json.dump(data, f, indent=4)
logger.info(f"Data saved to {filename}.")
except Exception as e:
logger.error(f"Error saving to JSON: {e}")
# SQLite 数据库存储
def save_to_sqlite(data):
try:
with closing(sqlite3.connect('products.db')) as conn:
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS products
(product_name TEXT, price TEXT, rating TEXT, description TEXT)''')
placeholders = ', '.join(['?'] * len(data[0]))
columns = ', '.join(data[0].keys())
sql = f'INSERT INTO products ({columns}) VALUES ({placeholders})'
c.executemany(sql, [list(item.values()) for item in data])
conn.commit()
logger.info("Data saved to SQLite database.")
except Exception as e:
logger.error(f"Error saving to SQLite: {e}")
# MongoDB 数据库存储
def save_to_mongodb(data):
try:
client = MongoClient('mongodb://localhost:27017/')
db = client['products_db']
collection = db['products']
collection.insert_many(data)
client.close()
logger.info("Data saved to MongoDB database.")
except Exception as e:
logger.error(f"Error saving to MongoDB: {e}")
# 主程序入口
if __name__ == '__main__':
args = parser.parse_args()
url = args.url
if not url.startswith(('http://', 'https://')):
logger.error("Invalid URL format. Please ensure it starts with http:// or https://.")
exit(1)
products = fetch_data(url)
if args.output_csv:
save_to_csv(products, args.output_csv)
if args.output_json:
save_to_json(products, args.output_json)
if args.output_sqlite:
save_to_sqlite(products)
if args.output_mongodb:
save_to_mongodb(products)
代码解释
- 命令行参数: 使用
argparse
解析命令行参数,允许用户指定 URL 以及输出文件名或数据库。 - 日志: 使用标准的日志配置,可以方便地修改日志级别或输出位置。
- 异常处理: 添加了针对网络请求的异常处理,包括常见的 HTTP 错误、连接错误等。
- 数据验证: 在实际应用中,可能还需要对数据进行更严格的验证,例如检查数据是否符合预期格式。
- 数据库连接: 使用
with closing()
语句管理 SQLite 连接,确保连接正确关闭。 - URL 校验: 确保提供的 URL 是有效的 HTTP/HTTPS URL。
这个版本的代码提供了更多的灵活性和健壮性,可以根据具体需求进一步调整和完善。如果需要在生产环境中使用,还应考虑其他因素,如并发处理、更高级的错误恢复机制等。