collect-app-msg.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400
  1. """
  2. 抓取微信“小程序话题”下的公众号文章(贴图)并输出为 JSON。
  3. 参考实现来源:
  4. - server/src/api/WxAppmsgContent.ts
  5. - server/src/services/content/appmsg/AppMsgService.ts
  6. 脚本参数
  7. --topic:话题名称,例如 #乡源文化挖掘
  8. --out:输出文件名,默认 data.json
  9. 输出
  10. JSON 数组,每条为 AppMsg:
  11. * outlinkId: int : 微信官方贴图的 ID,可以拿来去重
  12. * userId: int : 从小程序传来的用户ID
  13. * url: str : 实际跳转的 URL
  14. * title: str : 标题
  15. * images: List[str] : 封面图
  16. * content: str : 内容
  17. 用法(PowerShell / pwsh):
  18. pwsh -NoProfile -Command "python ./collect-app-msg.py --topic '#乡源文化挖掘' --out data.json"
  19. 依赖:
  20. python -m pip install -U requests playwright
  21. python -m playwright install chromium
  22. """
from __future__ import annotations

import argparse
import json
import re
import time
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple

import requests
  31. TOPIC_LIST_ENDPOINT = "https://mp.weixin.qq.com/mp/appmsgtopic"
  32. UA = (
  33. "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
  34. "AppleWebKit/537.36 (KHTML, like Gecko) "
  35. "Chrome/91.0.4472.124 Safari/537.36"
  36. )
  37. @dataclass
  38. class AppMsg:
  39. # 结构同 server/src/models/content/appmsg/AppMsgModel.ts
  40. outlinkId: int = 0
  41. userId: int = 0
  42. url: str = ""
  43. title: str = ""
  44. images: List[str] = None # type: ignore[assignment]
  45. content: str = ""
  46. def __post_init__(self) -> None:
  47. if self.images is None:
  48. self.images = []
  49. def _lazy_import_playwright():
  50. """
  51. 延迟导入 Playwright,避免未安装时报错(只有用户选择浏览器抓取才需要)。
  52. """
  53. try:
  54. from playwright.sync_api import sync_playwright # type: ignore
  55. return sync_playwright
  56. except Exception as e:
  57. raise RuntimeError(
  58. "Playwright 未安装或不可用。请执行:python -m pip install -U playwright && python -m playwright install chromium"
  59. ) from e
  60. def _load_existing_appmsgs(path: str) -> List[Dict[str, Any]]:
  61. try:
  62. # 兼容:历史文件可能是 utf-8 或 utf-8-sig(带 BOM)
  63. try:
  64. with open(path, "r", encoding="utf-8-sig") as f:
  65. data = json.load(f)
  66. except UnicodeError:
  67. with open(path, "r", encoding="utf-8") as f:
  68. data = json.load(f)
  69. if isinstance(data, list):
  70. return [x for x in data if isinstance(x, dict)]
  71. except FileNotFoundError:
  72. return []
  73. except json.JSONDecodeError:
  74. # 文件存在但不是合法 JSON:当作无历史数据
  75. return []
  76. return []
  77. def _build_seen_keys(existing: List[Dict[str, Any]]) -> Set[str]:
  78. """
  79. 用 outlinkId 优先去重;缺失时用 url 兜底。
  80. """
  81. seen: Set[str] = set()
  82. for it in existing:
  83. outlink_id = it.get("outlinkId")
  84. if isinstance(outlink_id, int) and outlink_id:
  85. seen.add(f"id:{outlink_id}")
  86. url = it.get("url")
  87. if isinstance(url, str) and url:
  88. seen.add(f"url:{url}")
  89. return seen
  90. def _topic_msg_key(topic_msg: Dict[str, Any]) -> Tuple[Optional[str], Optional[int], Optional[str]]:
  91. outlink_id: Optional[int] = None
  92. try:
  93. outlink_id = int(((topic_msg.get("id") or {}).get("msgid")) or 0) or None
  94. except Exception:
  95. outlink_id = None
  96. url = topic_msg.get("jump_url")
  97. if not isinstance(url, str) or not url:
  98. url = None
  99. key = None
  100. if outlink_id:
  101. key = f"id:{outlink_id}"
  102. elif url:
  103. key = f"url:{url}"
  104. return key, outlink_id, url
  105. def _requests_session() -> requests.Session:
  106. s = requests.Session()
  107. s.headers.update(
  108. {
  109. "User-Agent": UA,
  110. "Accept": "application/json,text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
  111. "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
  112. }
  113. )
  114. return s
  115. def get_topic_list(
  116. sess: requests.Session, *, topic: str, paging: str = ""
  117. ) -> Dict[str, Any]:
  118. """
  119. 等价于 WxAppmsgContentApi.getTopicList:
  120. GET /mp/appmsgtopic?action=topic_list&topic=...&paging=...&f=json...
  121. """
  122. params = {
  123. "action": "topic_list",
  124. "topic": topic,
  125. "paging": paging or "",
  126. "sort_type": 1,
  127. "from": 1,
  128. # 下面这些字段在 TS 侧是固定值/空值,保持一致以提高兼容性
  129. "from_biz": 3636524509,
  130. "from_msgid": 2247483692,
  131. "from_itemidx": 1,
  132. "appid": "",
  133. "silent": 1,
  134. "uin": 0,
  135. "key": "",
  136. "pass_ticket": "",
  137. "wxtoken": "",
  138. "devicetype": "",
  139. "clientversion": "false",
  140. "version": "false",
  141. "appmsg_token": "",
  142. "x5": 0,
  143. "f": "json",
  144. "user_article_role": 0,
  145. }
  146. r = sess.get(TOPIC_LIST_ENDPOINT, params=params, timeout=30)
  147. r.raise_for_status()
  148. data = r.json()
  149. # 微信侧常见返回:{ code, message, ... } 或直接业务字段
  150. if isinstance(data, dict) and "code" in data and data.get("code") not in (None, 0):
  151. raise RuntimeError(f"topic_list failed: code={data.get('code')} message={data.get('message')}")
  152. return data
  153. def parse_cgi_data_new_from_html(html: str) -> Dict[str, Any]:
  154. raise RuntimeError("已移除基于 HTML 的解析方式,请使用 Playwright 读取 window.cgiDataNew")
  155. def get_appmsg_detail_via_playwright(url: str, *, timeout_ms: int = 30000) -> Dict[str, Any]:
  156. """
  157. 使用浏览器直接读取 window.cgiDataNew(最接近“页面真实运行态”)。
  158. 优点:避免 HTML 文本解析/对象字面量兼容问题。
  159. """
  160. sync_playwright = _lazy_import_playwright()
  161. with sync_playwright() as p:
  162. browser = p.chromium.launch(headless=True)
  163. context = browser.new_context(
  164. user_agent=UA,
  165. locale="zh-CN",
  166. extra_http_headers={
  167. "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
  168. },
  169. )
  170. page = context.new_page()
  171. try:
  172. page.goto(url, wait_until="domcontentloaded", timeout=timeout_ms)
  173. data = page.evaluate(
  174. """() => {
  175. const d = (globalThis && globalThis.cgiDataNew) ? globalThis.cgiDataNew : (window && window.cgiDataNew);
  176. if (!d) return null;
  177. return JSON.parse(JSON.stringify(d));
  178. }"""
  179. )
  180. # python side validation
  181. if not isinstance(data, dict):
  182. raise RuntimeError("window.cgiDataNew not found or not an object")
  183. return data
  184. finally:
  185. try:
  186. context.close()
  187. finally:
  188. browser.close()
  189. def get_appmsg_detail(sess: requests.Session, url: str) -> Dict[str, Any]:
  190. # 已移除 json5 / node 的 HTML 解析逻辑,统一走浏览器读取 window.cgiDataNew
  191. return get_appmsg_detail_via_playwright(url)
  192. def _extract_user_id_from_detail(detail: Dict[str, Any]) -> int:
  193. """
  194. 复刻 AppMsgService.collectAppMsgContent 中的 userId 推导逻辑。
  195. """
  196. try:
  197. eps = detail.get("ext_publish_source") or {}
  198. weapp = eps.get("weapp_info") or {}
  199. desc = weapp.get("desc") or ""
  200. if isinstance(desc, str) and desc.startswith("亮乡源话题"):
  201. m = re.search(r"亮乡源话题(\d+)", desc)
  202. if m:
  203. return int(m.group(1))
  204. except Exception:
  205. pass
  206. return 0
  207. def _extract_images_from_detail(detail: Dict[str, Any]) -> List[str]:
  208. pics = detail.get("picture_page_info_list") or []
  209. images: List[str] = []
  210. if isinstance(pics, list):
  211. for it in pics:
  212. if isinstance(it, dict):
  213. url = it.get("cdn_url")
  214. if isinstance(url, str) and url:
  215. images.append(url)
  216. return images
  217. def build_appmsg(topic_msg: Dict[str, Any], detail: Dict[str, Any]) -> AppMsg:
  218. outlink_id = 0
  219. try:
  220. outlink_id = int(((topic_msg.get("id") or {}).get("msgid")) or 0)
  221. except Exception:
  222. outlink_id = 0
  223. url = topic_msg.get("jump_url") or ""
  224. if not isinstance(url, str):
  225. url = ""
  226. title = detail.get("title") or ""
  227. if not isinstance(title, str):
  228. title = ""
  229. content = detail.get("content_noencode") or ""
  230. if not isinstance(content, str):
  231. content = ""
  232. return AppMsg(
  233. id=0,
  234. outlinkId=outlink_id,
  235. userId=_extract_user_id_from_detail(detail),
  236. url=url,
  237. title=title,
  238. images=_extract_images_from_detail(detail),
  239. content=content,
  240. )
  241. def collect_topic_appmsgs(
  242. *,
  243. topic: str,
  244. max_pages: int = 10,
  245. sleep_sec: float = 0.3,
  246. seen_keys: Optional[Set[str]] = None,
  247. ) -> List[AppMsg]:
  248. sess = _requests_session()
  249. paging = ""
  250. page = 0
  251. results: List[AppMsg] = []
  252. local_seen: Set[str] = set(seen_keys or set())
  253. while True:
  254. page += 1
  255. if max_pages > 0 and page > max_pages:
  256. break
  257. data = get_topic_list(sess, topic=topic, paging=paging)
  258. topic_msgs = data.get("topic_msgs") or []
  259. if not isinstance(topic_msgs, list):
  260. break
  261. page_has_new = False
  262. for item in topic_msgs:
  263. if not isinstance(item, dict):
  264. continue
  265. key, outlink_id, jump_url = _topic_msg_key(item)
  266. if key and key in local_seen:
  267. continue
  268. if not jump_url:
  269. continue
  270. # 只有未抓取过的条目才请求详情页
  271. detail = get_appmsg_detail(sess, jump_url)
  272. appmsg = build_appmsg(item, detail)
  273. # 标记为已抓取
  274. if appmsg.outlinkId:
  275. local_seen.add(f"id:{appmsg.outlinkId}")
  276. if appmsg.url:
  277. local_seen.add(f"url:{appmsg.url}")
  278. results.append(appmsg)
  279. page_has_new = True
  280. if sleep_sec > 0:
  281. time.sleep(sleep_sec)
  282. # 如果这一页全部在已有 JSON 中(没有任何新条目),终止后续翻页
  283. if not page_has_new:
  284. break
  285. paging = data.get("topic_paging") or ""
  286. if not isinstance(paging, str) or not paging:
  287. break
  288. # 如果本页没抓到任何内容,就停止继续翻页
  289. if len(topic_msgs) == 0:
  290. break
  291. return results
  292. def main(argv: Optional[List[str]] = None) -> int:
  293. parser = argparse.ArgumentParser(description="抓取微信话题下的公众号文章(贴图)并输出 JSON")
  294. parser.add_argument("--topic", required=True, help="话题名称,例如:#乡源文化挖掘")
  295. parser.add_argument("--out", default="data.json", help="输出文件名,默认 data.json")
  296. parser.add_argument("--max-pages", type=int, default=10, help="最多抓取页数,默认 10(<=0 表示不限制)")
  297. parser.add_argument("--sleep", type=float, default=0.3, help="每条之间的延迟秒数,默认 0.3")
  298. args = parser.parse_args(argv)
  299. existing = _load_existing_appmsgs(args.out)
  300. seen = _build_seen_keys(existing)
  301. new_appmsgs = collect_topic_appmsgs(
  302. topic=args.topic,
  303. max_pages=args.max_pages,
  304. sleep_sec=args.sleep,
  305. seen_keys=seen,
  306. )
  307. new_payload = [asdict(x) for x in new_appmsgs]
  308. # 保存时:最新抓到的放在最上面;并按 outlinkId/url 去重(新覆盖旧)
  309. merged: List[Dict[str, Any]] = []
  310. merged_seen: Set[str] = set()
  311. for it in new_payload + existing:
  312. if not isinstance(it, dict):
  313. continue
  314. key: Optional[str] = None
  315. outlink_id = it.get("outlinkId")
  316. url = it.get("url")
  317. if isinstance(outlink_id, int) and outlink_id:
  318. key = f"id:{outlink_id}"
  319. elif isinstance(url, str) and url:
  320. key = f"url:{url}"
  321. if key and key in merged_seen:
  322. continue
  323. if key:
  324. merged_seen.add(key)
  325. merged.append(it)
  326. # 写出 utf-8-sig(带 BOM),避免部分 Windows 工具按 ANSI/GBK 误判导致“中文乱码”
  327. with open(args.out, "w", encoding="utf-8-sig") as f:
  328. json.dump(merged, f, ensure_ascii=False, indent=2)
  329. print(
  330. f"OK: topic={args.topic} new={len(new_payload)} total={len(merged)} out={args.out}"
  331. )
  332. return 0
  333. if __name__ == "__main__":
  334. raise SystemExit(main())