Files
AIRegulations-Data/openstd_downloader.py

439 lines
16 KiB
Python
Raw Permalink Normal View History

2026-04-17 11:41:22 +08:00
"""
国家标准全文公开系统 PDF 批量下载工具
数据来源: https://openstd.samr.gov.cn/bzgk/std/std_list_type
下载地址: http://c.gb688.cn/bzgk/gb/viewGb
功能:
1. 按关键词搜索国家标准 (如 "车" 可匹配所有车辆相关标准)
2. 自动识别验证码 (ddddocr) 并下载 PDF 全文
3. 支持筛选: 强制性/推荐性/指导性/全部, 现行/即将实施/废止
4. 文件命名: "标准号 标准名称.pdf" (如 "GB 34660-2026 道路车辆 电磁兼容性要求和试验方法.pdf")
5. 断点续传: 已下载的文件自动跳过
6. 导出标准元数据 Excel
用法:
python openstd_downloader.py # 下载"车"相关强制性国家标准
python openstd_downloader.py --keyword "制动" # 搜索关键词
python openstd_downloader.py --type 推荐性 # 推荐性国家标准
python openstd_downloader.py --page-size 50 # 每页50条
python openstd_downloader.py --output-dir ./GB_Doc # 自定义下载目录
python openstd_downloader.py --status "现行" # 只下载现行标准
python openstd_downloader.py --no-download # 仅采集元数据, 不下载PDF
依赖:
pip install requests ddddocr openpyxl
"""
import sys
import io
import os
import re
import json
import time
import argparse
from datetime import datetime
import requests
from openpyxl import Workbook
from openpyxl.styles import Font, Alignment, PatternFill, Border, Side
from openpyxl.utils import get_column_letter
# ─── Windows 控制台中文输出修复 ─────────────────────────
if sys.platform == "win32":
    # Force UTF-8 on the console streams so the Chinese progress messages can
    # be printed; characters that still cannot be encoded are replaced.
    for _stream_name in ("stdout", "stderr"):
        _stream = getattr(sys, _stream_name)
        if _stream is None:
            # No console attached (e.g. pythonw.exe): nothing to fix.
            continue
        if hasattr(_stream, "reconfigure"):
            # In-place switch (Python 3.7+); keeps the stream's buffering mode,
            # so "\r"-style progress lines still show up promptly.
            _stream.reconfigure(encoding="utf-8", errors="replace")
        elif hasattr(_stream, "buffer"):
            # Older interpreters: wrap the underlying binary buffer instead.
            setattr(
                sys,
                _stream_name,
                io.TextIOWrapper(_stream.buffer, encoding="utf-8", errors="replace"),
            )
# ─── 配置 ───────────────────────────────────────────────
# Search/list endpoint queried by fetch_list_page.
LIST_URL = "https://openstd.samr.gov.cn/bzgk/std/std_list_type"

# Every download-related endpoint shares this base path.
_GB688_BASE = "http://c.gb688.cn/bzgk/gb"
DOWNLOAD_INIT_URL = _GB688_BASE + "/showGb?type=download&hcno={hcno}"
CAPTCHA_URL = _GB688_BASE + "/gc?_{ts}"
VERIFY_URL = _GB688_BASE + "/verifyCode"
PDF_URL = _GB688_BASE + "/viewGb?hcno={hcno}"

# Captcha attempts per download attempt, and the default HTTP timeout (seconds).
MAX_CAPTCHA_RETRIES = 8
REQUEST_TIMEOUT = 30

# JSON file remembering which hcno values were already downloaded.
CACHE_FILE = ".openstd_cache.json"

# Browser-like User-Agent sent with every request.
HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/131.0.0.0 Safari/537.36"
    ),
}

# Display name of each standard type -> value of the `p.p1` query parameter.
STD_TYPES = {
    "强制性": "1",
    "推荐性": "2",
    "指导性": "3",
    "全部": "",
}

# Excel output columns as (record key / header text, column width).
OUTPUT_COLUMNS = [
    ("标准号", 20),
    ("标准名称", 50),
    ("标准状态", 10),
    ("发布日期", 14),
    ("实施日期", 14),
    ("是否采标", 10),
    ("hcno", 35),
    ("文件名", 60),
    ("下载状态", 10),
]
# ─── 列表页解析 ─────────────────────────────────────────
def fetch_list_page(session, keyword, std_type, page_num, page_size):
    """Fetch one page of the standards search results and return its HTML.

    Args:
        session: Object exposing a requests-style ``get`` method.
        keyword: Search text, sent as the ``p.p2`` query parameter.
        std_type: Standard-type code, sent as the ``p.p1`` query parameter.
        page_num: Page index; ``page`` is only sent for pages after the first.
        page_size: Rows requested per page, sent as ``pageSize``.

    Returns:
        The response body decoded as UTF-8, or ``None`` when all three
        attempts fail.
    """
    max_attempts = 3
    params = {
        "p.p1": std_type,
        "p.p2": keyword,
        "p.p90": "circulation_date",
        "p.p91": "desc",
        # Always request the caller's page size: the caller derives the page
        # count from it, so page 1 must not fall back to the server default.
        "pageSize": page_size,
    }
    if page_num > 1:
        params["page"] = page_num

    for attempt in range(1, max_attempts + 1):
        try:
            resp = session.get(LIST_URL, params=params, headers=HEADERS, timeout=REQUEST_TIMEOUT)
            resp.raise_for_status()
            return resp.content.decode("utf-8")
        except (requests.RequestException, UnicodeDecodeError) as e:
            print(f" [!] 列表页请求失败 (第 {attempt} 次): {e}")
        # Back off before retrying, but do not stall after the final attempt.
        if attempt < max_attempts:
            time.sleep(2)
    return None
def parse_list_page(html):
    """Extract the standards table and the overall result count from a list page.

    Args:
        html: HTML text of one search-result page.

    Returns:
        A ``(standards, total)`` tuple. ``standards`` is a list of dicts with
        the keys 标准号, 标准名称, 标准状态, 发布日期, 实施日期, 是否采标 and
        hcno. ``total`` is the sum of the 现行/即将实施/废止 counters found in
        the page, or 0 when those counters are absent.
    """
    hcno_pattern = r"showInfo\('([A-F0-9]{32})'\)"

    def strip_tags(fragment):
        """Drop HTML tags from a cell and trim surrounding whitespace."""
        return re.sub(r'<[^>]+>', '', fragment).strip()

    # The page shows e.g. "现行(216) 即将实施(36) 废止(408)"; their sum is the
    # number of results for the whole query.
    total = 0
    counts = re.search(r'现行\((\d+)\).*?即将实施\((\d+)\).*?废止\((\d+)\)', html, re.S)
    if counts:
        total = sum(int(n) for n in counts.groups())

    standards = []
    for row in re.findall(r'<tr[^>]*>(.*?)</tr>', html, re.S):
        cells = re.findall(r'<td[^>]*>(.*?)</td>', row, re.S)
        if len(cells) < 6:
            continue
        # Data rows start with a numeric sequence cell; anything else is skipped.
        if not strip_tags(cells[0]).isdigit():
            continue

        # Prefer the onclick in the code cell; fall back to any other cell of
        # the same row so the record is not left without an hcno.
        hcno_match = re.search(hcno_pattern, cells[1]) or re.search(hcno_pattern, row)
        act_date = strip_tags(cells[6]) if len(cells) > 6 else ""

        standards.append({
            "标准号": strip_tags(cells[1]),
            "标准名称": strip_tags(cells[3]),
            "标准状态": strip_tags(cells[4]),
            # Keep only the leading date part (first 10 characters).
            "发布日期": strip_tags(cells[5])[:10],
            "实施日期": act_date[:10],
            "是否采标": strip_tags(cells[2]),
            "hcno": hcno_match.group(1) if hcno_match else "",
        })
    return standards, total
def collect_all_standards(keyword, std_type, page_size, status_filter=""):
    """Collect the metadata of every standard matching a search.

    Fetches page 1 to learn the total count, then walks the remaining pages,
    stopping early when a page cannot be fetched or contains no rows.

    Args:
        keyword: Search text passed to the list endpoint.
        std_type: Standard-type code passed to the list endpoint.
        page_size: Rows per page; must be at least 1.
        status_filter: When non-empty, keep only records whose 标准状态
            contains this text.

    Returns:
        A list of record dicts as produced by ``parse_list_page``; empty when
        the first page cannot be fetched.

    Raises:
        ValueError: If ``page_size`` is smaller than 1.
    """
    if page_size < 1:
        raise ValueError("page_size must be a positive integer")

    all_standards = []
    # The context manager closes the session's connections when done.
    with requests.Session() as session:
        html = fetch_list_page(session, keyword, std_type, 1, page_size)
        if not html:
            return all_standards
        standards, total = parse_list_page(html)
        all_standards.extend(standards)

        # Ceiling division; a page without counters is treated as a single page.
        total_pages = (total + page_size - 1) // page_size if total > 0 else 1
        print(f" 总计: {total} 条标准, {total_pages} 页")

        for page_num in range(2, total_pages + 1):
            html = fetch_list_page(session, keyword, std_type, page_num, page_size)
            if not html:
                break
            standards, _ = parse_list_page(html)
            if not standards:
                break
            all_standards.extend(standards)
            print(f" 已采集: {len(all_standards)}/{total} (第 {page_num}/{total_pages} 页)", end="\r")
            # Short pause between list requests.
            time.sleep(0.3)
    # Finish the line that the "\r" progress output kept rewriting.
    print()

    if status_filter:
        all_standards = [s for s in all_standards if status_filter in s.get("标准状态", "")]
        print(f" 筛选 [{status_filter}]: {len(all_standards)}")
    return all_standards
# ─── PDF 下载 ───────────────────────────────────────────
def download_pdf(hcno, save_path, max_retries=3):
    """Download the PDF of one standard, solving the captcha with OCR.

    Each attempt opens a fresh session, visits the download page, gets a
    captcha accepted, and then requests the PDF.

    Args:
        hcno: Identifier of the standard used in the download URLs.
        save_path: Destination path of the PDF file.
        max_retries: Number of full attempts before giving up.

    Returns:
        ``(True, size_in_bytes)`` on success; ``(False, -1)`` when the server
        returns an (almost) empty body, which is not retried; ``(False, 0)``
        when ``hcno`` is empty or every attempt failed.
    """
    if not hcno:
        # Nothing to request without an identifier.
        return False, 0

    ocr = _get_ocr_engine()
    for retry in range(max_retries):
        try:
            # Fresh session per attempt; the context manager closes it again.
            with requests.Session() as s:
                s.headers.update(HEADERS)
                # Step 1: visit the download page first (establishes the session).
                s.get(DOWNLOAD_INIT_URL.format(hcno=hcno), timeout=REQUEST_TIMEOUT)
                # Steps 2-3: fetch a captcha, OCR it, submit it.
                if not _solve_captcha(s, ocr):
                    if retry < max_retries - 1:
                        print(f"验证码失败,重试({retry+1})")
                    continue
                # Step 4: request the PDF itself.
                content = s.get(PDF_URL.format(hcno=hcno), timeout=60).content

            if len(content) <= 1000:
                # An empty PDF may mean no full text is offered yet; do not retry.
                return False, -1
            if b"%PDF" not in content[:1024]:
                # A large non-PDF body (e.g. an error page) must not be saved as
                # a .pdf and recorded as downloaded; treat it as a failed attempt.
                print(f" [!] 返回内容不是 PDF, 重试 ({retry+1}/{max_retries})")
                time.sleep(2)
                continue

            # Write to a side file and rename, so an interrupted write never
            # leaves a truncated file at save_path.
            tmp_path = f"{save_path}.part"
            with open(tmp_path, "wb") as f:
                f.write(content)
            os.replace(tmp_path, save_path)
            return True, len(content)
        except Exception as e:
            # Deliberately broad: network, OCR and file errors are all retried.
            print(f" [!] 下载异常: {e}, 重试 ({retry+1}/{max_retries})")
            time.sleep(2)
    return False, 0


_OCR_ENGINE = None


def _get_ocr_engine():
    """Create the ddddocr recognizer on first use and reuse it afterwards."""
    global _OCR_ENGINE
    if _OCR_ENGINE is None:
        # Kept as a lazy import: only the download path uses ddddocr.
        import ddddocr
        _OCR_ENGINE = ddddocr.DdddOcr(show_ad=False)
    return _OCR_ENGINE


def _solve_captcha(session, ocr):
    """Fetch, recognize and submit captchas until one is accepted; return success."""
    for _ in range(MAX_CAPTCHA_RETRIES):
        r = session.get(CAPTCHA_URL.format(ts=int(time.time() * 1000)), timeout=REQUEST_TIMEOUT)
        if len(r.content) < 100:
            # Response too small to be processed as a captcha image; try again.
            time.sleep(1)
            continue
        code = ocr.classification(r.content)
        vr = session.post(VERIFY_URL, data={"verifyCode": code}, timeout=REQUEST_TIMEOUT)
        if vr.text.strip() == "success":
            return True
    return False
def sanitize_filename(name):
    """Replace characters that are unsafe in file names with spaces.

    Covers the reserved characters ``\\ / : * ? " < > |`` and ASCII control
    characters (newlines and tabs can survive inside names scraped from HTML).
    Each offending character becomes one space; leading and trailing
    whitespace is trimmed.

    Args:
        name: Proposed file name.

    Returns:
        The cleaned file name.
    """
    return re.sub(r'[\\/:*?"<>|\x00-\x1f]', ' ', name).strip()
# ─── Excel 导出 ─────────────────────────────────────────
def export_to_excel(records, output_path):
    """Write the standards metadata to a styled single-sheet Excel workbook.

    Args:
        records: Sequence of dicts keyed by the names in ``OUTPUT_COLUMNS``;
            missing keys are written as empty strings.
        output_path: Path the workbook is saved to.
    """
    workbook = Workbook()
    sheet = workbook.active
    sheet.title = "国家标准清单"

    # One thin light-blue edge reused on all four sides of every cell.
    thin_edge = Side(style="thin", color="B4C6E7")
    cell_border = Border(left=thin_edge, right=thin_edge, top=thin_edge, bottom=thin_edge)

    header_font = Font(name="微软雅黑", bold=True, color="FFFFFF", size=11)
    header_fill = PatternFill(start_color="2F5496", end_color="2F5496", fill_type="solid")
    header_alignment = Alignment(horizontal="center", vertical="center", wrap_text=True)
    body_font = Font(name="微软雅黑", size=10)
    body_alignment = Alignment(vertical="center", wrap_text=True)
    stripe_fill = PatternFill(start_color="D6E4F0", end_color="D6E4F0", fill_type="solid")

    # Header row plus the configured width of each column.
    for col_idx, (title, width) in enumerate(OUTPUT_COLUMNS, start=1):
        header_cell = sheet.cell(row=1, column=col_idx, value=title)
        header_cell.font = header_font
        header_cell.fill = header_fill
        header_cell.alignment = header_alignment
        header_cell.border = cell_border
        sheet.column_dimensions[get_column_letter(col_idx)].width = width

    # Data rows start at row 2; even-numbered rows get the stripe fill.
    for row_idx, record in enumerate(records, start=2):
        striped = row_idx % 2 == 0
        for col_idx, (key, _width) in enumerate(OUTPUT_COLUMNS, start=1):
            data_cell = sheet.cell(row=row_idx, column=col_idx, value=record.get(key, ""))
            data_cell.font = body_font
            data_cell.alignment = body_alignment
            data_cell.border = cell_border
            if striped:
                data_cell.fill = stripe_fill

    # Keep the header visible while scrolling and enable filtering on all data.
    sheet.freeze_panes = "A2"
    last_column = get_column_letter(len(OUTPUT_COLUMNS))
    sheet.auto_filter.ref = f"A1:{last_column}{len(records) + 1}"
    workbook.save(output_path)
# ─── 缓存 ───────────────────────────────────────────────
def load_cache():
    """Load the download cache from ``CACHE_FILE``.

    Returns:
        A dict that always has list values under ``downloaded_hcnos`` and
        ``records``. A missing, unreadable or malformed cache file yields the
        empty default instead of raising.
    """
    defaults = {"downloaded_hcnos": [], "records": []}
    try:
        with open(CACHE_FILE, "r", encoding="utf-8") as f:
            loaded = json.load(f)
    except FileNotFoundError:
        # First run: no cache yet.
        return defaults
    except (OSError, ValueError) as e:
        # ValueError covers both invalid JSON and invalid UTF-8.
        print(f" [!] 缓存文件无法读取, 已忽略: {e}")
        return defaults

    # Valid JSON of the wrong shape (e.g. a list) would break callers that
    # use dict methods on the result.
    if not isinstance(loaded, dict):
        return defaults
    for key, default in defaults.items():
        if not isinstance(loaded.get(key), list):
            loaded[key] = default
    return loaded
def save_cache(cache):
    """Persist the cache dict to ``CACHE_FILE`` as UTF-8 JSON.

    The data is written to a temporary sibling file and then renamed over the
    real cache, so an interruption mid-write cannot leave a truncated cache.

    Args:
        cache: JSON-serializable dict to store.
    """
    tmp_path = f"{CACHE_FILE}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(cache, f, ensure_ascii=False)
    os.replace(tmp_path, CACHE_FILE)
# ─── 主流程 ─────────────────────────────────────────────
def main():
    """Run the command-line workflow.

    Collects the standards list, optionally downloads each PDF, exports the
    metadata to Excel and prints a summary. Exits with status 1 when the
    search yields no standards.
    """
    parser = _build_arg_parser()
    args = parser.parse_args()
    if args.page_size < 1:
        parser.error("--page-size 必须为正整数")

    # --type accepts a display name or the raw site code ("1"/"2"/"3").
    std_type = STD_TYPES.get(args.type, args.type)
    os.makedirs(args.output_dir, exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    # The keyword is user input; clean it before embedding it in a file name.
    excel_name = f"国家标准清单_{sanitize_filename(args.keyword)}_{timestamp}.xlsx"
    excel_path = os.path.join(args.output_dir, excel_name)

    print("=" * 60)
    print(" 国家标准全文公开系统 — PDF 批量下载工具")
    print("=" * 60)
    print(f" 关键词: {args.keyword}")
    print(f" 类型: {args.type}")
    print(f" 状态: {args.status or '全部'}")
    print(f" 下载目录: {args.output_dir}/")
    print(f" 下载PDF: {'否' if args.no_download else '是'}")
    print("-" * 60)

    # Step 1: collect the list of standards.
    print(" [1/2] 采集标准列表...")
    standards = collect_all_standards(args.keyword, std_type, args.page_size, args.status)
    if not standards:
        print(" 未找到任何标准")
        sys.exit(1)
    print(f"{len(standards)} 条标准")

    # Optional cap on how many standards are processed.
    if args.max_count > 0:
        standards = standards[:args.max_count]
        print(f" 限制下载前 {args.max_count}")

    # Step 2: download the PDFs, or mark every record as skipped.
    if args.no_download:
        for std in standards:
            std["下载状态"] = "跳过"
            std["文件名"] = ""
        print("\n [2/2] 跳过下载 (--no-download)")
    else:
        print("\n [2/2] 下载 PDF 文件...")
        success_count, skip_count, fail_count = _download_all(standards, args.output_dir)
        print(f"\n 下载完成: 成功 {success_count}, 跳过 {skip_count}, 失败 {fail_count}")

    # Export the metadata (including per-record download status) to Excel.
    export_to_excel(standards, excel_path)
    print(f"\n 元数据已导出: {excel_path}")

    # Summary.
    print(f"\n {'=' * 50}")
    print(f" 总计: {len(standards)} 条标准")
    print(f" Excel: {excel_path}")
    if not args.no_download:
        print(f" PDF目录: {args.output_dir}/")
        # Counts every PDF in the directory, including earlier runs.
        pdfs = [f for f in os.listdir(args.output_dir) if f.endswith('.pdf')]
        print(f" PDF文件数: {len(pdfs)}")
    print(f" {'=' * 50}")


def _build_arg_parser():
    """Build the argument parser for the command-line interface."""
    parser = argparse.ArgumentParser(
        description="国家标准全文公开系统 — PDF 批量下载工具",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例:
python openstd_downloader.py # 下载"车"相关强制性国家标准
python openstd_downloader.py --keyword "制动" # 搜索关键词
python openstd_downloader.py --type 2 # 推荐性国家标准
python openstd_downloader.py --status "现行" # 只下载现行标准
python openstd_downloader.py --no-download # 仅采集元数据, 不下载PDF
""",
    )
    # Default matches the documented behaviour ("车").
    parser.add_argument("--keyword", "-k", default="车", help="搜索关键词 (默认: 车)")
    # Accept the numeric site codes too, so the documented `--type 2` works.
    type_codes = [code for code in STD_TYPES.values() if code]
    parser.add_argument("--type", "-t", default="强制性",
                        choices=[*STD_TYPES, *type_codes],
                        help="标准类型或其代码 1/2/3 (默认: 强制性)")
    parser.add_argument("--status", "-s", default="", help="状态筛选: 现行/即将实施/废止 (默认: 全部)")
    parser.add_argument("--page-size", "-p", type=int, default=50, help="每页条数 (默认50)")
    parser.add_argument("--output-dir", "-o", default="GB_Doc", help="PDF下载目录 (默认: GB_Doc)")
    parser.add_argument("--no-download", action="store_true", help="仅采集元数据, 不下载PDF")
    parser.add_argument("--max-count", "-n", type=int, default=0, help="最大下载数量 (0=全部)")
    return parser


def _download_all(standards, output_dir):
    """Download every standard not fetched yet; return (success, skipped, failed) counts."""
    cache = load_cache()
    downloaded_hcnos = set(cache.get("downloaded_hcnos", []))
    success_count = skip_count = fail_count = 0

    for idx, std in enumerate(standards, 1):
        hcno = std.get("hcno", "")
        code = std.get("标准号", "")
        name = std.get("标准名称", "")
        filename = sanitize_filename(f"{code} {name}.pdf")
        filepath = os.path.join(output_dir, filename)

        # Resume support: skip what the cache or the file system already has.
        if hcno in downloaded_hcnos or os.path.exists(filepath):
            std["下载状态"] = "已存在"
            std["文件名"] = filename
            skip_count += 1
            continue

        # Flush so the progress line is visible while the download runs.
        print(f" [{idx}/{len(standards)}] {code} {name[:30]}...", end=" ", flush=True)
        ok, size = download_pdf(hcno, filepath)
        if ok:
            std["下载状态"] = "成功"
            std["文件名"] = filename
            downloaded_hcnos.add(hcno)
            success_count += 1
            print(f"OK ({size/1024:.0f} KB)")
            # Persist progress right away; only a success changes the cache.
            cache["downloaded_hcnos"] = list(downloaded_hcnos)
            save_cache(cache)
        else:
            std["下载状态"] = "失败"
            std["文件名"] = ""
            fail_count += 1
            print("FAILED")
        # Polite delay between downloads.
        time.sleep(1)
    return success_count, skip_count, fail_count
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()