"""Batch PDF downloader for recommended national standards (GB/T).

Works against the "National Standards Full-Text Disclosure System".

Data source:  https://openstd.samr.gov.cn/bzgk/std/std_list_type
              (p.p1=2 selects recommended national standards)
Download URL: http://c.gb688.cn/bzgk/gb/viewGb

Features:
    1. Search recommended national standards by keyword (e.g. "车" matches
       every vehicle-related standard).
    2. Solve the download captcha automatically (ddddocr) and fetch the PDF.
    3. Optional status filter: 现行 / 即将实施 / 废止.
    4. Files are named "<standard number> <standard name>.pdf"; characters
       that are illegal in file names (such as the "/" in "GB/T") are
       replaced by spaces.
    5. Resume support: files that were already downloaded are skipped.
    6. Export of the collected metadata to an Excel workbook.

Usage:
    python openstd_gb_t_downloader.py                          # keyword "车"
    python openstd_gb_t_downloader.py --keyword "制动"          # custom keyword
    python openstd_gb_t_downloader.py --status "现行"           # only in-force standards
    python openstd_gb_t_downloader.py --page-size 50           # 50 rows per page
    python openstd_gb_t_downloader.py --output-dir ./GB_T_Doc  # custom target folder
    python openstd_gb_t_downloader.py --no-download            # metadata only, no PDFs

Dependencies:
    pip install requests ddddocr openpyxl
"""

import argparse
import io
import json
import os
import re
import sys
import time
from datetime import datetime
from html import unescape

import requests
from openpyxl import Workbook
from openpyxl.styles import Font, Alignment, PatternFill, Border, Side
from openpyxl.utils import get_column_letter

# ─── Windows console: force UTF-8 so Chinese output is not garbled ───
if sys.platform == "win32":
    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")
    sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8", errors="replace")

# ─── Configuration ──────────────────────────────────────
LIST_URL = "https://openstd.samr.gov.cn/bzgk/std/std_list_type"
DOWNLOAD_INIT_URL = "http://c.gb688.cn/bzgk/gb/showGb?type=download&hcno={hcno}"
CAPTCHA_URL = "http://c.gb688.cn/bzgk/gb/gc?_{ts}"
VERIFY_URL = "http://c.gb688.cn/bzgk/gb/verifyCode"
PDF_URL = "http://c.gb688.cn/bzgk/gb/viewGb?hcno={hcno}"

# Value of the "p.p1" query parameter that selects recommended national standards.
STD_TYPE_P1 = "2"

DEFAULT_KEYWORD = "车"
MAX_CAPTCHA_RETRIES = 8
REQUEST_TIMEOUT = 30
CACHE_FILE = ".openstd_gb_t_cache.json"

HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
                  "(KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
}

# Excel output columns as (record key / header text, column width).
OUTPUT_COLUMNS = [
    ("标准号", 22),
    ("标准名称", 50),
    ("标准状态", 10),
    ("发布日期", 14),
    ("实施日期", 14),
    ("是否采标", 10),
    ("hcno", 35),
    ("文件名", 60),
    ("下载状态", 10),
]

# Patterns used by the list-page parser, compiled once at import time.
_TAG_RE = re.compile(r'<[^>]+>')
_ROW_RE = re.compile(r'<tr[^>]*>(.*?)</tr>', re.S)
_CELL_RE = re.compile(r'<td[^>]*>(.*?)</td>', re.S)
_HCNO_RE = re.compile(r"showInfo\('([A-F0-9]{32})'\)")
_STATUS_COUNT_RE = re.compile(
    r'现行\((\d+)\).*?即将实施\((\d+)\).*?废止\((\d+)\)', re.S
)

# Lazily created ddddocr instance shared by all downloads (see _get_ocr).
_OCR = None


# ─── List page fetching / parsing ───────────────────────
def fetch_list_page(session, keyword, page_num, page_size):
    """Request one page of the search result list.

    Args:
        session: ``requests.Session`` used for the request.
        keyword: Search keyword, sent as ``p.p2``.
        page_num: 1-based page number; ``page`` is only sent for pages > 1.
        page_size: Number of rows requested per page.

    Returns:
        The page HTML decoded as UTF-8, or ``None`` after three failed attempts.
    """
    params = {
        "p.p1": STD_TYPE_P1,
        "p.p2": keyword,
        "p.p90": "circulation_date",
        "p.p91": "desc",
        # Sent on every request so that page 1 uses the same page size as the
        # following pages; otherwise rows between the two sizes get skipped.
        "pageSize": page_size,
    }
    if page_num > 1:
        params["page"] = page_num

    for attempt in range(3):
        try:
            resp = session.get(LIST_URL, params=params, headers=HEADERS,
                               timeout=REQUEST_TIMEOUT)
            resp.raise_for_status()
            return resp.content.decode("utf-8")
        except (requests.RequestException, UnicodeDecodeError) as e:
            print(f" [!] 列表页请求失败 (第 {attempt+1} 次): {e}")
            time.sleep(2)
    return None


def _cell_text(fragment):
    """Return the visible text of an HTML fragment (tags removed, entities decoded)."""
    return unescape(_TAG_RE.sub('', fragment)).strip()


def parse_list_page(html):
    """Parse a list page.

    Args:
        html: HTML text of one result page.

    Returns:
        ``(standards, total)`` where ``standards`` is a list of dicts with the
        keys 标准号, 标准名称, 标准状态, 发布日期, 实施日期, 是否采标 and hcno,
        and ``total`` is the sum of the three status counters shown on the
        page (0 when the counters are not found).
    """
    total = 0
    counts = _STATUS_COUNT_RE.search(html)
    if counts:
        total = sum(int(n) for n in counts.groups())

    standards = []
    for row in _ROW_RE.findall(html):
        cells = _CELL_RE.findall(row)
        if len(cells) < 6:
            continue
        # Data rows start with a running number; anything else is skipped.
        if not _cell_text(cells[0]).isdigit():
            continue

        # The hcno is read from the showInfo('...') link; prefer the
        # standard-number cell and fall back to anywhere in the same row.
        hcno_m = _HCNO_RE.search(cells[1]) or _HCNO_RE.search(row)
        act_date = _cell_text(cells[6]) if len(cells) > 6 else ""

        standards.append({
            "标准号": _cell_text(cells[1]),
            "标准名称": _cell_text(cells[3]),
            "标准状态": _cell_text(cells[4]),
            "发布日期": _cell_text(cells[5])[:10],
            "实施日期": act_date[:10],
            "是否采标": _cell_text(cells[2]),
            "hcno": hcno_m.group(1) if hcno_m else "",
        })
    return standards, total


def collect_all_standards(keyword, page_size, status_filter=""):
    """Collect the metadata of every standard matching ``keyword``.

    Args:
        keyword: Search keyword.
        page_size: Rows per page (must be positive).
        status_filter: When non-empty, only records whose 标准状态 contains
            this text are kept.

    Returns:
        List of record dicts as produced by ``parse_list_page``; empty when
        the first page cannot be fetched.
    """
    all_standards = []
    with requests.Session() as session:
        html = fetch_list_page(session, keyword, 1, page_size)
        if not html:
            return all_standards

        standards, total = parse_list_page(html)
        all_standards.extend(standards)

        total_pages = (total + page_size - 1) // page_size if total > 0 else 1
        print(f" 总计: {total} 条标准, {total_pages} 页")

        for page_num in range(2, total_pages + 1):
            html = fetch_list_page(session, keyword, page_num, page_size)
            if not html:
                break
            standards, _ = parse_list_page(html)
            if not standards:
                break
            all_standards.extend(standards)
            print(f" 已采集: {len(all_standards)}/{total} (第 {page_num}/{total_pages} 页)",
                  end="\r")
            time.sleep(0.3)  # pause between list-page requests

    print()

    if status_filter:
        all_standards = [s for s in all_standards
                         if status_filter in s.get("标准状态", "")]
        print(f" 筛选 [{status_filter}]: {len(all_standards)} 条")

    return all_standards


# ─── PDF download ───────────────────────────────────────
def _get_ocr():
    """Return the shared ddddocr instance, creating it on first use.

    ddddocr is imported lazily so that ``--no-download`` runs do not need it,
    and the instance is reused instead of being rebuilt for every file.
    """
    global _OCR
    if _OCR is None:
        import ddddocr
        _OCR = ddddocr.DdddOcr(show_ad=False)
    return _OCR


def _solve_captcha(session, ocr):
    """Fetch and submit captchas until one is accepted; return True on success."""
    for _ in range(MAX_CAPTCHA_RETRIES):
        r = session.get(CAPTCHA_URL.format(ts=int(time.time() * 1000)),
                        timeout=REQUEST_TIMEOUT)
        if len(r.content) < 100:
            # Too small to be a captcha image; wait and request a new one.
            time.sleep(1)
            continue
        code = ocr.classification(r.content)
        vr = session.post(VERIFY_URL, data={"verifyCode": code},
                          timeout=REQUEST_TIMEOUT)
        if vr.text.strip() == "success":
            return True
    return False


def download_pdf(hcno, save_path, max_retries=3):
    """Download the PDF of one standard, solving the captcha automatically.

    Args:
        hcno: Identifier of the standard on the download site.
        save_path: Target path of the PDF file.
        max_retries: Number of complete attempts (fresh session each time).

    Returns:
        ``(True, size_in_bytes)`` on success, ``(False, -1)`` when the server
        did not return a PDF, ``(False, 0)`` when every attempt failed.
    """
    ocr = _get_ocr()

    for retry in range(max_retries):
        try:
            with requests.Session() as s:
                s.headers.update(HEADERS)
                s.get(DOWNLOAD_INIT_URL.format(hcno=hcno), timeout=REQUEST_TIMEOUT)

                if not _solve_captcha(s, ocr):
                    if retry < max_retries - 1:
                        print(f"验证码失败,重试({retry+1})")
                    # Never request the PDF without a verified captcha.
                    continue

                dr = s.get(PDF_URL.format(hcno=hcno), timeout=60)
                content = dr.content
                # Require the PDF signature so an HTML error page is not
                # stored under a .pdf name.
                if len(content) > 1000 and b"%PDF" in content[:1024]:
                    # Write to a temporary name first: an interrupted write
                    # must not look like a finished download on the next run.
                    tmp_path = save_path + ".part"
                    with open(tmp_path, "wb") as f:
                        f.write(content)
                    os.replace(tmp_path, save_path)
                    return True, len(content)
                return False, -1
        except Exception as e:
            # Deliberately broad: one bad item (network, OCR, disk) must not
            # abort the whole batch.
            print(f" [!] 下载异常: {e}, 重试 ({retry+1}/{max_retries})")
            time.sleep(2)

    return False, 0


def sanitize_filename(name):
    """Replace characters that are illegal in file names with spaces."""
    return re.sub(r'[\\/:*?"<>|]', ' ', name).strip()


# ─── Excel export ───────────────────────────────────────
def export_to_excel(records, output_path):
    """Write ``records`` to a formatted Excel workbook.

    Args:
        records: List of dicts; values are looked up by the names in
            ``OUTPUT_COLUMNS`` (missing keys become empty cells).
        output_path: Path of the ``.xlsx`` file to create.
    """
    wb = Workbook()
    ws = wb.active
    ws.title = "推荐性国家标准清单"

    hdr_font = Font(name="微软雅黑", bold=True, color="FFFFFF", size=11)
    # Green header fill, used here to mark the "recommended" standard type.
    hdr_fill = PatternFill(start_color="375623", end_color="375623", fill_type="solid")
    hdr_align = Alignment(horizontal="center", vertical="center", wrap_text=True)
    dat_font = Font(name="微软雅黑", size=10)
    dat_align = Alignment(vertical="center", wrap_text=True)
    even_fill = PatternFill(start_color="E2EFDA", end_color="E2EFDA", fill_type="solid")
    thin = Side(style="thin", color="A9D08E")
    border = Border(left=thin, right=thin, top=thin, bottom=thin)

    col_names = [c[0] for c in OUTPUT_COLUMNS]

    for ci, name in enumerate(col_names, 1):
        cell = ws.cell(row=1, column=ci, value=name)
        cell.font = hdr_font
        cell.fill = hdr_fill
        cell.alignment = hdr_align
        cell.border = border

    for ri, rec in enumerate(records, 2):
        for ci, col_name in enumerate(col_names, 1):
            cell = ws.cell(row=ri, column=ci, value=rec.get(col_name, ""))
            cell.font = dat_font
            cell.alignment = dat_align
            cell.border = border
            if ri % 2 == 0:
                cell.fill = even_fill

    for ci, (_, width) in enumerate(OUTPUT_COLUMNS, 1):
        ws.column_dimensions[get_column_letter(ci)].width = width

    ws.freeze_panes = "A2"
    ws.auto_filter.ref = f"A1:{get_column_letter(len(col_names))}{len(records) + 1}"
    wb.save(output_path)


# ─── Cache ──────────────────────────────────────────────
def load_cache():
    """Load the download cache from ``CACHE_FILE``.

    Returns:
        The cached dict, or a fresh empty cache when the file is missing,
        unreadable, not valid JSON, or does not contain a JSON object.
    """
    empty = {"downloaded_hcnos": [], "records": []}
    try:
        with open(CACHE_FILE, "r", encoding="utf-8") as f:
            cache = json.load(f)
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        return empty
    return cache if isinstance(cache, dict) else empty


def save_cache(cache):
    """Write ``cache`` to ``CACHE_FILE`` as UTF-8 JSON."""
    with open(CACHE_FILE, "w", encoding="utf-8") as f:
        json.dump(cache, f, ensure_ascii=False)


# ─── Main flow ──────────────────────────────────────────
def main():
    """Command-line entry point: collect metadata, download PDFs, export Excel."""
    parser = argparse.ArgumentParser(
        description="国家标准全文公开系统 — 推荐性国家标准 PDF 批量下载工具",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例:
  python openstd_gb_t_downloader.py                    # 下载"车"相关推荐性国家标准
  python openstd_gb_t_downloader.py --keyword "制动"    # 搜索关键词
  python openstd_gb_t_downloader.py --status "现行"     # 只下载现行标准
  python openstd_gb_t_downloader.py --no-download      # 仅采集元数据, 不下载PDF
        """,
    )
    parser.add_argument("--keyword", "-k", default=DEFAULT_KEYWORD,
                        help="搜索关键词 (默认: 车)")
    parser.add_argument("--status", "-s", default="",
                        help="状态筛选: 现行/即将实施/废止 (默认: 全部)")
    parser.add_argument("--page-size", "-p", type=int, default=50,
                        help="每页条数 (默认50)")
    parser.add_argument("--output-dir", "-o", default="GB_T_Doc",
                        help="PDF下载目录 (默认: GB_T_Doc)")
    parser.add_argument("--no-download", action="store_true",
                        help="仅采集元数据, 不下载PDF")
    parser.add_argument("--max-count", "-n", type=int, default=0,
                        help="最大下载数量 (0=全部)")
    args = parser.parse_args()

    # A non-positive page size would divide by zero in the page-count math.
    if args.page_size <= 0:
        parser.error("--page-size must be a positive integer")

    os.makedirs(args.output_dir, exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    # The keyword is user input; keep path separators out of the file name.
    excel_name = f"推荐性国家标准清单_{sanitize_filename(args.keyword)}_{timestamp}.xlsx"
    excel_path = os.path.join(args.output_dir, excel_name)

    print("=" * 60)
    print(" 国家标准全文公开系统 — 推荐性国家标准 PDF 批量下载工具")
    print("=" * 60)
    print(f" 关键词: {args.keyword}")
    print(" 类型: 推荐性国家标准 (GB/T)")
    print(f" 状态: {args.status or '全部'}")
    print(f" 下载目录: {args.output_dir}/")
    print(f" 下载PDF: {'否' if args.no_download else '是'}")
    print("-" * 60)

    # Step 1: collect the list of standards.
    print(" [1/2] 采集标准列表...")
    standards = collect_all_standards(args.keyword, args.page_size, args.status)

    if not standards:
        print(" 未找到任何标准")
        sys.exit(1)

    print(f" 共 {len(standards)} 条标准")

    if args.max_count > 0:
        standards = standards[:args.max_count]
        print(f" 限制下载前 {args.max_count} 条")

    # Step 2: download the PDFs.
    cache = load_cache()
    downloaded_hcnos = set(cache.get("downloaded_hcnos", []))

    if not args.no_download:
        print("\n [2/2] 下载 PDF 文件...")
        success_count = 0
        skip_count = 0
        fail_count = 0

        for idx, std in enumerate(standards, 1):
            hcno = std.get("hcno", "")
            code = std.get("标准号", "")
            name = std.get("标准名称", "")
            filename = sanitize_filename(f"{code} {name}.pdf")
            filepath = os.path.join(args.output_dir, filename)

            if hcno in downloaded_hcnos or os.path.exists(filepath):
                std["下载状态"] = "已存在"
                std["文件名"] = filename
                skip_count += 1
                continue

            print(f" [{idx}/{len(standards)}] {code} {name[:30]}...", end=" ")

            if not hcno:
                # Without an hcno there is nothing to request; skip the
                # captcha round-trips entirely.
                std["下载状态"] = "无PDF"
                std["文件名"] = ""
                fail_count += 1
                print("NO PDF")
                continue

            ok, size = download_pdf(hcno, filepath)

            if ok:
                std["下载状态"] = "成功"
                std["文件名"] = filename
                downloaded_hcnos.add(hcno)
                success_count += 1
                print(f"OK ({size/1024:.0f} KB)")
                # Persist progress right away so an interrupted run can resume.
                cache["downloaded_hcnos"] = list(downloaded_hcnos)
                save_cache(cache)
            elif size == -1:
                std["下载状态"] = "无PDF"
                std["文件名"] = ""
                fail_count += 1
                print("NO PDF")
            else:
                std["下载状态"] = "失败"
                std["文件名"] = ""
                fail_count += 1
                print("FAILED")

            time.sleep(1)  # pause between downloads

        print(f"\n 下载完成: 成功 {success_count}, 跳过 {skip_count}, 无PDF/失败 {fail_count}")
    else:
        for std in standards:
            std["下载状态"] = "跳过"
            std["文件名"] = ""
        print("\n [2/2] 跳过下载 (--no-download)")

    export_to_excel(standards, excel_path)
    print(f"\n 元数据已导出: {excel_path}")

    print(f"\n {'=' * 50}")
    print(f" 总计: {len(standards)} 条推荐性国家标准")
    print(f" Excel: {excel_path}")
    if not args.no_download:
        print(f" PDF目录: {args.output_dir}/")
        pdfs = [f for f in os.listdir(args.output_dir) if f.endswith('.pdf')]
        print(f" PDF文件数: {len(pdfs)}")
    print(f" {'=' * 50}")


if __name__ == "__main__":
    main()