""" 抓取微信“小程序话题”下的公众号文章(贴图)并输出为 JSON。 参考实现来源: - server/src/api/WxAppmsgContent.ts - server/src/services/content/appmsg/AppMsgService.ts 脚本参数 --topic:话题名称,例如 #乡源文化挖掘 --out:输出文件名,默认 data.json 输出 JSON 数组,每条为 AppMsg: * outlinkId: int : 微信官方贴图的 ID,可以拿来去重 * userId: int : 从小程序传来的用户ID * url: str : 实际跳转的 URL * title: str : 标题 * images: List[str] : 封面图 * content: str : 内容 用法(PowerShell / pwsh): pwsh -NoProfile -Command "python ./collect-app-msg.py --topic '#乡源文化挖掘' --out data.json" 依赖: python -m pip install -U requests playwright python -m playwright install chromium """ from __future__ import annotations import argparse import json import re import time from dataclasses import dataclass, asdict from typing import Any, Dict, Iterable, List, Optional, Tuple, Set import requests TOPIC_LIST_ENDPOINT = "https://mp.weixin.qq.com/mp/appmsgtopic" UA = ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/91.0.4472.124 Safari/537.36" ) @dataclass class AppMsg: # 结构同 server/src/models/content/appmsg/AppMsgModel.ts outlinkId: int = 0 userId: int = 0 url: str = "" title: str = "" images: List[str] = None # type: ignore[assignment] content: str = "" def __post_init__(self) -> None: if self.images is None: self.images = [] def _lazy_import_playwright(): """ 延迟导入 Playwright,避免未安装时报错(只有用户选择浏览器抓取才需要)。 """ try: from playwright.sync_api import sync_playwright # type: ignore return sync_playwright except Exception as e: raise RuntimeError( "Playwright 未安装或不可用。请执行:python -m pip install -U playwright && python -m playwright install chromium" ) from e def _load_existing_appmsgs(path: str) -> List[Dict[str, Any]]: try: # 兼容:历史文件可能是 utf-8 或 utf-8-sig(带 BOM) try: with open(path, "r", encoding="utf-8-sig") as f: data = json.load(f) except UnicodeError: with open(path, "r", encoding="utf-8") as f: data = json.load(f) if isinstance(data, list): return [x for x in data if isinstance(x, dict)] except FileNotFoundError: return [] except json.JSONDecodeError: # 文件存在但不是合法 JSON:当作无历史数据 return [] return [] def _build_seen_keys(existing: List[Dict[str, Any]]) -> Set[str]: """ 用 outlinkId 优先去重;缺失时用 url 兜底。 """ seen: Set[str] = set() for it in existing: outlink_id = it.get("outlinkId") if isinstance(outlink_id, int) and outlink_id: seen.add(f"id:{outlink_id}") url = it.get("url") if isinstance(url, str) and url: seen.add(f"url:{url}") return seen def _topic_msg_key(topic_msg: Dict[str, Any]) -> Tuple[Optional[str], Optional[int], Optional[str]]: outlink_id: Optional[int] = None try: outlink_id = int(((topic_msg.get("id") or {}).get("msgid")) or 0) or None except Exception: outlink_id = None url = topic_msg.get("jump_url") if not isinstance(url, str) or not url: url = None key = None if outlink_id: key = f"id:{outlink_id}" elif url: key = f"url:{url}" return key, outlink_id, url def _requests_session() -> requests.Session: s = requests.Session() s.headers.update( { "User-Agent": UA, "Accept": "application/json,text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", } ) return s def get_topic_list( sess: requests.Session, *, topic: str, paging: str = "" ) -> Dict[str, Any]: """ 等价于 WxAppmsgContentApi.getTopicList: GET /mp/appmsgtopic?action=topic_list&topic=...&paging=...&f=json... """ params = { "action": "topic_list", "topic": topic, "paging": paging or "", "sort_type": 1, "from": 1, # 下面这些字段在 TS 侧是固定值/空值,保持一致以提高兼容性 "from_biz": 3636524509, "from_msgid": 2247483692, "from_itemidx": 1, "appid": "", "silent": 1, "uin": 0, "key": "", "pass_ticket": "", "wxtoken": "", "devicetype": "", "clientversion": "false", "version": "false", "appmsg_token": "", "x5": 0, "f": "json", "user_article_role": 0, } r = sess.get(TOPIC_LIST_ENDPOINT, params=params, timeout=30) r.raise_for_status() data = r.json() # 微信侧常见返回:{ code, message, ... } 或直接业务字段 if isinstance(data, dict) and "code" in data and data.get("code") not in (None, 0): raise RuntimeError(f"topic_list failed: code={data.get('code')} message={data.get('message')}") return data def parse_cgi_data_new_from_html(html: str) -> Dict[str, Any]: raise RuntimeError("已移除基于 HTML 的解析方式,请使用 Playwright 读取 window.cgiDataNew") def get_appmsg_detail_via_playwright(url: str, *, timeout_ms: int = 30000) -> Dict[str, Any]: """ 使用浏览器直接读取 window.cgiDataNew(最接近“页面真实运行态”)。 优点:避免 HTML 文本解析/对象字面量兼容问题。 """ sync_playwright = _lazy_import_playwright() with sync_playwright() as p: browser = p.chromium.launch(headless=True) context = browser.new_context( user_agent=UA, locale="zh-CN", extra_http_headers={ "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", }, ) page = context.new_page() try: page.goto(url, wait_until="domcontentloaded", timeout=timeout_ms) data = page.evaluate( """() => { const d = (globalThis && globalThis.cgiDataNew) ? globalThis.cgiDataNew : (window && window.cgiDataNew); if (!d) return null; return JSON.parse(JSON.stringify(d)); }""" ) # python side validation if not isinstance(data, dict): raise RuntimeError("window.cgiDataNew not found or not an object") return data finally: try: context.close() finally: browser.close() def get_appmsg_detail(sess: requests.Session, url: str) -> Dict[str, Any]: # 已移除 json5 / node 的 HTML 解析逻辑,统一走浏览器读取 window.cgiDataNew return get_appmsg_detail_via_playwright(url) def _extract_user_id_from_detail(detail: Dict[str, Any]) -> int: """ 复刻 AppMsgService.collectAppMsgContent 中的 userId 推导逻辑。 """ try: eps = detail.get("ext_publish_source") or {} weapp = eps.get("weapp_info") or {} desc = weapp.get("desc") or "" if isinstance(desc, str) and desc.startswith("亮乡源话题"): m = re.search(r"亮乡源话题(\d+)", desc) if m: return int(m.group(1)) except Exception: pass return 0 def _extract_images_from_detail(detail: Dict[str, Any]) -> List[str]: pics = detail.get("picture_page_info_list") or [] images: List[str] = [] if isinstance(pics, list): for it in pics: if isinstance(it, dict): url = it.get("cdn_url") if isinstance(url, str) and url: images.append(url) return images def build_appmsg(topic_msg: Dict[str, Any], detail: Dict[str, Any]) -> AppMsg: outlink_id = 0 try: outlink_id = int(((topic_msg.get("id") or {}).get("msgid")) or 0) except Exception: outlink_id = 0 url = topic_msg.get("jump_url") or "" if not isinstance(url, str): url = "" title = detail.get("title") or "" if not isinstance(title, str): title = "" content = detail.get("content_noencode") or "" if not isinstance(content, str): content = "" return AppMsg( id=0, outlinkId=outlink_id, userId=_extract_user_id_from_detail(detail), url=url, title=title, images=_extract_images_from_detail(detail), content=content, ) def collect_topic_appmsgs( *, topic: str, max_pages: int = 10, sleep_sec: float = 0.3, seen_keys: Optional[Set[str]] = None, ) -> List[AppMsg]: sess = _requests_session() paging = "" page = 0 results: List[AppMsg] = [] local_seen: Set[str] = set(seen_keys or set()) while True: page += 1 if max_pages > 0 and page > max_pages: break data = get_topic_list(sess, topic=topic, paging=paging) topic_msgs = data.get("topic_msgs") or [] if not isinstance(topic_msgs, list): break page_has_new = False for item in topic_msgs: if not isinstance(item, dict): continue key, outlink_id, jump_url = _topic_msg_key(item) if key and key in local_seen: continue if not jump_url: continue # 只有未抓取过的条目才请求详情页 detail = get_appmsg_detail(sess, jump_url) appmsg = build_appmsg(item, detail) # 标记为已抓取 if appmsg.outlinkId: local_seen.add(f"id:{appmsg.outlinkId}") if appmsg.url: local_seen.add(f"url:{appmsg.url}") results.append(appmsg) page_has_new = True if sleep_sec > 0: time.sleep(sleep_sec) # 如果这一页全部在已有 JSON 中(没有任何新条目),终止后续翻页 if not page_has_new: break paging = data.get("topic_paging") or "" if not isinstance(paging, str) or not paging: break # 如果本页没抓到任何内容,就停止继续翻页 if len(topic_msgs) == 0: break return results def main(argv: Optional[List[str]] = None) -> int: parser = argparse.ArgumentParser(description="抓取微信话题下的公众号文章(贴图)并输出 JSON") parser.add_argument("--topic", required=True, help="话题名称,例如:#乡源文化挖掘") parser.add_argument("--out", default="data.json", help="输出文件名,默认 data.json") parser.add_argument("--max-pages", type=int, default=10, help="最多抓取页数,默认 10(<=0 表示不限制)") parser.add_argument("--sleep", type=float, default=0.3, help="每条之间的延迟秒数,默认 0.3") args = parser.parse_args(argv) existing = _load_existing_appmsgs(args.out) seen = _build_seen_keys(existing) new_appmsgs = collect_topic_appmsgs( topic=args.topic, max_pages=args.max_pages, sleep_sec=args.sleep, seen_keys=seen, ) new_payload = [asdict(x) for x in new_appmsgs] # 保存时:最新抓到的放在最上面;并按 outlinkId/url 去重(新覆盖旧) merged: List[Dict[str, Any]] = [] merged_seen: Set[str] = set() for it in new_payload + existing: if not isinstance(it, dict): continue key: Optional[str] = None outlink_id = it.get("outlinkId") url = it.get("url") if isinstance(outlink_id, int) and outlink_id: key = f"id:{outlink_id}" elif isinstance(url, str) and url: key = f"url:{url}" if key and key in merged_seen: continue if key: merged_seen.add(key) merged.append(it) # 写出 utf-8-sig(带 BOM),避免部分 Windows 工具按 ANSI/GBK 误判导致“中文乱码” with open(args.out, "w", encoding="utf-8-sig") as f: json.dump(merged, f, ensure_ascii=False, indent=2) print( f"OK: topic={args.topic} new={len(new_payload)} total={len(merged)} out={args.out}" ) return 0 if __name__ == "__main__": raise SystemExit(main())