| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400 |
- """
- 抓取微信“小程序话题”下的公众号文章(贴图)并输出为 JSON。
- 参考实现来源:
- - server/src/api/WxAppmsgContent.ts
- - server/src/services/content/appmsg/AppMsgService.ts
- 脚本参数
- --topic:话题名称,例如 #乡源文化挖掘
- --out:输出文件名,默认 data.json
- 输出
- JSON 数组,每条为 AppMsg:
- * outlinkId: int : 微信官方贴图的 ID,可以拿来去重
- * userId: int : 从小程序传来的用户ID
- * url: str : 实际跳转的 URL
- * title: str : 标题
- * images: List[str] : 封面图
- * content: str : 内容
- 用法(PowerShell / pwsh):
- pwsh -NoProfile -Command "python ./collect-app-msg.py --topic '#乡源文化挖掘' --out data.json"
- 依赖:
- python -m pip install -U requests playwright
- python -m playwright install chromium
- """
- from __future__ import annotations
- import argparse
- import json
- import re
- import time
- from dataclasses import dataclass, asdict
- from typing import Any, Dict, Iterable, List, Optional, Tuple, Set
- import requests
# Endpoint queried by get_topic_list for the paginated article list of a topic.
TOPIC_LIST_ENDPOINT = "https://mp.weixin.qq.com/mp/appmsgtopic"

# Desktop Chrome User-Agent string used for the HTTP session and the browser context.
UA = " ".join(
    [
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
        "AppleWebKit/537.36 (KHTML, like Gecko)",
        "Chrome/91.0.4472.124 Safari/537.36",
    ]
)
@dataclass
class AppMsg:
    """One collected topic article (picture post).

    The field layout follows server/src/models/content/appmsg/AppMsgModel.ts.
    """

    # WeChat's own ID for the post; usable for de-duplication.
    outlinkId: int = 0
    # User ID passed in from the mini program.
    userId: int = 0
    # URL the post actually jumps to.
    url: str = ""
    # Article title.
    title: str = ""
    # Cover image URLs. ``None`` is only a placeholder that __post_init__ replaces.
    images: List[str] = None  # type: ignore[assignment]
    # Article body text.
    content: str = ""

    def __post_init__(self) -> None:
        """Replace a ``None`` image list with a fresh empty list per instance."""
        self.images = [] if self.images is None else self.images
- def _lazy_import_playwright():
- """
- 延迟导入 Playwright,避免未安装时报错(只有用户选择浏览器抓取才需要)。
- """
- try:
- from playwright.sync_api import sync_playwright # type: ignore
- return sync_playwright
- except Exception as e:
- raise RuntimeError(
- "Playwright 未安装或不可用。请执行:python -m pip install -U playwright && python -m playwright install chromium"
- ) from e
- def _load_existing_appmsgs(path: str) -> List[Dict[str, Any]]:
- try:
- # 兼容:历史文件可能是 utf-8 或 utf-8-sig(带 BOM)
- try:
- with open(path, "r", encoding="utf-8-sig") as f:
- data = json.load(f)
- except UnicodeError:
- with open(path, "r", encoding="utf-8") as f:
- data = json.load(f)
- if isinstance(data, list):
- return [x for x in data if isinstance(x, dict)]
- except FileNotFoundError:
- return []
- except json.JSONDecodeError:
- # 文件存在但不是合法 JSON:当作无历史数据
- return []
- return []
- def _build_seen_keys(existing: List[Dict[str, Any]]) -> Set[str]:
- """
- 用 outlinkId 优先去重;缺失时用 url 兜底。
- """
- seen: Set[str] = set()
- for it in existing:
- outlink_id = it.get("outlinkId")
- if isinstance(outlink_id, int) and outlink_id:
- seen.add(f"id:{outlink_id}")
- url = it.get("url")
- if isinstance(url, str) and url:
- seen.add(f"url:{url}")
- return seen
- def _topic_msg_key(topic_msg: Dict[str, Any]) -> Tuple[Optional[str], Optional[int], Optional[str]]:
- outlink_id: Optional[int] = None
- try:
- outlink_id = int(((topic_msg.get("id") or {}).get("msgid")) or 0) or None
- except Exception:
- outlink_id = None
- url = topic_msg.get("jump_url")
- if not isinstance(url, str) or not url:
- url = None
- key = None
- if outlink_id:
- key = f"id:{outlink_id}"
- elif url:
- key = f"url:{url}"
- return key, outlink_id, url
def _requests_session() -> requests.Session:
    """Create a ``requests`` session preloaded with this script's default headers."""
    default_headers = {
        "User-Agent": UA,
        "Accept": "application/json,text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
    }
    session = requests.Session()
    session.headers.update(default_headers)
    return session
def get_topic_list(
    sess: requests.Session, *, topic: str, paging: str = ""
) -> Dict[str, Any]:
    """Fetch one page of a topic's article list.

    Equivalent to WxAppmsgContentApi.getTopicList:
    GET /mp/appmsgtopic?action=topic_list&topic=...&paging=...&f=json...

    Args:
        sess: Session used to issue the request.
        topic: Topic name to query.
        paging: Paging token for the requested page; empty for the first page.

    Returns:
        The decoded JSON response body.

    Raises:
        requests.HTTPError: If the HTTP status signals an error.
        RuntimeError: If the body is a dict whose ``code`` is neither ``None``
            nor ``0``.
    """
    request_fields = {
        "action": "topic_list",
        "topic": topic,
        "paging": paging or "",
        "sort_type": 1,
        "from": 1,
    }
    # These fields are fixed or empty values on the TS side; they are kept
    # identical here to improve compatibility.
    fixed_fields = {
        "from_biz": 3636524509,
        "from_msgid": 2247483692,
        "from_itemidx": 1,
        "appid": "",
        "silent": 1,
        "uin": 0,
        "key": "",
        "pass_ticket": "",
        "wxtoken": "",
        "devicetype": "",
        "clientversion": "false",
        "version": "false",
        "appmsg_token": "",
        "x5": 0,
        "f": "json",
        "user_article_role": 0,
    }
    query = {**request_fields, **fixed_fields}

    response = sess.get(TOPIC_LIST_ENDPOINT, params=query, timeout=30)
    response.raise_for_status()
    payload = response.json()

    # Typical responses are either { code, message, ... } or the bare business fields.
    if isinstance(payload, dict) and "code" in payload:
        if payload.get("code") not in (None, 0):
            raise RuntimeError(f"topic_list failed: code={payload.get('code')} message={payload.get('message')}")
    return payload
def parse_cgi_data_new_from_html(html: str) -> Dict[str, Any]:
    """Placeholder for the removed HTML-based parser; always raises.

    Raises:
        RuntimeError: Unconditionally. The message points to reading
            ``window.cgiDataNew`` through Playwright instead.
    """
    message = "已移除基于 HTML 的解析方式,请使用 Playwright 读取 window.cgiDataNew"
    raise RuntimeError(message)
def get_appmsg_detail_via_playwright(url: str, *, timeout_ms: int = 30000) -> Dict[str, Any]:
    """Open *url* in headless Chromium and return the page's ``cgiDataNew`` object.

    Reading the value from the running page avoids parsing the HTML text or
    the JavaScript object literal by hand.

    Args:
        url: Article URL to load.
        timeout_ms: Navigation timeout handed to ``page.goto``.

    Returns:
        ``cgiDataNew`` after a JSON round-trip inside the page.

    Raises:
        RuntimeError: If Playwright is unavailable, or if ``cgiDataNew`` is
            missing or does not come back as a dict.
    """
    # Runs inside the page; the JSON round-trip yields a plain, serialisable copy.
    read_cgi_data_js = """() => {
        const d = (globalThis && globalThis.cgiDataNew) ? globalThis.cgiDataNew : (window && window.cgiDataNew);
        if (!d) return null;
        return JSON.parse(JSON.stringify(d));
    }"""

    start_playwright = _lazy_import_playwright()
    with start_playwright() as pw:
        chromium = pw.chromium.launch(headless=True)
        ctx = chromium.new_context(
            user_agent=UA,
            locale="zh-CN",
            extra_http_headers={
                "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
            },
        )
        tab = ctx.new_page()
        try:
            tab.goto(url, wait_until="domcontentloaded", timeout=timeout_ms)
            payload = tab.evaluate(read_cgi_data_js)
            # Validate on the Python side: anything but a dict is unusable.
            if isinstance(payload, dict):
                return payload
            raise RuntimeError("window.cgiDataNew not found or not an object")
        finally:
            # Close the context first, and the browser even if that fails.
            try:
                ctx.close()
            finally:
                chromium.close()
def get_appmsg_detail(sess: requests.Session, url: str) -> Dict[str, Any]:
    """Return the detail data of the article at *url*.

    The former json5 / node HTML parsing was removed; every lookup now goes
    through the browser and reads ``window.cgiDataNew``. *sess* is accepted
    but not used by this implementation.
    """
    detail = get_appmsg_detail_via_playwright(url)
    return detail
- def _extract_user_id_from_detail(detail: Dict[str, Any]) -> int:
- """
- 复刻 AppMsgService.collectAppMsgContent 中的 userId 推导逻辑。
- """
- try:
- eps = detail.get("ext_publish_source") or {}
- weapp = eps.get("weapp_info") or {}
- desc = weapp.get("desc") or ""
- if isinstance(desc, str) and desc.startswith("亮乡源话题"):
- m = re.search(r"亮乡源话题(\d+)", desc)
- if m:
- return int(m.group(1))
- except Exception:
- pass
- return 0
- def _extract_images_from_detail(detail: Dict[str, Any]) -> List[str]:
- pics = detail.get("picture_page_info_list") or []
- images: List[str] = []
- if isinstance(pics, list):
- for it in pics:
- if isinstance(it, dict):
- url = it.get("cdn_url")
- if isinstance(url, str) and url:
- images.append(url)
- return images
def build_appmsg(topic_msg: Dict[str, Any], detail: Dict[str, Any]) -> AppMsg:
    """Combine a topic list entry and its detail data into an ``AppMsg``.

    Args:
        topic_msg: One entry of the topic list; supplies ``id.msgid`` and
            ``jump_url``.
        detail: The article's ``cgiDataNew`` data; supplies ``title``,
            ``content_noencode``, the images and the user ID.

    Returns:
        The assembled record. Missing or wrongly typed values fall back to
        ``0`` for the id and ``""`` for text fields.
    """

    def _text(value: Any) -> str:
        # Anything that is not a string (including None) becomes "".
        return value if isinstance(value, str) else ""

    try:
        outlink_id = int((topic_msg.get("id") or {}).get("msgid") or 0)
    except (AttributeError, TypeError, ValueError, OverflowError):
        # "id" is not a mapping, or "msgid" is not convertible to int.
        outlink_id = 0

    # AppMsg declares no ``id`` field, so passing one would raise TypeError;
    # only the declared fields are supplied.
    return AppMsg(
        outlinkId=outlink_id,
        userId=_extract_user_id_from_detail(detail),
        url=_text(topic_msg.get("jump_url")),
        title=_text(detail.get("title")),
        images=_extract_images_from_detail(detail),
        content=_text(detail.get("content_noencode")),
    )
def collect_topic_appmsgs(
    *,
    topic: str,
    max_pages: int = 10,
    sleep_sec: float = 0.3,
    seen_keys: Optional[Set[str]] = None,
) -> List[AppMsg]:
    """Walk a topic's article list and fetch details for entries not seen before.

    Args:
        topic: Topic name to collect.
        max_pages: Maximum number of list pages to request; ``<= 0`` means
            no limit.
        sleep_sec: Pause after each fetched article; ``<= 0`` disables it.
        seen_keys: Keys (``"id:..."`` / ``"url:..."``) of records already
            stored. The set is copied, never modified.

    Returns:
        The newly fetched articles in list order.

    Paging stops at the page limit, when ``topic_msgs`` is not a list, when a
    page contributes no new article, or when no further paging token is
    returned. Errors from the list or detail requests propagate to the caller.
    """
    sess = _requests_session()
    paging = ""
    page = 0
    results: List[AppMsg] = []
    local_seen: Set[str] = set(seen_keys or ())

    while max_pages <= 0 or page < max_pages:
        page += 1
        data = get_topic_list(sess, topic=topic, paging=paging)
        topic_msgs = data.get("topic_msgs") or []
        if not isinstance(topic_msgs, list):
            break

        page_has_new = False
        for item in topic_msgs:
            if not isinstance(item, dict):
                continue
            key, _, jump_url = _topic_msg_key(item)
            if key and key in local_seen:
                continue
            if not jump_url:
                continue

            # Only entries that were not collected before trigger a detail request.
            detail = get_appmsg_detail(sess, jump_url)
            appmsg = build_appmsg(item, detail)

            # Mark as collected so repeats later in this run are skipped too.
            if appmsg.outlinkId:
                local_seen.add(f"id:{appmsg.outlinkId}")
            if appmsg.url:
                local_seen.add(f"url:{appmsg.url}")
            results.append(appmsg)
            page_has_new = True
            if sleep_sec > 0:
                time.sleep(sleep_sec)

        # A page without any new entry (already stored, or empty) ends the walk.
        if not page_has_new:
            break

        paging = data.get("topic_paging") or ""
        if not isinstance(paging, str) or not paging:
            break

    return results
def main(argv: Optional[List[str]] = None) -> int:
    """Command-line entry point: collect new topic articles and merge them into the output file.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read ``sys.argv``.

    Returns:
        ``0`` after the output file has been written.
    """
    cli = argparse.ArgumentParser(description="抓取微信话题下的公众号文章(贴图)并输出 JSON")
    cli.add_argument("--topic", required=True, help="话题名称,例如:#乡源文化挖掘")
    cli.add_argument("--out", default="data.json", help="输出文件名,默认 data.json")
    cli.add_argument("--max-pages", type=int, default=10, help="最多抓取页数,默认 10(<=0 表示不限制)")
    cli.add_argument("--sleep", type=float, default=0.3, help="每条之间的延迟秒数,默认 0.3")
    opts = cli.parse_args(argv)

    def record_key(record: Dict[str, Any]) -> Optional[str]:
        # outlinkId takes priority; the url is the fallback identity.
        record_id = record.get("outlinkId")
        if isinstance(record_id, int) and record_id:
            return f"id:{record_id}"
        record_url = record.get("url")
        if isinstance(record_url, str) and record_url:
            return f"url:{record_url}"
        return None

    previous = _load_existing_appmsgs(opts.out)
    known_keys = _build_seen_keys(previous)
    fetched = collect_topic_appmsgs(
        topic=opts.topic,
        max_pages=opts.max_pages,
        sleep_sec=opts.sleep,
        seen_keys=known_keys,
    )
    fresh = [asdict(msg) for msg in fetched]

    # Newest records go first; on a key clash the new record wins over the old one.
    merged: List[Dict[str, Any]] = []
    taken: Set[str] = set()
    for record in fresh + previous:
        if not isinstance(record, dict):
            continue
        identity = record_key(record)
        if identity is not None:
            if identity in taken:
                continue
            taken.add(identity)
        # Records without any key are always kept.
        merged.append(record)

    # Write utf-8-sig (with BOM) so Windows tools do not misread the file as ANSI/GBK.
    with open(opts.out, "w", encoding="utf-8-sig") as fh:
        json.dump(merged, fh, ensure_ascii=False, indent=2)

    print(
        f"OK: topic={opts.topic} new={len(fresh)} total={len(merged)} out={opts.out}"
    )
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
|