Files
AIRegulations-Data/catarc_iso_scraper.py

558 lines
20 KiB
Python
Raw Normal View History

2026-04-17 11:41:22 +08:00
"""
全国汽车标准化技术委员会 - ISO发布标准数据采集脚本
数据来源: https://www.catarc.org.cn/gjbzh/isoiec/iso/gzdt/fbbz/index.html
方式: 解析服务端渲染的 HTML 页面 ( API)
功能:
1. 自动发现并采集所有子页面中的 ISO 标准表格数据
2. 支持不同时期的表格列结构差异 (新旧格式自适应)
3. 采集: 序号所属机构文件号英文名称中文名称代替标准所属分技术委员会
4. 自动补充: 发布批次发布日期来源页面
5. 支持断点续采
6. 导出为格式化的 Excel 文件 (含统计 Sheet)
用法:
python catarc_iso_scraper.py # 全量采集
python catarc_iso_scraper.py --resume # 断点续采
python catarc_iso_scraper.py --output result # 自定义输出文件名
python catarc_iso_scraper.py --delay 1.0 # 自定义请求间隔(秒)
"""
import sys
import io
import os
import re
import json
import time
import argparse
from datetime import datetime
from html.parser import HTMLParser
from urllib.parse import urljoin
import requests
from openpyxl import Workbook
from openpyxl.styles import Font, Alignment, PatternFill, Border, Side
from openpyxl.utils import get_column_letter
# ─── Windows 控制台中文输出修复 ─────────────────────────
if sys.platform == "win32":
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8", errors="replace")
# ─── 配置 ───────────────────────────────────────────────
BASE_URL = "https://www.catarc.org.cn"
LIST_PAGE_PATTERN = "/gjbzh/isoiec/iso/gzdt/fbbz/index{page}.html"
CACHE_FILE = ".catarc_iso_cache.json"
MAX_RETRIES = 3
DEFAULT_DELAY = 0.5 # 秒
REQUEST_TIMEOUT = 30
HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
}
# 已知的表头关键词 → 标准字段名映射
HEADER_KEYWORDS = {
"序号": "序号",
"所属机构": "所属机构",
"文件号": "文件号",
"标准号": "文件号",
"英文名称": "英文名称",
"中文名称": "中文名称",
"所属分技术委员会": "所属分技术委员会",
"分技术委员会": "所属分技术委员会",
"代替": "代替标准",
"代替标准": "代替标准",
}
# Excel 列定义 (输出顺序)
OUTPUT_COLUMNS = [
("序号", 8),
("所属机构", 28),
("文件号", 25),
("英文名称", 60),
("中文名称", 50),
("所属分技术委员会", 18),
("代替标准", 22),
("发布批次", 22),
("发布日期", 14),
("来源页面", 55),
]
# ─── HTML 解析器 ─────────────────────────────────────────
class TableParser(HTMLParser):
"""从 HTML 中提取 <table> 数据, 支持跨 <tr>/<td> 解析"""
def __init__(self):
super().__init__()
self.tables = [] # list of list of list of str
self._current_table = []
self._current_row = []
self._current_cell = []
self._in_table = 0
self._in_td = False
self._skip = False
def handle_starttag(self, tag, attrs):
tag = tag.lower()
if tag == "table":
self._in_table += 1
if self._in_table == 1:
self._current_table = []
elif tag == "tr" and self._in_table == 1:
self._current_row = []
elif tag in ("td", "th") and self._in_table == 1:
self._in_td = True
self._current_cell = []
elif tag in ("script", "style"):
self._skip = True
def handle_endtag(self, tag):
tag = tag.lower()
if tag == "table":
if self._in_table == 1 and self._current_table:
self.tables.append(self._current_table)
self._in_table = max(0, self._in_table - 1)
elif tag == "tr" and self._in_table == 1:
if self._current_row:
self._current_table.append(self._current_row)
elif tag in ("td", "th") and self._in_table == 1:
self._in_td = False
cell_text = " ".join("".join(self._current_cell).split()).strip()
self._current_row.append(cell_text)
elif tag in ("script", "style"):
self._skip = False
def handle_data(self, data):
if self._skip:
return
if self._in_td:
self._current_cell.append(data)
elif self._in_table == 1:
# 处理 <td> 中没有子标签包裹的文本
pass
def handle_entityref(self, name):
char_map = {"nbsp": " ", "amp": "&", "lt": "<", "gt": ">", "mdash": "", "ndash": ""}
if self._in_td:
self._current_cell.append(char_map.get(name, f"&{name};"))
def parse_html_tables(html_text):
"""解析 HTML 文本, 返回所有表格数据"""
parser = TableParser()
try:
parser.feed(html_text)
except Exception:
pass
return parser.tables
def normalize_headers(headers):
"""将表头文本映射为标准字段名, 返回字段名列表"""
result = []
for h in headers:
mapped = None
for keyword, field in HEADER_KEYWORDS.items():
if keyword in h:
mapped = field
break
result.append(mapped if mapped else h.strip())
return result
def extract_records_from_table(table):
"""从解析后的表格中提取记录列表, 返回 (records, headers)"""
if len(table) < 2:
return [], []
# 第一行是表头
raw_headers = table[0]
headers = normalize_headers(raw_headers)
records = []
for row in table[1:]:
if len(row) < 2:
continue
# 跳过空行或重复表头
first_cell = row[0].strip()
if not first_cell or first_cell in ("序号", "所属机构", "文件号"):
continue
record = {}
for i, val in enumerate(row):
if i < len(headers):
record[headers[i]] = val
else:
record[f"col_{i}"] = val
records.append(record)
return records, headers
# ─── 网络请求 ───────────────────────────────────────────
def fetch_page(session, url):
"""请求页面, 返回 UTF-8 编码的 HTML 文本"""
for attempt in range(1, MAX_RETRIES + 1):
try:
resp = session.get(url, headers=HEADERS, timeout=REQUEST_TIMEOUT)
resp.raise_for_status()
# 显式用 utf-8 解码, 避免 requests 自动检测编码错误
return resp.content.decode("utf-8")
except requests.exceptions.RequestException as e:
print(f" [!] 请求失败 (第 {attempt}/{MAX_RETRIES} 次): {e}")
if attempt < MAX_RETRIES:
time.sleep(2 * attempt)
return None
# ─── 列表页解析 ─────────────────────────────────────────
def discover_subpages(session):
"""从列表页发现所有子页面, 返回 [(url, title, date), ...]"""
subpages = []
page_num = 1
while True:
if page_num == 1:
page_suffix = ""
else:
page_suffix = f"_{page_num}"
url = BASE_URL + LIST_PAGE_PATTERN.format(page=page_suffix)
html = fetch_page(session, url)
if not html:
break
# 提取子页面链接: <li><a href="/gjbzh/.../fbbz/XXXX.html"><span>date</span>title</a></li>
pattern = r'href="(/gjbzh/isoiec/iso/gzdt/fbbz/[^"]+\.html)"'
links = re.findall(pattern, html)
if not links:
break
for link in links:
full_url = BASE_URL + link
# 从链接上下文提取日期和标题
escaped = re.escape(link)
ctx_pattern = rf'<a\s+href="{escaped}">\s*<span>(\d{{4}}-\d{{2}}-\d{{2}})</span>([^<]+)</a>'
ctx_match = re.search(ctx_pattern, html)
date = ctx_match.group(1) if ctx_match else ""
title = ctx_match.group(2).strip() if ctx_match else os.path.basename(link)
subpages.append((full_url, title, date))
# 检查是否有下一页
if f'index_{page_num + 1}.html' in html or f'index_{page_num + 1}"' in html:
page_num += 1
else:
# 也检查下一页链接
if f'>下一页<' in html and len(links) > 0:
page_num += 1
else:
break
return subpages
# ─── 详情页解析 ─────────────────────────────────────────
def scrape_detail_page(session, url, title, date):
"""采集单个详情页, 返回记录列表"""
html = fetch_page(session, url)
if not html:
return []
tables = parse_html_tables(html)
if not tables:
# 可能表格在 iframe 或特殊容器中, 尝试用正则直接提取
return _fallback_parse(html, title, date)
# 找最大的表格 (通常是标准数据表)
best_table = max(tables, key=len)
records, headers = extract_records_from_table(best_table)
# 补充元数据
for r in records:
r["发布批次"] = title
r["发布日期"] = date
r["来源页面"] = url
# 确保 "所属机构" 字段存在 (旧格式可能只有 "所属分技术委员会")
if "所属机构" not in r or not r.get("所属机构"):
r["所属机构"] = r.get("所属分技术委员会", "")
return records
def _fallback_parse(html, title, date):
"""备用: 正则解析表格行"""
records = []
rows = re.findall(r'<tr[^>]*>(.*?)</tr>', html, re.S)
if len(rows) < 2:
return []
# 解析表头
header_cells = re.findall(r'<t[dh][^>]*>(.*?)</t[dh]>', rows[0], re.S)
headers = normalize_headers([re.sub(r'<[^>]+>', ' ', c).strip() for c in header_cells])
for row in rows[1:]:
cells = re.findall(r'<td[^>]*>(.*?)</td>', row, re.S)
if not cells or len(cells) < 2:
continue
cells_clean = [re.sub(r'<[^>]+>', ' ', c).strip() for c in cells]
record = {}
for i, val in enumerate(cells_clean):
if i < len(headers):
record[headers[i]] = val
record["发布批次"] = title
record["发布日期"] = date
records.append(record)
return records
# ─── 缓存管理 ───────────────────────────────────────────
def load_cache():
if os.path.exists(CACHE_FILE):
try:
with open(CACHE_FILE, "r", encoding="utf-8") as f:
return json.load(f)
except (json.JSONDecodeError, IOError):
pass
return {"records": [], "done_urls": [], "total_pages": 0}
def save_cache(cache):
with open(CACHE_FILE, "w", encoding="utf-8") as f:
json.dump(cache, f, ensure_ascii=False)
# ─── Excel 导出 ─────────────────────────────────────────
def export_to_excel(records, output_path):
wb = Workbook()
ws = wb.active
ws.title = "ISO发布标准"
# ── 标题行 ──
header_font = Font(name="微软雅黑", bold=True, color="FFFFFF", size=11)
header_fill = PatternFill(start_color="2F5496", end_color="2F5496", fill_type="solid")
header_align = Alignment(horizontal="center", vertical="center", wrap_text=True)
thin_border = Border(
left=Side(style="thin", color="B4C6E7"),
right=Side(style="thin", color="B4C6E7"),
top=Side(style="thin", color="B4C6E7"),
bottom=Side(style="thin", color="B4C6E7"),
)
col_names = [col[0] for col in OUTPUT_COLUMNS]
for col_idx, name in enumerate(col_names, 1):
cell = ws.cell(row=1, column=col_idx, value=name)
cell.font = header_font
cell.fill = header_fill
cell.alignment = header_align
cell.border = thin_border
# ── 数据行 ──
data_font = Font(name="微软雅黑", size=10)
data_align = Alignment(vertical="center", wrap_text=True)
even_fill = PatternFill(start_color="D6E4F0", end_color="D6E4F0", fill_type="solid")
for row_idx, record in enumerate(records, 2):
for col_idx, (col_name, _) in enumerate(OUTPUT_COLUMNS, 1):
val = record.get(col_name, "")
if col_name == "序号":
try:
val = int(val)
except (ValueError, TypeError):
pass
cell = ws.cell(row=row_idx, column=col_idx, value=val)
cell.font = data_font
cell.alignment = data_align
cell.border = thin_border
if row_idx % 2 == 0:
cell.fill = even_fill
# ── 列宽 ──
for col_idx, (name, width) in enumerate(OUTPUT_COLUMNS, 1):
ws.column_dimensions[get_column_letter(col_idx)].width = width
# ── 冻结 & 筛选 ──
ws.freeze_panes = "A2"
ws.auto_filter.ref = f"A1:{get_column_letter(len(col_names))}{len(records) + 1}"
# ── 统计信息 Sheet ──
ws_stat = wb.create_sheet("统计信息")
# 所属机构分布
org_count = {}
batch_count = {}
year_count = {}
tc_count = {}
for r in records:
org = r.get("所属机构", "未知") or "未知"
org_count[org] = org_count.get(org, 0) + 1
batch = r.get("发布批次", "未知") or "未知"
batch_count[batch] = batch_count.get(batch, 0) + 1
pub_date = r.get("发布日期", "") or ""
year = pub_date[:4] if pub_date else "未知"
year_count[year] = year_count.get(year, 0) + 1
tc = r.get("所属分技术委员会", "") or ""
if tc:
tc_count[tc] = tc_count.get(tc, 0) + 1
stat_rows = [
("采集时间", datetime.now().strftime("%Y-%m-%d %H:%M:%S")),
("数据来源", "全国汽车标准化技术委员会 — ISO发布标准"),
("来源网址", "https://www.catarc.org.cn/gjbzh/isoiec/iso/gzdt/fbbz/index.html"),
("标准总数", len(records)),
("子页面数", len(batch_count)),
("", ""),
("── 所属机构分布 (Top 30) ──", ""),
("所属机构", "标准数量"),
]
for org, cnt in sorted(org_count.items(), key=lambda x: -x[1])[:30]:
stat_rows.append((org, cnt))
stat_rows.append(("", ""))
stat_rows.append(("── 按发布年份分布 ──", ""))
stat_rows.append(("年份", "标准数量"))
for y, cnt in sorted(year_count.items(), reverse=True):
stat_rows.append((y, cnt))
stat_rows.append(("", ""))
stat_rows.append(("── 按发布批次分布 ──", ""))
stat_rows.append(("批次", "标准数量"))
for b, cnt in sorted(batch_count.items(), key=lambda x: -x[1]):
stat_rows.append((b, cnt))
if tc_count:
stat_rows.append(("", ""))
stat_rows.append(("── 分技术委员会分布 (Top 20) ──", ""))
stat_rows.append(("分技术委员会", "标准数量"))
for tc, cnt in sorted(tc_count.items(), key=lambda x: -x[1])[:20]:
stat_rows.append((tc, cnt))
for row_idx, (a, b) in enumerate(stat_rows, 1):
cell_a = ws_stat.cell(row=row_idx, column=1, value=a)
cell_b = ws_stat.cell(row=row_idx, column=2, value=b)
if a.startswith("──"):
cell_a.font = Font(name="微软雅黑", bold=True, size=11)
else:
cell_a.font = Font(name="微软雅黑", size=10)
cell_b.font = Font(name="微软雅黑", size=10)
ws_stat.column_dimensions["A"].width = 50
ws_stat.column_dimensions["B"].width = 20
wb.save(output_path)
# ─── 主流程 ─────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(
description="全国汽车标准化技术委员会 — ISO发布标准数据采集工具",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
示例:
python catarc_iso_scraper.py # 全量采集
python catarc_iso_scraper.py --resume # 断点续采
python catarc_iso_scraper.py --delay 1.0 # 每次请求间隔1秒
python catarc_iso_scraper.py --output ISO标准 # 自定义输出文件名
""",
)
parser.add_argument("--resume", "-r", action="store_true", help="从上次中断处继续采集")
parser.add_argument("--delay", "-d", type=float, default=DEFAULT_DELAY, help="请求间隔秒数 (默认0.5)")
parser.add_argument("--output", "-o", default=None, help="输出文件名 (不含扩展名)")
args = parser.parse_args()
if args.output:
output_name = args.output
else:
output_name = f"ISO发布标准_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
output_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), f"{output_name}.xlsx")
print("=" * 60)
print(" 全国汽车标准化技术委员会 — ISO发布标准数据采集工具")
print("=" * 60)
print(f" 数据来源: catarc.org.cn/gjbzh/isoiec/iso/gzdt/fbbz/")
print(f" 请求间隔: {args.delay}s")
print(f" 输出文件: {output_path}")
print("-" * 60)
session = requests.Session()
all_records = []
done_urls = set()
# 断点续采
if args.resume:
cache = load_cache()
if cache["records"]:
all_records = cache["records"]
done_urls = set(cache["done_urls"])
print(f" [*] 从缓存恢复: 已有 {len(all_records)} 条, {len(done_urls)} 个子页面已完成")
# Step 1: 发现子页面
print(" [1/3] 发现子页面...")
subpages = discover_subpages(session)
print(f" 共发现 {len(subpages)} 个子页面")
if not subpages:
print(" [!] 未发现任何子页面, 请检查网络")
sys.exit(1)
# Step 2: 逐个采集
print(f" [2/3] 采集标准数据...")
start_time = time.time()
for idx, (url, title, date) in enumerate(subpages, 1):
if url in done_urls:
continue
records = scrape_detail_page(session, url, title, date)
all_records.extend(records)
done_urls.add(url)
print(f" [{idx}/{len(subpages)}] {title} — 获取 {len(records)} 条标准", end="\r")
# 保存进度
cache = {"records": all_records, "done_urls": list(done_urls), "total_pages": len(subpages)}
save_cache(cache)
time.sleep(args.delay)
elapsed = time.time() - start_time
print()
print(f" 采集完成! 用时 {elapsed:.1f} 秒, 共获取 {len(all_records)} 条标准")
if not all_records:
print(" [!] 未获取到任何数据")
sys.exit(1)
# Step 3: 导出 Excel
print(f" [3/3] 生成 Excel 文件...")
export_to_excel(all_records, output_path)
file_size = os.path.getsize(output_path) / 1024
print()
print(f" {'=' * 50}")
print(f" 导出完成: {output_path}")
print(f" 文件大小: {file_size:.1f} KB")
print(f" 标准总数: {len(all_records)}")
print(f" 子页面数: {len(subpages)}")
print(f" {'=' * 50}")
# 清理缓存
if not args.resume and os.path.exists(CACHE_FILE):
os.remove(CACHE_FILE)
if __name__ == "__main__":
main()