Files
AIRegulations-Data/samr_qc_scraper.py

407 lines
15 KiB
Python
Raw Permalink Normal View History

2026-04-17 11:41:22 +08:00
"""
全国标准信息公共服务平台 - 行业标准(汽车)数据采集脚本
数据来源: https://std.samr.gov.cn/hb/hbQuery?initnode=QC%20%E6%B1%BD%E8%BD%A6
API: GET https://std.samr.gov.cn/hb/search/hbPage
功能:
1. 全量采集 QC 汽车行业标准数据 (共约 990 条)
2. 采集字段: 标准号、标准名称、发布日期、实施日期、所属行业、标准状态、
标准性质、标准类别、归口单位、发布文号、CCS分类、ICS分类、制修定、备案号
3. 支持关键词搜索
4. 支持断点续采
5. 导出为格式化的 Excel 文件 (含统计 Sheet)
用法:
python samr_qc_scraper.py # 全量采集 QC 汽车行业标准
python samr_qc_scraper.py --search "制动" # 搜索关键词
python samr_qc_scraper.py --industry "QC 汽车" # 指定行业 (默认 QC 汽车)
python samr_qc_scraper.py --resume # 断点续采
python samr_qc_scraper.py --page-size 50 # 每页50条
python samr_qc_scraper.py --output 自定义名称 # 自定义输出文件名
"""
import sys
import io
import os
import json
import time
import argparse
from datetime import datetime
import requests
from openpyxl import Workbook
from openpyxl.styles import Font, Alignment, PatternFill, Border, Side
from openpyxl.utils import get_column_letter
# ─── Windows 控制台中文输出修复 ─────────────────────────
if sys.platform == "win32":
    # Switch the console streams to UTF-8 so the Chinese progress messages do
    # not fail on legacy Windows code pages.  reconfigure() changes the
    # encoding in place and keeps the stream's existing line buffering;
    # wrapping ``.buffer`` in a new TextIOWrapper would create a stream
    # without line buffering, which delays the ``end="\r"`` progress lines.
    for _stream in (sys.stdout, sys.stderr):
        # A stream may be None or a replacement object without reconfigure();
        # leave those untouched instead of failing at import time.
        if _stream is not None and hasattr(_stream, "reconfigure"):
            _stream.reconfigure(encoding="utf-8", errors="replace")
# ─── 配置 ───────────────────────────────────────────────
# Pages and endpoints on std.samr.gov.cn.
SOURCE_PAGE = "https://std.samr.gov.cn/hb/hbQuery?initnode=QC%20%E6%B1%BD%E8%BD%A6"
API_URL = "https://std.samr.gov.cn/hb/search/hbPage"
DETAIL_URL = "https://std.samr.gov.cn/hb/search/stdHBDetailed?id={id}"

# Paging, retry and timeout settings.
DEFAULT_PAGE_SIZE = 50
MAX_RETRIES = 5
RETRY_DELAY = 3  # multiplied by the attempt number and passed to time.sleep()
REQUEST_TIMEOUT = 30  # passed as the ``timeout`` argument of session.get()

# File that stores collection progress for resuming.
CACHE_FILE = ".samr_qc_cache.json"

# Headers sent with every API request; the Referer is the query page itself.
HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36"
    ),
    "Accept": "application/json, text/plain, */*",
    "Referer": SOURCE_PAGE,
    "X-Requested-With": "XMLHttpRequest",
}
# Field mapping: raw API field name -> Chinese column name used in the output.
# The trailing comments are English glosses of the Chinese names.
FIELD_MAP = {
    "C_STD_CODE": "标准号",        # standard number
    "C_NAME": "标准名称",          # standard name
    "ISSUE_DATE": "发布日期",      # issue date
    "ACT_DATE": "实施日期",        # implementation date
    "TRADE_DEPT": "所属行业",      # industry
    "STATE": "标准状态",           # status
    "STD_NATURE": "标准性质",      # nature
    "STD_CATEGORY": "标准类别",    # category
    "CHARGE_DEPT": "归口单位",     # responsible organisation
    "NOTICE_NO": "发布文号",       # announcement number
    "CCS": "CCS分类",              # CCS classification
    "ICS": "ICS分类",              # ICS classification
    "ICS_NAME1_1": "ICS分类名称",  # ICS classification name
    "STD_ZXD": "制修定",           # formulation / revision
    "RECORD_NO": "备案号",         # filing number
    "STD_LEVEL": "标准层级",       # level
    "STD_DOMAIN": "标准领域",      # domain
    "id": "标准ID",                # record ID
}
# Excel output columns as (column name, column width) pairs, in sheet order.
# Every name except the last is a value of FIELD_MAP; "详情链接" (detail link)
# is added by normalize_record.
OUTPUT_COLUMNS = [
    ("标准号", 20),
    ("标准名称", 52),
    ("发布日期", 13),
    ("实施日期", 13),
    ("所属行业", 12),
    ("标准状态", 12),
    ("标准性质", 10),
    ("标准类别", 12),
    ("归口单位", 20),
    ("发布文号", 18),
    ("CCS分类", 10),
    ("ICS分类", 10),
    ("ICS分类名称", 20),
    ("制修定", 8),
    ("备案号", 16),
    ("标准层级", 10),
    ("标准领域", 10),
    ("详情链接", 55),
]
# ─── 网络请求 ───────────────────────────────────────────
def fetch_page(session, page_num, page_size, industry="QC 汽车"):
    """Request one page of results from the search API, retrying on failure.

    Args:
        session: Object whose ``get`` method performs the HTTP request
            (``scrape_all`` passes a ``requests.Session``).
        page_num: Page number, sent as the ``pageNumber`` query parameter.
        page_size: Rows per page, sent as the ``pageSize`` query parameter.
        industry: Industry filter, sent as the ``op`` query parameter.

    Returns:
        The decoded JSON payload, or ``None`` once ``MAX_RETRIES`` attempts
        have all failed.
    """
    query = {
        "op": industry,
        "ISSUE_DATE": "",
        "pageNumber": page_num,
        "pageSize": page_size,
    }
    attempt = 0
    while attempt < MAX_RETRIES:
        attempt += 1
        try:
            response = session.get(
                API_URL, params=query, headers=HEADERS, timeout=REQUEST_TIMEOUT
            )
            response.raise_for_status()
            # Decode the body explicitly as UTF-8 before parsing the JSON.
            body = response.content.decode("utf-8")
            return json.loads(body)
        except Exception as exc:
            print(f" [!] 第 {page_num} 页请求失败 (第 {attempt}/{MAX_RETRIES} 次): {exc}")
            # Wait longer after each failure; no wait after the last attempt.
            if attempt < MAX_RETRIES:
                time.sleep(RETRY_DELAY * attempt)
    return None
def normalize_record(record):
    """Convert one raw API row into a dict keyed by the output column names.

    Every key of ``FIELD_MAP`` is looked up in *record*; string values are
    stripped and missing or falsy values become ``""``.  A "详情链接" entry is
    added from ``DETAIL_URL`` when the row has an id.

    Args:
        record: Mapping holding one row of the API response.

    Returns:
        A new dict with one entry per ``FIELD_MAP`` value plus "详情链接".
    """
    result = {}
    for api_key, cn_name in FIELD_MAP.items():
        val = record.get(api_key, "")
        if isinstance(val, str):
            val = val.strip()
        result[cn_name] = val if val else ""

    # Build the detail link from the stripped id so it agrees with the stored
    # ID column; a whitespace-only id yields no link.
    std_id = record.get("id", "")
    if isinstance(std_id, str):
        std_id = std_id.strip()
    result["详情链接"] = DETAIL_URL.format(id=std_id) if std_id else ""

    # Clean the ICS classification name, e.g. "43_道路车辆工程" -> "道路车辆工程".
    # Non-string values are left untouched instead of raising TypeError on
    # the ``in`` test.
    ics_name = result.get("ICS分类名称", "")
    if isinstance(ics_name, str) and "_" in ics_name:
        result["ICS分类名称"] = ics_name.split("_")[-1] or ics_name
    return result
# ─── 缓存 ───────────────────────────────────────────────
def load_cache():
    """Load the resume cache from ``CACHE_FILE``.

    Returns:
        A dict that always contains the keys ``records``, ``last_page``,
        ``industry`` and ``total``.  Keys stored in the file override the
        defaults and extra keys are kept.  The defaults alone are returned
        when the file is missing, unreadable, not valid UTF-8 JSON, or does
        not hold a dict with a list of records and an integer last page.
    """
    cache = {"records": [], "last_page": 0, "industry": "", "total": 0}
    try:
        with open(CACHE_FILE, "r", encoding="utf-8") as f:
            loaded = json.load(f)
    except (OSError, ValueError):
        # OSError: missing or unreadable file.  ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError.
        return cache

    # Reject well-formed JSON of the wrong shape so callers can index the
    # result without further checks.
    if (
        isinstance(loaded, dict)
        and isinstance(loaded.get("records", []), list)
        and isinstance(loaded.get("last_page", 0), int)
    ):
        cache.update(loaded)
    return cache
def save_cache(cache):
    """Write *cache* to ``CACHE_FILE`` as UTF-8 JSON.

    The data is written to a temporary file next to the target and then
    moved into place, so an interruption during the write cannot leave a
    truncated cache behind.

    Args:
        cache: JSON-serialisable progress dict.
    """
    tmp_path = f"{CACHE_FILE}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(cache, f, ensure_ascii=False)
    # os.replace overwrites an existing destination in a single step.
    os.replace(tmp_path, CACHE_FILE)
# ─── 主采集流程 ─────────────────────────────────────────
def scrape_all(page_size, industry="QC 汽车", search_keyword="", resume=False):
    """Collect every result page for *industry* and return the normalised rows.

    Progress is saved to the cache after each page so that an interrupted run
    can be continued with ``resume=True``.

    Args:
        page_size: Rows requested per page; must be positive.
        industry: Industry filter forwarded to ``fetch_page``.
        search_keyword: Accepted for interface compatibility only.
            NOTE(review): it is not forwarded to ``fetch_page``, so it has no
            effect on the request.
        resume: When true, continue from the cached progress if the cache
            matches *industry* and *page_size*.

    Returns:
        List of normalised record dicts.  The list is partial when a page
        still fails after all retries.

    Raises:
        ValueError: If *page_size* is not positive.
    """
    if page_size <= 0:
        raise ValueError(f"page_size must be a positive integer, got {page_size!r}")

    all_records = []
    start_page = 1

    if resume:
        cache = load_cache()
        if not isinstance(cache, dict):
            cache = {}
        # Page boundaries depend on the page size, so a cache written with a
        # different size cannot be continued.  Caches without a recorded page
        # size are accepted unchanged.
        resumable = (
            bool(cache.get("records"))
            and cache.get("industry") == industry
            and cache.get("page_size", page_size) == page_size
        )
        if resumable:
            all_records = cache["records"]
            start_page = cache.get("last_page", 0) + 1
            print(f" [*] 从缓存恢复: 已有 {len(all_records)} 条, 从第 {start_page} 页继续")
        else:
            print(" [*] 缓存不匹配, 从头开始采集")

    # The context manager closes the session on every exit path.
    with requests.Session() as session:
        # The first request also reports the total number of records.
        first_data = fetch_page(session, start_page, page_size, industry)
        if not first_data:
            print(" [!] 无法获取数据, 请检查网络")
            return all_records

        total = first_data.get("total") or 0
        rows = first_data.get("rows") or []
        total_pages = (total + page_size - 1) // page_size
        all_records.extend(normalize_record(r) for r in rows)
        print(f" 总计: {total} 条标准, 共 {total_pages} 页, 每页 {page_size}")
        print(f" 已采集: {len(all_records)}/{total}", end="\r")

        cache = {
            # Same list object as all_records, so later pages are included
            # in every subsequent save.
            "records": all_records,
            # An empty page does not count as progress; this keeps last_page
            # from creeping upward when an already finished run is resumed.
            "last_page": start_page if rows else start_page - 1,
            "industry": industry,
            "total": total,
            "page_size": page_size,
        }
        save_cache(cache)

        for page_num in range(start_page + 1, total_pages + 1):
            data = fetch_page(session, page_num, page_size, industry)
            if data is None:
                print(f"\n [!] 第 {page_num} 页失败, 保存进度退出")
                save_cache(cache)
                break
            rows = data.get("rows") or []
            all_records.extend(normalize_record(r) for r in rows)
            cache["last_page"] = page_num
            save_cache(cache)
            print(f" 已采集: {len(all_records)}/{total} (第 {page_num}/{total_pages} 页)", end="\r")
            # Pause between requests; nothing follows the last page.
            if page_num < total_pages:
                time.sleep(0.3)

    print()
    return all_records
# ─── Excel 导出 ─────────────────────────────────────────
def _stat_label(value):
    """Return *value* as text, or "未知" when it is empty or missing."""
    return str(value) if value else "未知"


def _build_stat_rows(records, industry):
    """Assemble the (label, value) rows shown on the statistics sheet."""
    from collections import Counter  # local import keeps the file's import block unchanged

    stat_rows = [
        ("采集时间", datetime.now().strftime("%Y-%m-%d %H:%M:%S")),
        ("数据来源", "全国标准信息公共服务平台 (std.samr.gov.cn)"),
        ("所属行业", industry),
        ("标准总数", len(records)),
    ]

    # (record field, section title, header of the label column)
    distributions = [
        ("标准状态", "── 标准状态分布 ──", "状态"),
        ("标准性质", "── 标准性质分布 ──", "性质"),
        ("标准类别", "── 标准类别分布 ──", "类别"),
        ("制修定", "── 制修定分布 ──", "类型"),
    ]
    for field, title, header in distributions:
        counts = Counter(_stat_label(r.get(field)) for r in records)
        stat_rows += [("", ""), (title, ""), (header, "数量")]
        # Highest count first; ties keep first-seen order.
        stat_rows += counts.most_common()

    # The year is the first four characters of the issue date.  Values are
    # converted to text first so a non-string date cannot break len().
    year_counts = Counter()
    for r in records:
        issue = str(r.get("发布日期") or "")
        year_counts[issue[:4] if len(issue) >= 4 else "未知"] += 1
    stat_rows += [("", ""), ("── 按发布年份分布 ──", ""), ("年份", "数量")]
    stat_rows += sorted(year_counts.items(), reverse=True)
    return stat_rows


def export_to_excel(records, output_path, industry="QC 汽车"):
    """Write *records* to a formatted workbook with a data and a statistics sheet.

    Args:
        records: List of dicts keyed by the names in ``OUTPUT_COLUMNS``.
        output_path: Path the workbook is saved to.
        industry: Industry label shown on the statistics sheet.  The default
            reproduces the previously hard-coded value.
    """
    wb = Workbook()
    ws = wb.active
    ws.title = "汽车行业标准"

    # Styles
    hdr_font = Font(name="微软雅黑", bold=True, color="FFFFFF", size=11)
    hdr_fill = PatternFill(start_color="2F5496", end_color="2F5496", fill_type="solid")
    hdr_align = Alignment(horizontal="center", vertical="center", wrap_text=True)
    dat_font = Font(name="微软雅黑", size=10)
    dat_align = Alignment(vertical="center", wrap_text=True)
    even_fill = PatternFill(start_color="D6E4F0", end_color="D6E4F0", fill_type="solid")
    edge = Side(style="thin", color="B4C6E7")
    border = Border(left=edge, right=edge, top=edge, bottom=edge)

    # Header row and column widths
    for ci, (col_name, width) in enumerate(OUTPUT_COLUMNS, 1):
        cell = ws.cell(row=1, column=ci, value=col_name)
        cell.font = hdr_font
        cell.fill = hdr_fill
        cell.alignment = hdr_align
        cell.border = border
        ws.column_dimensions[get_column_letter(ci)].width = width

    # Data rows start at row 2; even-numbered rows are shaded.
    for ri, rec in enumerate(records, 2):
        for ci, (col_name, _) in enumerate(OUTPUT_COLUMNS, 1):
            cell = ws.cell(row=ri, column=ci, value=rec.get(col_name, ""))
            cell.font = dat_font
            cell.alignment = dat_align
            cell.border = border
            if ri % 2 == 0:
                cell.fill = even_fill

    # Freeze the header row and enable filtering over the whole table.
    ws.freeze_panes = "A2"
    ws.auto_filter.ref = f"A1:{get_column_letter(len(OUTPUT_COLUMNS))}{len(records) + 1}"

    # Statistics sheet
    ws_stat = wb.create_sheet("统计信息")
    title_font = Font(name="微软雅黑", bold=True, size=11)
    body_font = Font(name="微软雅黑", size=10)
    for ri, (label, value) in enumerate(_build_stat_rows(records, industry), 1):
        label_cell = ws_stat.cell(row=ri, column=1, value=label)
        value_cell = ws_stat.cell(row=ri, column=2, value=value)
        # Section titles are the rows whose label starts with "──".
        label_cell.font = title_font if label.startswith("──") else body_font
        value_cell.font = body_font
    ws_stat.column_dimensions["A"].width = 28
    ws_stat.column_dimensions["B"].width = 50

    wb.save(output_path)
# ─── 入口 ───────────────────────────────────────────────
def main():
    """Command-line entry point: scrape, de-duplicate, export, tidy the cache.

    Exits with status 1 when no records were collected and with an argparse
    usage error when ``--page-size`` is not positive.
    """
    parser = argparse.ArgumentParser(
        description="全国标准信息公共服务平台 — 行业标准(汽车)数据采集工具",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例:
python samr_qc_scraper.py # 全量采集
python samr_qc_scraper.py --resume # 断点续采
python samr_qc_scraper.py --page-size 50 # 每页50条
python samr_qc_scraper.py --output QC汽车标准 # 自定义文件名
""",
    )
    parser.add_argument("--industry", "-i", default="QC 汽车", help="行业筛选 (默认: QC 汽车)")
    parser.add_argument("--resume", "-r", action="store_true", help="断点续采")
    parser.add_argument("--page-size", "-p", type=int, default=DEFAULT_PAGE_SIZE, help="每页条数 (默认50)")
    parser.add_argument("--output", "-o", default=None, help="输出文件名 (不含扩展名)")
    args = parser.parse_args()

    # A non-positive page size would break the page-count arithmetic.
    if args.page_size <= 0:
        parser.error("--page-size 必须为正整数")

    if args.output:
        output_name = args.output
    else:
        output_name = f"行业标准_QC汽车_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
    # The workbook is written next to this script.
    output_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), f"{output_name}.xlsx")

    print("=" * 60)
    print(" 全国标准信息公共服务平台 — 行业标准数据采集工具")
    print("=" * 60)
    print(" 数据来源: std.samr.gov.cn")
    print(f" 所属行业: {args.industry}")
    print(f" 每页条数: {args.page_size}")
    print(f" 输出文件: {output_path}")
    print("-" * 60)

    start_time = time.time()
    records = scrape_all(args.page_size, args.industry, resume=args.resume)
    elapsed = time.time() - start_time
    if not records:
        print(" 未获取到任何数据")
        sys.exit(1)

    # De-duplicate on the standard number, falling back to the record ID.
    # Records that have neither are all kept, so rows with an empty standard
    # number are no longer collapsed into a single row.
    seen = set()
    unique = []
    for r in records:
        code = r.get("标准号", "")
        std_id = r.get("标准ID", "")
        key = ("code", code) if code else (("id", std_id) if std_id else None)
        if key is not None:
            if key in seen:
                continue
            seen.add(key)
        unique.append(r)
    dup = len(records) - len(unique)

    print("-" * 60)
    print(f" 采集完成! 用时 {elapsed:.1f}")
    print(f" 获取 {len(records)} 条, 去重后 {len(unique)}", end="")
    print(f" (移除 {dup} 条重复)" if dup else "")
    print(" 生成 Excel 文件...")
    export_to_excel(unique, output_path)
    size = os.path.getsize(output_path) / 1024
    print(f"\n {'=' * 50}")
    print(f" 导出完成: {output_path}")
    print(f" 文件大小: {size:.1f} KB")
    print(f" 标准总数: {len(unique)}")
    print(f" {'=' * 50}")

    # Remove the cache only after a complete run.  When collection stopped
    # early the cache holds the saved progress and is needed for --resume.
    if not args.resume and os.path.exists(CACHE_FILE):
        cache = load_cache()
        total = cache.get("total") or 0
        total_pages = (total + args.page_size - 1) // args.page_size
        if (cache.get("last_page") or 0) >= total_pages:
            os.remove(CACHE_FILE)
        else:
            print(" [!] 采集未完成, 已保留缓存, 可使用 --resume 继续采集")


if __name__ == "__main__":
    main()