|
|
@@ -0,0 +1,399 @@
|
|
|
+"""
|
|
|
+抓取微信“小程序话题”下的公众号文章(贴图)并输出为 JSON。
|
|
|
+
|
|
|
+参考实现来源:
|
|
|
+- server/src/api/WxAppmsgContent.ts
|
|
|
+- server/src/services/content/appmsg/AppMsgService.ts
|
|
|
+
|
|
|
+脚本参数
|
|
|
+
|
|
|
+ --topic:话题名称,例如 #乡源文化挖掘
|
|
|
+ --out:输出文件名,默认 data.json
|
|
|
+
|
|
|
+输出
|
|
|
+
|
|
|
+ JSON 数组,每条为 AppMsg:
|
|
|
+ * outlinkId: int : 微信官方贴图的 ID,可以拿来去重
|
|
|
+ * userId: int : 从小程序传来的用户ID
|
|
|
+ * url: str : 实际跳转的 URL
|
|
|
+ * title: str : 标题
|
|
|
+ * images: List[str] : 封面图
|
|
|
+ * content: str : 内容
|
|
|
+
|
|
|
+用法(PowerShell / pwsh):
|
|
|
+ pwsh -NoProfile -Command "python ./collect-app-msg.py --topic '#乡源文化挖掘' --out data.json"
|
|
|
+
|
|
|
+依赖:
|
|
|
+ python -m pip install -U requests playwright
|
|
|
+ python -m playwright install chromium
|
|
|
+"""
|
|
|
+
|
|
|
+from __future__ import annotations
|
|
|
+
|
|
|
+import argparse
|
|
|
+import json
|
|
|
+import re
|
|
|
+import time
|
|
|
+from dataclasses import dataclass, asdict
|
|
|
+from typing import Any, Dict, Iterable, List, Optional, Tuple, Set
|
|
|
+
|
|
|
+import requests
|
|
|
+
|
|
|
+
|
|
|
# Endpoint of the WeChat topic API that serves the article list of a topic.
TOPIC_LIST_ENDPOINT = "https://mp.weixin.qq.com/mp/appmsgtopic"

# Desktop Chrome User-Agent string presented to WeChat.
UA = " ".join(
    (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
        "AppleWebKit/537.36 (KHTML, like Gecko)",
        "Chrome/91.0.4472.124 Safari/537.36",
    )
)
|
|
|
+
|
|
|
+
|
|
|
@dataclass
class AppMsg:
    """One article ("image post") collected from a WeChat topic.

    Per the original author's note, the structure mirrors
    server/src/models/content/appmsg/AppMsgModel.ts.
    """

    # Official WeChat post ID; can be used for de-duplication.
    outlinkId: int = 0
    # User ID passed in from the mini program.
    userId: int = 0
    # URL the post actually jumps to.
    url: str = ""
    # Post title.
    title: str = ""
    # Image URLs. ``None`` is only a placeholder default; __post_init__
    # swaps it for a fresh list.
    images: List[str] = None  # type: ignore[assignment]
    # Post body.
    content: str = ""

    def __post_init__(self) -> None:
        # Give every instance its own list rather than a shared mutable default.
        self.images = [] if self.images is None else self.images
|
|
|
+
|
|
|
+
|
|
|
+def _lazy_import_playwright():
|
|
|
+ """
|
|
|
+ 延迟导入 Playwright,避免未安装时报错(只有用户选择浏览器抓取才需要)。
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ from playwright.sync_api import sync_playwright # type: ignore
|
|
|
+
|
|
|
+ return sync_playwright
|
|
|
+ except Exception as e:
|
|
|
+ raise RuntimeError(
|
|
|
+ "Playwright 未安装或不可用。请执行:python -m pip install -U playwright && python -m playwright install chromium"
|
|
|
+ ) from e
|
|
|
+
|
|
|
+def _load_existing_appmsgs(path: str) -> List[Dict[str, Any]]:
|
|
|
+ try:
|
|
|
+ # 兼容:历史文件可能是 utf-8 或 utf-8-sig(带 BOM)
|
|
|
+ try:
|
|
|
+ with open(path, "r", encoding="utf-8-sig") as f:
|
|
|
+ data = json.load(f)
|
|
|
+ except UnicodeError:
|
|
|
+ with open(path, "r", encoding="utf-8") as f:
|
|
|
+ data = json.load(f)
|
|
|
+ if isinstance(data, list):
|
|
|
+ return [x for x in data if isinstance(x, dict)]
|
|
|
+ except FileNotFoundError:
|
|
|
+ return []
|
|
|
+ except json.JSONDecodeError:
|
|
|
+ # 文件存在但不是合法 JSON:当作无历史数据
|
|
|
+ return []
|
|
|
+ return []
|
|
|
+
|
|
|
+
|
|
|
+def _build_seen_keys(existing: List[Dict[str, Any]]) -> Set[str]:
|
|
|
+ """
|
|
|
+ 用 outlinkId 优先去重;缺失时用 url 兜底。
|
|
|
+ """
|
|
|
+ seen: Set[str] = set()
|
|
|
+ for it in existing:
|
|
|
+ outlink_id = it.get("outlinkId")
|
|
|
+ if isinstance(outlink_id, int) and outlink_id:
|
|
|
+ seen.add(f"id:{outlink_id}")
|
|
|
+ url = it.get("url")
|
|
|
+ if isinstance(url, str) and url:
|
|
|
+ seen.add(f"url:{url}")
|
|
|
+ return seen
|
|
|
+
|
|
|
+
|
|
|
+def _topic_msg_key(topic_msg: Dict[str, Any]) -> Tuple[Optional[str], Optional[int], Optional[str]]:
|
|
|
+ outlink_id: Optional[int] = None
|
|
|
+ try:
|
|
|
+ outlink_id = int(((topic_msg.get("id") or {}).get("msgid")) or 0) or None
|
|
|
+ except Exception:
|
|
|
+ outlink_id = None
|
|
|
+ url = topic_msg.get("jump_url")
|
|
|
+ if not isinstance(url, str) or not url:
|
|
|
+ url = None
|
|
|
+ key = None
|
|
|
+ if outlink_id:
|
|
|
+ key = f"id:{outlink_id}"
|
|
|
+ elif url:
|
|
|
+ key = f"url:{url}"
|
|
|
+ return key, outlink_id, url
|
|
|
+
|
|
|
+
|
|
|
def _requests_session() -> requests.Session:
    """Create a ``requests`` session preloaded with browser-like headers."""
    default_headers = {
        "User-Agent": UA,
        "Accept": "application/json,text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
    }
    session = requests.Session()
    session.headers.update(default_headers)
    return session
|
|
|
+
|
|
|
+
|
|
|
def get_topic_list(
    sess: requests.Session, *, topic: str, paging: str = ""
) -> Dict[str, Any]:
    """Fetch one page of a topic's article list.

    Per the original author's note this mirrors
    WxAppmsgContentApi.getTopicList:
      GET /mp/appmsgtopic?action=topic_list&topic=...&paging=...&f=json...

    Args:
        sess: Session used to issue the GET request.
        topic: Topic name to query.
        paging: Paging token taken from a previous response; empty for the
            first page.

    Returns:
        The decoded JSON response.

    Raises:
        RuntimeError: if the response is a dict whose ``code`` is present and
            is neither ``None`` nor ``0``.
        requests.HTTPError: via ``raise_for_status()`` on an HTTP error status.
    """
    query = {
        "action": "topic_list",
        "topic": topic,
        "paging": paging or "",
        "sort_type": 1,
        "from": 1,
        # Per the original note, the fields below are fixed or empty on the
        # TypeScript side; they are kept identical for compatibility.
        "from_biz": 3636524509,
        "from_msgid": 2247483692,
        "from_itemidx": 1,
        "appid": "",
        "silent": 1,
        "uin": 0,
        "key": "",
        "pass_ticket": "",
        "wxtoken": "",
        "devicetype": "",
        "clientversion": "false",
        "version": "false",
        "appmsg_token": "",
        "x5": 0,
        "f": "json",
        "user_article_role": 0,
    }
    response = sess.get(TOPIC_LIST_ENDPOINT, params=query, timeout=30)
    response.raise_for_status()
    payload = response.json()

    # WeChat commonly answers with { code, message, ... } or with the
    # business fields directly; only an explicit non-zero code is an error.
    if isinstance(payload, dict) and "code" in payload:
        code = payload.get("code")
        if code not in (None, 0):
            raise RuntimeError(f"topic_list failed: code={code} message={payload.get('message')}")
    return payload
|
|
|
+
|
|
|
+
|
|
|
def parse_cgi_data_new_from_html(html: str) -> Dict[str, Any]:
    """Removed HTML-based parser, kept only as a stub.

    Always raises ``RuntimeError``; *html* is ignored.
    """
    message = "已移除基于 HTML 的解析方式,请使用 Playwright 读取 window.cgiDataNew"
    raise RuntimeError(message)
|
|
|
+
|
|
|
+
|
|
|
def get_appmsg_detail_via_playwright(url: str, *, timeout_ms: int = 30000) -> Dict[str, Any]:
    """Open *url* in headless Chromium and return ``window.cgiDataNew``.

    Reading the live page object avoids parsing the HTML text and any
    object-literal compatibility problems.

    Args:
        url: Article URL to load.
        timeout_ms: Navigation timeout passed to ``page.goto``.

    Returns:
        A JSON round-tripped copy of ``cgiDataNew``.

    Raises:
        RuntimeError: if Playwright is unavailable, or the page exposes no
            ``cgiDataNew`` object.
    """
    # JSON.parse(JSON.stringify(...)) yields a plain-data copy of the object.
    read_cgi_data_js = """() => {
                  const d = (globalThis && globalThis.cgiDataNew) ? globalThis.cgiDataNew : (window && window.cgiDataNew);
                  if (!d) return null;
                  return JSON.parse(JSON.stringify(d));
                }"""

    sync_playwright = _lazy_import_playwright()
    with sync_playwright() as pw:
        browser = pw.chromium.launch(headless=True)
        context = browser.new_context(
            user_agent=UA,
            locale="zh-CN",
            extra_http_headers={
                "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
            },
        )
        tab = context.new_page()
        try:
            tab.goto(url, wait_until="domcontentloaded", timeout=timeout_ms)
            payload = tab.evaluate(read_cgi_data_js)
            # Validate on the Python side: anything but a dict is unusable.
            if isinstance(payload, dict):
                return payload
            raise RuntimeError("window.cgiDataNew not found or not an object")
        finally:
            # Close the browser even if closing the context fails.
            try:
                context.close()
            finally:
                browser.close()
|
|
|
+
|
|
|
+
|
|
|
def get_appmsg_detail(sess: requests.Session, url: str) -> Dict[str, Any]:
    """Return the detail data of the article at *url*.

    The former json5 / node based HTML parsing was removed; every lookup now
    goes through the browser and reads ``window.cgiDataNew``. *sess* is
    accepted but not used by this implementation.
    """
    detail = get_appmsg_detail_via_playwright(url)
    return detail
|
|
|
+
|
|
|
+
|
|
|
+def _extract_user_id_from_detail(detail: Dict[str, Any]) -> int:
|
|
|
+ """
|
|
|
+ 复刻 AppMsgService.collectAppMsgContent 中的 userId 推导逻辑。
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ eps = detail.get("ext_publish_source") or {}
|
|
|
+ weapp = eps.get("weapp_info") or {}
|
|
|
+ desc = weapp.get("desc") or ""
|
|
|
+ if isinstance(desc, str) and desc.startswith("亮乡源话题"):
|
|
|
+ m = re.search(r"亮乡源话题(\d+)", desc)
|
|
|
+ if m:
|
|
|
+ return int(m.group(1))
|
|
|
+ except Exception:
|
|
|
+ pass
|
|
|
+ return 0
|
|
|
+
|
|
|
+
|
|
|
+def _extract_images_from_detail(detail: Dict[str, Any]) -> List[str]:
|
|
|
+ pics = detail.get("picture_page_info_list") or []
|
|
|
+ images: List[str] = []
|
|
|
+ if isinstance(pics, list):
|
|
|
+ for it in pics:
|
|
|
+ if isinstance(it, dict):
|
|
|
+ url = it.get("cdn_url")
|
|
|
+ if isinstance(url, str) and url:
|
|
|
+ images.append(url)
|
|
|
+ return images
|
|
|
+
|
|
|
+
|
|
|
def build_appmsg(topic_msg: Dict[str, Any], detail: Dict[str, Any]) -> AppMsg:
    """Combine a topic list entry and its detail data into an ``AppMsg``.

    Args:
        topic_msg: One entry of the topic list; supplies ``id.msgid`` and
            ``jump_url``.
        detail: Detail data of the article; supplies ``title``,
            ``content_noencode``, the image list and the user ID.

    Returns:
        The assembled record. Missing or wrongly typed values fall back to
        ``0`` or ``""``.
    """

    def _text(value: Any) -> str:
        # Anything that is not a string is treated as absent.
        return value if isinstance(value, str) else ""

    try:
        outlink_id = int(((topic_msg.get("id") or {}).get("msgid")) or 0)
    except Exception:
        # A malformed id payload means "no id".
        outlink_id = 0

    # AppMsg declares no ``id`` field, so passing ``id=0`` here raised
    # TypeError on every call; only declared fields are passed.
    return AppMsg(
        outlinkId=outlink_id,
        userId=_extract_user_id_from_detail(detail),
        url=_text(topic_msg.get("jump_url")),
        title=_text(detail.get("title")),
        images=_extract_images_from_detail(detail),
        content=_text(detail.get("content_noencode")),
    )
|
|
|
+
|
|
|
+
|
|
|
def collect_topic_appmsgs(
    *,
    topic: str,
    max_pages: int = 10,
    sleep_sec: float = 0.3,
    seen_keys: Optional[Set[str]] = None,
) -> List[AppMsg]:
    """Collect the not-yet-seen articles of a topic, page by page.

    Args:
        topic: Topic name to query.
        max_pages: Maximum number of list pages to request; ``<= 0`` means
            no limit.
        sleep_sec: Delay after each newly fetched article; ``<= 0`` disables
            the delay.
        seen_keys: De-duplication keys (``id:<n>`` / ``url:<u>``) of records
            that are already stored. The set is copied, not modified.

    Returns:
        The newly collected articles in the order they were encountered.

    Paging stops when the page limit is reached, when a page has no new
    article (which includes an empty page), or when the response carries no
    further paging token.
    """
    sess = _requests_session()
    paging = ""
    page = 0
    results: List[AppMsg] = []
    local_seen: Set[str] = set(seen_keys or set())

    while True:
        page += 1
        if max_pages > 0 and page > max_pages:
            break

        data = get_topic_list(sess, topic=topic, paging=paging)
        topic_msgs = data.get("topic_msgs") or []
        if not isinstance(topic_msgs, list):
            break

        page_has_new = False
        for item in topic_msgs:
            if not isinstance(item, dict):
                continue

            key, _outlink_id, jump_url = _topic_msg_key(item)
            if key and key in local_seen:
                continue
            if not jump_url:
                continue

            # Only entries that were not collected before trigger a detail
            # page request.
            # NOTE(review): an exception raised here aborts the whole run and
            # discards the results gathered so far - confirm this is intended.
            detail = get_appmsg_detail(sess, jump_url)
            appmsg = build_appmsg(item, detail)

            # Mark as collected under both key forms.
            if appmsg.outlinkId:
                local_seen.add(f"id:{appmsg.outlinkId}")
            if appmsg.url:
                local_seen.add(f"url:{appmsg.url}")
            results.append(appmsg)
            page_has_new = True

            if sleep_sec > 0:
                time.sleep(sleep_sec)

        # A page without any new entry ends the paging. This also covers an
        # empty page, so no separate emptiness check is needed.
        if not page_has_new:
            break

        paging = data.get("topic_paging") or ""
        if not isinstance(paging, str) or not paging:
            break

    return results
|
|
|
+
|
|
|
+
|
|
|
def main(argv: Optional[List[str]] = None) -> int:
    """Command-line entry point: collect a topic and merge it into the output file.

    Args:
        argv: Argument list to parse; ``None`` lets argparse use ``sys.argv``.

    Returns:
        ``0`` on success.
    """
    parser = argparse.ArgumentParser(description="抓取微信话题下的公众号文章(贴图)并输出 JSON")
    parser.add_argument("--topic", required=True, help="话题名称,例如:#乡源文化挖掘")
    parser.add_argument("--out", default="data.json", help="输出文件名,默认 data.json")
    parser.add_argument("--max-pages", type=int, default=10, help="最多抓取页数,默认 10(<=0 表示不限制)")
    parser.add_argument("--sleep", type=float, default=0.3, help="每条之间的延迟秒数,默认 0.3")
    args = parser.parse_args(argv)

    # Records already stored in the output file are not fetched again.
    existing = _load_existing_appmsgs(args.out)
    collected = collect_topic_appmsgs(
        topic=args.topic,
        max_pages=args.max_pages,
        sleep_sec=args.sleep,
        seen_keys=_build_seen_keys(existing),
    )
    new_payload = [asdict(msg) for msg in collected]

    # Newest records come first; on a key clash (outlinkId, else url) the
    # first occurrence wins, so new records take precedence over old ones.
    merged: List[Dict[str, Any]] = []
    taken: Set[str] = set()
    for record in [*new_payload, *existing]:
        if not isinstance(record, dict):
            continue
        record_id = record.get("outlinkId")
        link = record.get("url")
        if isinstance(record_id, int) and record_id:
            key: Optional[str] = f"id:{record_id}"
        elif isinstance(link, str) and link:
            key = f"url:{link}"
        else:
            key = None
        if key is not None:
            if key in taken:
                continue
            taken.add(key)
        merged.append(record)

    # Write utf-8-sig (with BOM) so that some Windows tools do not misread
    # the file as ANSI/GBK and show garbled Chinese text.
    with open(args.out, "w", encoding="utf-8-sig") as out_file:
        json.dump(merged, out_file, ensure_ascii=False, indent=2)

    print(
        f"OK: topic={args.topic} new={len(new_payload)} total={len(merged)} out={args.out}"
    )
    return 0
|
|
|
+
|
|
|
+
|
|
|
# Run main() only when executed as a script; its return value becomes the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|