first commit

This commit is contained in:
2026-04-17 11:41:22 +08:00
commit 105ccf145c
164 changed files with 2206 additions and 0 deletions

416
openstd_gb_t_downloader.py Normal file
View File

@@ -0,0 +1,416 @@
"""
国家标准全文公开系统 — 推荐性国家标准 PDF 批量下载工具
数据来源: https://openstd.samr.gov.cn/bzgk/std/std_list_type (p.p1=2 推荐性国家标准)
下载地址: http://c.gb688.cn/bzgk/gb/viewGb
功能:
1. 按关键词搜索推荐性国家标准 (如 "车" 可匹配所有车辆相关标准)
2. 自动识别验证码 (ddddocr) 并下载 PDF 全文
3. 支持筛选: 现行/即将实施/废止
4. 文件命名: "标准号 标准名称.pdf", 其中 \ / : * ? " < > | 等非法字符会被替换为空格 (如 "GB T 1234-2024 xxx技术要求.pdf")
5. 断点续传: 已下载的文件自动跳过
6. 导出标准元数据 Excel
用法:
python openstd_gb_t_downloader.py # 使用默认关键词 (DEFAULT_KEYWORD, 当前为空字符串) 下载推荐性国家标准
python openstd_gb_t_downloader.py --keyword "制动" # 搜索关键词
python openstd_gb_t_downloader.py --status "现行" # 只下载现行标准
python openstd_gb_t_downloader.py --page-size 50 # 每页50条
python openstd_gb_t_downloader.py --output-dir ./GB_T_Doc # 自定义下载目录
python openstd_gb_t_downloader.py --no-download # 仅采集元数据, 不下载PDF
依赖:
pip install requests ddddocr openpyxl
"""
import sys
import io
import os
import re
import json
import time
import argparse
from datetime import datetime
import requests
from openpyxl import Workbook
from openpyxl.styles import Font, Alignment, PatternFill, Border, Side
from openpyxl.utils import get_column_letter
# ─── Windows console: force UTF-8 output ────────────────
def _utf8_console_stream(stream):
    """Return *stream* set up to emit UTF-8 with ``errors="replace"``.

    Args:
        stream: A text stream such as ``sys.stdout``; may be ``None``.

    Returns:
        The same stream reconfigured in place when it offers ``reconfigure``,
        a new ``io.TextIOWrapper`` around ``stream.buffer`` otherwise, or the
        stream unchanged when neither is possible.
    """
    if stream is None:
        # Nothing to wrap (sys.stdout / sys.stderr may be None).
        return stream
    reconfigure = getattr(stream, "reconfigure", None)
    if callable(reconfigure):
        # In-place change keeps the stream's existing buffering behaviour.
        reconfigure(encoding="utf-8", errors="replace")
        return stream
    raw = getattr(stream, "buffer", None)
    if raw is None:
        # Replaced streams may not expose the underlying binary buffer.
        return stream
    # line_buffering=True so progress lines ending in "\n" or "\r" show up
    # immediately instead of waiting for the wrapper's buffer to fill.
    return io.TextIOWrapper(raw, encoding="utf-8", errors="replace", line_buffering=True)


if sys.platform == "win32":
    sys.stdout = _utf8_console_stream(sys.stdout)
    sys.stderr = _utf8_console_stream(sys.stderr)
# ─── Configuration ──────────────────────────────────────
# Endpoints: the listing page lives on openstd.samr.gov.cn, everything
# related to the captcha and the PDF itself lives on c.gb688.cn.
LIST_URL = "https://openstd.samr.gov.cn/bzgk/std/std_list_type"
DOWNLOAD_INIT_URL = "http://c.gb688.cn/bzgk/gb/showGb?type=download&hcno={hcno}"
CAPTCHA_URL = "http://c.gb688.cn/bzgk/gb/gc?_{ts}"
VERIFY_URL = "http://c.gb688.cn/bzgk/gb/verifyCode"
PDF_URL = "http://c.gb688.cn/bzgk/gb/viewGb?hcno={hcno}"

# Value of the ``p.p1`` query parameter: 2 = recommended national standards.
STD_TYPE_P1 = "2"
DEFAULT_KEYWORD = ""

# Retry / timeout knobs and the resume-cache location.
MAX_CAPTCHA_RETRIES = 8
REQUEST_TIMEOUT = 30
CACHE_FILE = ".openstd_gb_t_cache.json"

# Browser-like User-Agent sent with the requests.
_USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36"
)
HEADERS = {"User-Agent": _USER_AGENT}

# Excel output columns as (header, column width) pairs, in sheet order.
_COLUMN_TITLES = (
    "标准号", "标准名称", "标准状态", "发布日期", "实施日期",
    "是否采标", "hcno", "文件名", "下载状态",
)
_COLUMN_WIDTHS = (22, 50, 10, 14, 14, 10, 35, 60, 10)
OUTPUT_COLUMNS = list(zip(_COLUMN_TITLES, _COLUMN_WIDTHS))
# ─── 列表页解析 ─────────────────────────────────────────
def fetch_list_page(session, keyword, page_num, page_size):
    """Request one page of the standards list and return its HTML.

    Args:
        session: Object with a ``requests.Session``-style ``get`` method.
        keyword: Search keyword, sent as the ``p.p2`` query parameter.
        page_num: Page number to request (callers in this file start at 1).
        page_size: Number of rows requested per page.

    Returns:
        The response body decoded as UTF-8, or ``None`` when all three
        attempts fail.
    """
    params = {
        "p.p1": STD_TYPE_P1,  # recommended national standards
        "p.p2": keyword,
        "p.p90": "circulation_date",
        "p.p91": "desc",
        # Sent on every request, including page 1: the caller computes the
        # page count from ``page_size``, so the first page must be cut with
        # the same size as the following ones or rows would be skipped.
        "page": page_num,
        "pageSize": page_size,
    }
    max_attempts = 3
    for attempt in range(1, max_attempts + 1):
        try:
            resp = session.get(LIST_URL, params=params, headers=HEADERS, timeout=REQUEST_TIMEOUT)
            resp.raise_for_status()
            return resp.content.decode("utf-8")
        except Exception as e:
            # Deliberate best-effort boundary: report the failure and retry.
            print(f" [!] 列表页请求失败 (第 {attempt} 次): {e}")
            if attempt < max_attempts:
                # No point in waiting after the last attempt.
                time.sleep(2)
    return None
def parse_list_page(html):
    """Extract the standards table and the overall result count from a list page.

    Args:
        html: HTML text of one list page.

    Returns:
        A ``(standards, total)`` tuple. ``standards`` is a list of dicts with
        the keys ``标准号``, ``标准名称``, ``标准状态``, ``发布日期``,
        ``实施日期``, ``是否采标`` and ``hcno``. ``total`` is the sum of the
        three status counters found on the page, or 0 when they are absent.
    """
    # Imported locally under an alias because the ``html`` parameter shadows
    # the standard-library module of the same name inside this function.
    from html import unescape as _unescape

    def _cell_text(fragment):
        """Drop tags, decode entities such as &amp;/&nbsp;, trim whitespace."""
        return _unescape(re.sub(r'<[^>]+>', '', fragment)).strip()

    total = 0
    counters = re.search(r'现行\((\d+)\).*?即将实施\((\d+)\).*?废止\((\d+)\)', html, re.S)
    if counters:
        total = sum(int(n) for n in counters.groups())

    hcno_pattern = re.compile(r"showInfo\('([A-F0-9]{32})'\)")
    standards = []
    for row in re.findall(r'<tr[^>]*>(.*?)</tr>', html, re.S):
        cells = re.findall(r'<td[^>]*>(.*?)</td>', row, re.S)
        if len(cells) < 6:
            continue
        # Data rows start with a numeric sequence cell; anything else is skipped.
        if not _cell_text(cells[0]).isdigit():
            continue

        issue_date = _cell_text(cells[5])
        # The seventh cell (implementation date) is optional.
        act_date = _cell_text(cells[6]) if len(cells) > 6 else ""
        # Prefer the id attached to the standard-number cell, but accept one
        # found anywhere else in the same row.
        hcno_match = hcno_pattern.search(cells[1]) or hcno_pattern.search(row)

        standards.append({
            "标准号": _cell_text(cells[1]),
            "标准名称": _cell_text(cells[3]),
            "标准状态": _cell_text(cells[4]),
            # Keep only the leading date part (first 10 characters).
            "发布日期": issue_date[:10],
            "实施日期": act_date[:10],
            "是否采标": _cell_text(cells[2]),
            "hcno": hcno_match.group(1) if hcno_match else "",
        })
    return standards, total
def collect_all_standards(keyword, page_size, status_filter=""):
    """Walk through every list page for *keyword* and gather the records.

    Args:
        keyword: Search keyword passed to ``fetch_list_page``.
        page_size: Rows per page; must be a positive integer.
        status_filter: When non-empty, only records whose ``标准状态``
            contains this text are kept.

    Returns:
        A list of record dicts as produced by ``parse_list_page``. The list is
        empty when the first page cannot be fetched; paging stops early when a
        later page cannot be fetched or yields no rows.

    Raises:
        ValueError: If ``page_size`` is not positive.
    """
    if page_size <= 0:
        # Guards the page-count division below.
        raise ValueError("page_size must be a positive integer")

    all_standards = []
    # The context manager closes the session's connections when done.
    with requests.Session() as session:
        html = fetch_list_page(session, keyword, 1, page_size)
        if not html:
            return all_standards
        standards, total = parse_list_page(html)
        all_standards.extend(standards)

        # Ceiling division; a page without counters is treated as a single page.
        total_pages = (total + page_size - 1) // page_size if total > 0 else 1
        print(f" 总计: {total} 条标准, {total_pages}")

        for page_num in range(2, total_pages + 1):
            html = fetch_list_page(session, keyword, page_num, page_size)
            if not html:
                break
            standards, _ = parse_list_page(html)
            if not standards:
                break
            all_standards.extend(standards)
            print(f" 已采集: {len(all_standards)}/{total} (第 {page_num}/{total_pages} 页)", end="\r")
            # Short pause between list requests.
            time.sleep(0.3)
    # Terminates the "\r"-refreshed progress line.
    print()

    if status_filter:
        all_standards = [s for s in all_standards if status_filter in s.get("标准状态", "")]
        print(f" 筛选 [{status_filter}]: {len(all_standards)}")
    return all_standards
# ─── PDF 下载 ───────────────────────────────────────────
# Shared ddddocr recogniser; created on the first download and then reused.
_OCR_ENGINE = None


def _get_ocr_engine():
    """Return the shared ddddocr recogniser, building it on first use."""
    global _OCR_ENGINE
    if _OCR_ENGINE is None:
        # Kept as a function-level import so ddddocr is only needed when a
        # download is actually attempted.
        import ddddocr
        _OCR_ENGINE = ddddocr.DdddOcr(show_ad=False)
    return _OCR_ENGINE


def _solve_captcha(session, ocr):
    """Fetch and submit captchas until one is accepted; return True on success."""
    for _ in range(MAX_CAPTCHA_RETRIES):
        image = session.get(CAPTCHA_URL.format(ts=int(time.time() * 1000)), timeout=REQUEST_TIMEOUT)
        if len(image.content) < 100:
            # Responses under 100 bytes are not passed to the OCR; wait and
            # request a new one.
            time.sleep(1)
            continue
        code = ocr.classification(image.content)
        verdict = session.post(VERIFY_URL, data={"verifyCode": code}, timeout=REQUEST_TIMEOUT)
        if verdict.text.strip() == "success":
            return True
    return False


def download_pdf(hcno, save_path, max_retries=3):
    """Download the PDF of one standard, solving the captcha with OCR.

    Args:
        hcno: Identifier of the standard, inserted into the download URLs.
        save_path: Destination path of the PDF file.
        max_retries: Number of fresh-session attempts.

    Returns:
        ``(True, size_in_bytes)`` when the PDF was saved,
        ``(False, -1)`` when the server answered with something that is not a
        usable PDF (too small or missing the ``%PDF`` marker),
        ``(False, 0)`` when every attempt failed on the captcha or raised.
    """
    ocr = _get_ocr_engine()
    for retry in range(max_retries):
        try:
            # A fresh session per attempt; the context manager closes it.
            with requests.Session() as s:
                s.headers.update(HEADERS)
                s.get(DOWNLOAD_INIT_URL.format(hcno=hcno), timeout=REQUEST_TIMEOUT)
                if not _solve_captcha(s, ocr):
                    if retry < max_retries - 1:
                        print(f"验证码失败,重试({retry+1})")
                    # Never request the PDF from an unverified session.
                    continue
                dr = s.get(PDF_URL.format(hcno=hcno), timeout=60)
                content = dr.content

            # Besides the size threshold, require the PDF marker near the
            # start so that an error page is not stored under a .pdf name.
            if len(content) > 1000 and b"%PDF" in content[:1024]:
                # Write to a side file and rename, so an interrupted write
                # never leaves a truncated file at the final path.
                tmp_path = f"{save_path}.part"
                with open(tmp_path, "wb") as f:
                    f.write(content)
                os.replace(tmp_path, save_path)
                return True, len(content)
            return False, -1
        except Exception as e:
            # Best-effort boundary: network, OCR and file errors all lead to
            # a retry with a new session.
            print(f" [!] 下载异常: {e}, 重试 ({retry+1}/{max_retries})")
            time.sleep(2)
    return False, 0
# Characters not allowed in file names: \ / : * ? " < > | and ASCII control
# characters (newlines and tabs can survive HTML tag stripping).
_ILLEGAL_FILENAME_CHARS = re.compile(r'[\\/:*?"<>|\x00-\x1f]')


def sanitize_filename(name):
    """Replace characters that are illegal in file names with spaces.

    Args:
        name: Proposed file name (not a path; separators are replaced too).

    Returns:
        The name with every illegal character replaced by a single space and
        leading/trailing whitespace removed.
    """
    return _ILLEGAL_FILENAME_CHARS.sub(' ', name).strip()
# ─── Excel 导出 ─────────────────────────────────────────
def export_to_excel(records, output_path):
    """Write the records to a styled single-sheet .xlsx workbook.

    Args:
        records: Sequence of dicts; each column reads the key named by the
            matching ``OUTPUT_COLUMNS`` header, missing keys become "".
        output_path: Where the workbook is saved.
    """
    headers = [title for title, _ in OUTPUT_COLUMNS]

    # Style objects shared by the cells they apply to.
    header_font = Font(name="微软雅黑", bold=True, color="FFFFFF", size=11)
    # Green header marks the sheet as "recommended" standards.
    header_fill = PatternFill(start_color="375623", end_color="375623", fill_type="solid")
    header_align = Alignment(horizontal="center", vertical="center", wrap_text=True)
    body_font = Font(name="微软雅黑", size=10)
    body_align = Alignment(vertical="center", wrap_text=True)
    stripe_fill = PatternFill(start_color="E2EFDA", end_color="E2EFDA", fill_type="solid")
    grid = Border(**{
        edge: Side(style="thin", color="A9D08E")
        for edge in ("left", "right", "top", "bottom")
    })

    book = Workbook()
    sheet = book.active
    sheet.title = "推荐性国家标准清单"

    # Row 1: header cells.
    for col_idx, title in enumerate(headers, start=1):
        head = sheet.cell(row=1, column=col_idx, value=title)
        head.font = header_font
        head.fill = header_fill
        head.alignment = header_align
        head.border = grid

    # Rows 2..n: one row per record, even sheet rows get the stripe fill.
    for row_idx, record in enumerate(records, start=2):
        striped = row_idx % 2 == 0
        for col_idx, title in enumerate(headers, start=1):
            body = sheet.cell(row=row_idx, column=col_idx, value=record.get(title, ""))
            body.font = body_font
            body.alignment = body_align
            body.border = grid
            if striped:
                body.fill = stripe_fill

    for col_idx, (_, width) in enumerate(OUTPUT_COLUMNS, start=1):
        sheet.column_dimensions[get_column_letter(col_idx)].width = width

    # Keep the header visible and enable filtering over the written range.
    sheet.freeze_panes = "A2"
    last_column = get_column_letter(len(headers))
    sheet.auto_filter.ref = f"A1:{last_column}{len(records) + 1}"
    book.save(output_path)
# ─── 缓存 ───────────────────────────────────────────────
def load_cache():
    """Load the resume cache from ``CACHE_FILE``.

    Returns:
        The cached dict. A fresh ``{"downloaded_hcnos": [], "records": []}``
        is returned when the file is missing, unreadable, not valid UTF-8
        JSON, or does not contain a JSON object. A ``downloaded_hcnos`` entry
        that is not a list is reset to an empty list.
    """
    fresh = {"downloaded_hcnos": [], "records": []}
    try:
        with open(CACHE_FILE, "r", encoding="utf-8") as f:
            cache = json.load(f)
    except (OSError, ValueError):
        # OSError: missing/unreadable file. ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError.
        return fresh
    if not isinstance(cache, dict):
        # Valid JSON of the wrong shape (list, string, ...) is unusable.
        return fresh
    if not isinstance(cache.get("downloaded_hcnos"), list):
        cache["downloaded_hcnos"] = []
    return cache
def save_cache(cache):
    """Persist the resume cache to ``CACHE_FILE`` as UTF-8 JSON.

    The data is written to a sibling temporary file first and then moved over
    the real file, so an interruption in the middle of a write cannot leave a
    truncated cache behind.

    Args:
        cache: JSON-serialisable dict to store.
    """
    tmp_path = f"{CACHE_FILE}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(cache, f, ensure_ascii=False)
    os.replace(tmp_path, CACHE_FILE)
# ─── 主流程 ─────────────────────────────────────────────
def _build_arg_parser():
    """Create the command-line parser for the downloader."""
    parser = argparse.ArgumentParser(
        description="国家标准全文公开系统 — 推荐性国家标准 PDF 批量下载工具",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例:
python openstd_gb_t_downloader.py # 下载""相关推荐性国家标准
python openstd_gb_t_downloader.py --keyword "制动" # 搜索关键词
python openstd_gb_t_downloader.py --status "现行" # 只下载现行标准
python openstd_gb_t_downloader.py --no-download # 仅采集元数据, 不下载PDF
""",
    )
    # The help text is derived from the constant so the two cannot disagree.
    parser.add_argument("--keyword", "-k", default=DEFAULT_KEYWORD,
                        help=f"搜索关键词 (默认: {DEFAULT_KEYWORD!r})")
    parser.add_argument("--status", "-s", default="", help="状态筛选: 现行/即将实施/废止 (默认: 全部)")
    parser.add_argument("--page-size", "-p", type=int, default=50, help="每页条数 (默认50)")
    parser.add_argument("--output-dir", "-o", default="GB_T_Doc", help="PDF下载目录 (默认: GB_T_Doc)")
    parser.add_argument("--no-download", action="store_true", help="仅采集元数据, 不下载PDF")
    parser.add_argument("--max-count", "-n", type=int, default=0, help="最大下载数量 (0=全部)")
    return parser


def _download_standards(standards, output_dir):
    """Download every record's PDF, annotate the records in place, return counts.

    Each record receives ``下载状态`` and ``文件名``. The resume cache is
    rewritten after every download attempt.

    Returns:
        ``(success_count, skip_count, fail_count)``.
    """
    cache = load_cache()
    downloaded_hcnos = set(cache.get("downloaded_hcnos", []))
    success_count = 0
    skip_count = 0
    fail_count = 0
    total = len(standards)

    for idx, std in enumerate(standards, 1):
        hcno = std.get("hcno", "")
        code = std.get("标准号", "")
        name = std.get("标准名称", "")
        filename = sanitize_filename(f"{code} {name}.pdf")
        filepath = os.path.join(output_dir, filename)

        # Resume: skip anything recorded in the cache or already on disk.
        if (hcno and hcno in downloaded_hcnos) or os.path.exists(filepath):
            std["下载状态"] = "已存在"
            std["文件名"] = filename
            skip_count += 1
            continue

        print(f" [{idx}/{total}] {code} {name[:30]}...", end=" ")
        if not hcno:
            # Without an identifier there is nothing to request; avoid a
            # full captcha/download cycle that cannot succeed.
            std["下载状态"] = "失败"
            std["文件名"] = ""
            fail_count += 1
            print("FAILED (no hcno)")
            continue

        ok, size = download_pdf(hcno, filepath)
        if ok:
            std["下载状态"] = "成功"
            std["文件名"] = filename
            downloaded_hcnos.add(hcno)
            success_count += 1
            print(f"OK ({size/1024:.0f} KB)")
        elif size == -1:
            # download_pdf reports -1 when the server returned no usable PDF.
            std["下载状态"] = "无PDF"
            std["文件名"] = ""
            fail_count += 1
            print("NO PDF")
        else:
            std["下载状态"] = "失败"
            std["文件名"] = ""
            fail_count += 1
            print("FAILED")

        cache["downloaded_hcnos"] = list(downloaded_hcnos)
        save_cache(cache)
        # Pause between downloads.
        time.sleep(1)

    return success_count, skip_count, fail_count


def main():
    """Command-line entry point: collect the list, download PDFs, export Excel.

    Exits with status 1 when the search yields no standards, and with an
    argparse usage error when ``--page-size`` is not positive.
    """
    parser = _build_arg_parser()
    args = parser.parse_args()
    if args.page_size <= 0:
        # The page count is computed by dividing by this value.
        parser.error("--page-size 必须为正整数")

    os.makedirs(args.output_dir, exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    # The keyword is user input and may contain path separators.
    safe_keyword = sanitize_filename(args.keyword)
    excel_path = os.path.join(args.output_dir, f"推荐性国家标准清单_{safe_keyword}_{timestamp}.xlsx")

    print("=" * 60)
    print(" 国家标准全文公开系统 — 推荐性国家标准 PDF 批量下载工具")
    print("=" * 60)
    print(f" 关键词: {args.keyword}")
    print(" 类型: 推荐性国家标准 (GB/T)")
    print(f" 状态: {args.status or '全部'}")
    print(f" 下载目录: {args.output_dir}/")
    print(f" 下载PDF: {'否' if args.no_download else '是'}")
    print("-" * 60)

    # Step 1: collect the list of standards.
    print(" [1/2] 采集标准列表...")
    standards = collect_all_standards(args.keyword, args.page_size, args.status)
    if not standards:
        print(" 未找到任何标准")
        sys.exit(1)
    print(f"{len(standards)} 条标准")
    if args.max_count > 0:
        standards = standards[:args.max_count]
        print(f" 限制下载前 {args.max_count}")

    # Step 2: download the PDFs (or mark every record as skipped).
    if not args.no_download:
        print("\n [2/2] 下载 PDF 文件...")
        success_count, skip_count, fail_count = _download_standards(standards, args.output_dir)
        print(f"\n 下载完成: 成功 {success_count}, 跳过 {skip_count}, 无PDF/失败 {fail_count}")
    else:
        for std in standards:
            std["下载状态"] = "跳过"
            std["文件名"] = ""
        print("\n [2/2] 跳过下载 (--no-download)")

    export_to_excel(standards, excel_path)
    print(f"\n 元数据已导出: {excel_path}")
    print(f"\n {'=' * 50}")
    print(f" 总计: {len(standards)} 条推荐性国家标准")
    print(f" Excel: {excel_path}")
    if not args.no_download:
        print(f" PDF目录: {args.output_dir}/")
        pdfs = [f for f in os.listdir(args.output_dir) if f.endswith('.pdf')]
        print(f" PDF文件数: {len(pdfs)}")
    print(f" {'=' * 50}")


if __name__ == "__main__":
    main()