""" 国家标准全文公开系统 — PDF 批量下载工具 数据来源: https://openstd.samr.gov.cn/bzgk/std/std_list_type 下载地址: http://c.gb688.cn/bzgk/gb/viewGb 功能: 1. 按关键词搜索国家标准 (如 "车" 可匹配所有车辆相关标准) 2. 自动识别验证码 (ddddocr) 并下载 PDF 全文 3. 支持筛选: 强制性/推荐性/指导性/全部, 现行/即将实施/废止 4. 文件命名: "标准号 标准名称.pdf" (如 "GB 34660-2026 道路车辆 电磁兼容性要求和试验方法.pdf") 5. 断点续传: 已下载的文件自动跳过 6. 导出标准元数据 Excel 用法: python openstd_downloader.py # 下载"车"相关强制性国家标准 python openstd_downloader.py --keyword "制动" # 搜索关键词 python openstd_downloader.py --type 2 # 推荐性国家标准 python openstd_downloader.py --page-size 50 # 每页50条 python openstd_downloader.py --output-dir ./GB_Doc # 自定义下载目录 python openstd_downloader.py --status "现行" # 只下载现行标准 python openstd_downloader.py --no-download # 仅采集元数据, 不下载PDF 依赖: pip install requests ddddocr openpyxl """ import sys import io import os import re import json import time import argparse from datetime import datetime import requests from openpyxl import Workbook from openpyxl.styles import Font, Alignment, PatternFill, Border, Side from openpyxl.utils import get_column_letter # ─── Windows 控制台中文输出修复 ───────────────────────── if sys.platform == "win32": sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace") sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8", errors="replace") # ─── 配置 ─────────────────────────────────────────────── LIST_URL = "https://openstd.samr.gov.cn/bzgk/std/std_list_type" DOWNLOAD_INIT_URL = "http://c.gb688.cn/bzgk/gb/showGb?type=download&hcno={hcno}" CAPTCHA_URL = "http://c.gb688.cn/bzgk/gb/gc?_{ts}" VERIFY_URL = "http://c.gb688.cn/bzgk/gb/verifyCode" PDF_URL = "http://c.gb688.cn/bzgk/gb/viewGb?hcno={hcno}" MAX_CAPTCHA_RETRIES = 8 REQUEST_TIMEOUT = 30 CACHE_FILE = ".openstd_cache.json" HEADERS = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 " "(KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36", } # 标准类型参数 p.p1 STD_TYPES = { "强制性": "1", "推荐性": "2", "指导性": "3", "全部": "", } # 输出列定义 OUTPUT_COLUMNS = [ ("标准号", 20), ("标准名称", 50), ("标准状态", 10), ("发布日期", 14), ("实施日期", 14), ("是否采标", 10), ("hcno", 35), ("文件名", 60), ("下载状态", 10), ] # ─── 列表页解析 ───────────────────────────────────────── def fetch_list_page(session, keyword, std_type, page_num, page_size): """请求列表页, 返回 HTML""" params = { "p.p1": std_type, "p.p2": keyword, "p.p90": "circulation_date", "p.p91": "desc", } if page_num > 1: params["page"] = page_num params["pageSize"] = page_size for attempt in range(3): try: resp = session.get(LIST_URL, params=params, headers=HEADERS, timeout=REQUEST_TIMEOUT) resp.raise_for_status() return resp.content.decode("utf-8") except Exception as e: print(f" [!] 列表页请求失败 (第 {attempt+1} 次): {e}") time.sleep(2) return None def parse_list_page(html): """解析列表页 HTML, 返回标准列表和总数""" # 提取每个标准的 hcno (去重, 每个标准有3个 onclick) hcnos = list(dict.fromkeys(re.findall(r"showInfo\('([A-F0-9]{32})'\)", html))) # 提取总数: 现行(216) 即将实施(36) 废止(408) status_counts = re.findall(r'现行\((\d+)\).*?即将实施\((\d+)\).*?废止\((\d+)\)', html, re.S) total = 0 if status_counts: total = sum(int(x) for x in status_counts[0]) # 提取表格数据 rows = re.findall(r']*>(.*?)', html, re.S) standards = [] for row in rows: cells = re.findall(r']*>(.*?)', row, re.S) if len(cells) < 6: continue first_cell = re.sub(r'<[^>]+>', '', cells[0]).strip() if not first_cell.isdigit(): continue std_code = re.sub(r'<[^>]+>', '', cells[1]).strip() std_name_raw = cells[3] std_name = re.sub(r'<[^>]+>', '', std_name_raw).strip() std_status = re.sub(r'<[^>]+>', '', cells[4]).strip() issue_date = re.sub(r'<[^>]+>', '', cells[5]).strip() act_date = re.sub(r'<[^>]+>', '', cells[6]).strip() if len(cells) > 6 else "" # 从 onclick 提取 hcno hcno_m = re.search(r"showInfo\('([A-F0-9]{32})'\)", cells[1]) hcno = hcno_m.group(1) if hcno_m else "" # 采标信息 adopted = re.sub(r'<[^>]+>', '', cells[2]).strip() if len(cells) > 2 else "" standards.append({ "标准号": std_code, "标准名称": std_name, "标准状态": std_status, "发布日期": issue_date[:10] if issue_date else "", "实施日期": act_date[:10] if act_date else "", "是否采标": adopted, "hcno": hcno, }) return standards, total def collect_all_standards(keyword, std_type, page_size, status_filter=""): """采集所有标准列表""" session = requests.Session() all_standards = [] html = fetch_list_page(session, keyword, std_type, 1, page_size) if not html: return all_standards standards, total = parse_list_page(html) all_standards.extend(standards) total_pages = (total + page_size - 1) // page_size if total > 0 else 1 print(f" 总计: {total} 条标准, {total_pages} 页") for page_num in range(2, total_pages + 1): html = fetch_list_page(session, keyword, std_type, page_num, page_size) if not html: break standards, _ = parse_list_page(html) if not standards: break all_standards.extend(standards) print(f" 已采集: {len(all_standards)}/{total} (第 {page_num}/{total_pages} 页)", end="\r") time.sleep(0.3) print() # 状态筛选 if status_filter: all_standards = [s for s in all_standards if status_filter in s.get("标准状态", "")] print(f" 筛选 [{status_filter}]: {len(all_standards)} 条") return all_standards # ─── PDF 下载 ─────────────────────────────────────────── def download_pdf(hcno, save_path, max_retries=3): """下载单个标准 PDF, 自动识别验证码""" import ddddocr ocr = ddddocr.DdddOcr(show_ad=False) for retry in range(max_retries): s = requests.Session() s.headers.update(HEADERS) try: # Step 1: 访问下载页获取 session s.get(DOWNLOAD_INIT_URL.format(hcno=hcno), timeout=REQUEST_TIMEOUT) # Step 2: 获取验证码 + OCR verified = False for captcha_attempt in range(MAX_CAPTCHA_RETRIES): r = s.get(CAPTCHA_URL.format(ts=int(time.time() * 1000)), timeout=REQUEST_TIMEOUT) if len(r.content) < 100: time.sleep(1) continue code = ocr.classification(r.content) # Step 3: 验证验证码 vr = s.post(VERIFY_URL, data={"verifyCode": code}, timeout=REQUEST_TIMEOUT) if vr.text.strip() == "success": verified = True break if not verified: if retry < max_retries - 1: print(f"验证码失败,重试({retry+1})") continue # Step 4: 下载 PDF dr = s.get(PDF_URL.format(hcno=hcno), timeout=60) if len(dr.content) > 1000: with open(save_path, "wb") as f: f.write(dr.content) return True, len(dr.content) else: # PDF 为空可能意味着该标准暂未提供全文, 不再重试 return False, -1 except Exception as e: print(f" [!] 下载异常: {e}, 重试 ({retry+1}/{max_retries})") time.sleep(2) return False, 0 def sanitize_filename(name): """清理文件名中的非法字符""" return re.sub(r'[\\/:*?"<>|]', ' ', name).strip() # ─── Excel 导出 ───────────────────────────────────────── def export_to_excel(records, output_path): wb = Workbook() ws = wb.active ws.title = "国家标准清单" hdr_font = Font(name="微软雅黑", bold=True, color="FFFFFF", size=11) hdr_fill = PatternFill(start_color="2F5496", end_color="2F5496", fill_type="solid") hdr_align = Alignment(horizontal="center", vertical="center", wrap_text=True) dat_font = Font(name="微软雅黑", size=10) dat_align = Alignment(vertical="center", wrap_text=True) even_fill = PatternFill(start_color="D6E4F0", end_color="D6E4F0", fill_type="solid") border = Border( left=Side(style="thin", color="B4C6E7"), right=Side(style="thin", color="B4C6E7"), top=Side(style="thin", color="B4C6E7"), bottom=Side(style="thin", color="B4C6E7"), ) col_names = [c[0] for c in OUTPUT_COLUMNS] for ci, name in enumerate(col_names, 1): cell = ws.cell(row=1, column=ci, value=name) cell.font = hdr_font cell.fill = hdr_fill cell.alignment = hdr_align cell.border = border for ri, rec in enumerate(records, 2): for ci, (col_name, _) in enumerate(OUTPUT_COLUMNS, 1): val = rec.get(col_name, "") cell = ws.cell(row=ri, column=ci, value=val) cell.font = dat_font cell.alignment = dat_align cell.border = border if ri % 2 == 0: cell.fill = even_fill for ci, (_, w) in enumerate(OUTPUT_COLUMNS, 1): ws.column_dimensions[get_column_letter(ci)].width = w ws.freeze_panes = "A2" ws.auto_filter.ref = f"A1:{get_column_letter(len(col_names))}{len(records) + 1}" wb.save(output_path) # ─── 缓存 ─────────────────────────────────────────────── def load_cache(): if os.path.exists(CACHE_FILE): try: with open(CACHE_FILE, "r", encoding="utf-8") as f: return json.load(f) except (json.JSONDecodeError, IOError): pass return {"downloaded_hcnos": [], "records": []} def save_cache(cache): with open(CACHE_FILE, "w", encoding="utf-8") as f: json.dump(cache, f, ensure_ascii=False) # ─── 主流程 ───────────────────────────────────────────── def main(): parser = argparse.ArgumentParser( description="国家标准全文公开系统 — PDF 批量下载工具", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" 示例: python openstd_downloader.py # 下载"车"相关强制性国家标准 python openstd_downloader.py --keyword "制动" # 搜索关键词 python openstd_downloader.py --type 2 # 推荐性国家标准 python openstd_downloader.py --status "现行" # 只下载现行标准 python openstd_downloader.py --no-download # 仅采集元数据, 不下载PDF """, ) parser.add_argument("--keyword", "-k", default="车", help="搜索关键词 (默认: 车)") parser.add_argument("--type", "-t", default="强制性", choices=["强制性", "推荐性", "指导性", "全部"], help="标准类型 (默认: 强制性)") parser.add_argument("--status", "-s", default="", help="状态筛选: 现行/即将实施/废止 (默认: 全部)") parser.add_argument("--page-size", "-p", type=int, default=50, help="每页条数 (默认50)") parser.add_argument("--output-dir", "-o", default="GB_Doc", help="PDF下载目录 (默认: GB_Doc)") parser.add_argument("--no-download", action="store_true", help="仅采集元数据, 不下载PDF") parser.add_argument("--max-count", "-n", type=int, default=0, help="最大下载数量 (0=全部)") args = parser.parse_args() std_type = STD_TYPES.get(args.type, "1") os.makedirs(args.output_dir, exist_ok=True) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") excel_path = os.path.join(args.output_dir, f"国家标准清单_{args.keyword}_{timestamp}.xlsx") print("=" * 60) print(" 国家标准全文公开系统 — PDF 批量下载工具") print("=" * 60) print(f" 关键词: {args.keyword}") print(f" 类型: {args.type}") print(f" 状态: {args.status or '全部'}") print(f" 下载目录: {args.output_dir}/") print(f" 下载PDF: {'否' if args.no_download else '是'}") print("-" * 60) # Step 1: 采集标准列表 print(" [1/2] 采集标准列表...") standards = collect_all_standards(args.keyword, std_type, args.page_size, args.status) if not standards: print(" 未找到任何标准") sys.exit(1) print(f" 共 {len(standards)} 条标准") # 限制数量 if args.max_count > 0: standards = standards[:args.max_count] print(f" 限制下载前 {args.max_count} 条") # Step 2: 下载 PDF cache = load_cache() downloaded_hcnos = set(cache.get("downloaded_hcnos", [])) if not args.no_download: print(f"\n [2/2] 下载 PDF 文件...") success_count = 0 skip_count = 0 fail_count = 0 for idx, std in enumerate(standards, 1): hcno = std.get("hcno", "") code = std.get("标准号", "") name = std.get("标准名称", "") filename = sanitize_filename(f"{code} {name}.pdf") filepath = os.path.join(args.output_dir, filename) # 跳过已下载 if hcno in downloaded_hcnos or os.path.exists(filepath): std["下载状态"] = "已存在" std["文件名"] = filename skip_count += 1 continue print(f" [{idx}/{len(standards)}] {code} {name[:30]}...", end=" ") ok, size = download_pdf(hcno, filepath) if ok: std["下载状态"] = "成功" std["文件名"] = filename downloaded_hcnos.add(hcno) success_count += 1 print(f"OK ({size/1024:.0f} KB)") else: std["下载状态"] = "失败" std["文件名"] = "" fail_count += 1 print("FAILED") # 保存进度 cache["downloaded_hcnos"] = list(downloaded_hcnos) save_cache(cache) # 礼貌延迟 time.sleep(1) print(f"\n 下载完成: 成功 {success_count}, 跳过 {skip_count}, 失败 {fail_count}") else: for std in standards: std["下载状态"] = "跳过" std["文件名"] = "" print("\n [2/2] 跳过下载 (--no-download)") # 导出 Excel export_to_excel(standards, excel_path) print(f"\n 元数据已导出: {excel_path}") # 统计 print(f"\n {'=' * 50}") print(f" 总计: {len(standards)} 条标准") print(f" Excel: {excel_path}") if not args.no_download: print(f" PDF目录: {args.output_dir}/") # 列出已下载文件 pdfs = [f for f in os.listdir(args.output_dir) if f.endswith('.pdf')] print(f" PDF文件数: {len(pdfs)}") print(f" {'=' * 50}") if __name__ == "__main__": main()