使用openclaw获取微信公众号文章详情详细过程记录 获取公众号文章标题 作者 正文内容
使用openclaw获取微信公众号文章详情详细过程记录 获取公众号文章标题 作者 正文内容

使用openclaw获取微信公众号文章详情详细过程记录 获取公众号文章标题 作者 正文内容最新版

官方版无广告8,511

更新日期:
2026年4月17日
语言:
中文
平台:
没限制

189 人已下载 手机查看

 背景

我有一个对公众号改写的服务已经上线,上线后发现获取文章详情经常被微信风控,一天甚至会出现两次无法获取微信公众号文章的情况,于是我就想办法优化,openclaw装了一个多月了,终于该他上场实战了

 

过程

直接对话告诉openclaw,获取微信公众号“https://mp.weixin.qq.com/s/xxx”文本内容,欻欻,几秒就搞定了,于是我发现这家伙这次靠谱,于是我就接着告诉他,“将这个功能做成一个api吧,就是发给你文章链接,你返回公众号文本内容”,全部是json格式

请求入参示例:


{"link":"https://mp.weixin.qq.com/s/qP7DRV86j0Z1U4_kHXj8wA"}

使用openclaw获取微信公众号文章详情详细过程记录 获取公众号文章标题 作者 正文内容

 

 

响应格式:


{
        "code": 200,
        "data": {
                "title": "标题",
                "author": "作者",
                "content": "正文内容"
        },
        "message": "成功"
}

使用openclaw获取微信公众号文章详情详细过程记录 获取公众号文章标题 作者 正文内容

 

结果

大约1分钟就生成好了,实测也正常可用,还告诉了我怎么使用,怎么部署服务

使用:


curl -X POST "http://<服务器IP>:8080/api/fetch" \
  -H "Content-Type: application/json" \
  -d '{"link": "https://mp.weixin.qq.com/s/qP7DRV86j0Z1U4_kHXj8wA", "format": "content"}'

使用openclaw获取微信公众号文章详情详细过程记录 获取公众号文章标题 作者 正文内容

 

是一个python服务,源代码

#!/usr/bin/env python3
"""
微信公众号文章获取 API 服务

提供HTTP接口获取微信公众号文章内容

用法:
  python3 wechat_article_api.py          # 启动服务 (默认端口 8080)
  python3 wechat_article_api.py --port 8888  # 指定端口

API 端点:
  POST /api/fetch
  请求体: {"url": "https://mp.weixin.qq.com/s/xxxxx", "format": "text"}
  返回: {"success": true, "data": {...}}

  GET /api/fetch?url=https://mp.weixin.qq.com/s/xxxxx&format=text
"""

import sys
import json
import urllib.request
import urllib.error
import urllib.parse
import re
import argparse
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs


class WeChatArticleExtractor:
    """微信公众号文章提取器"""
    
    @staticmethod
    def fetch_url(url, headers=None):
        """抓取URL内容"""
        if headers is None:
            headers = {
                'User-Agent': 'Mozilla/5.0 (Linux; Android 10; SM-G981B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.162 Mobile Safari/537.36',
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
                'Accept-Language': 'zh-CN,zh;q=0.9',
            }
        
        try:
            req = urllib.request.Request(url, headers=headers)
            with urllib.request.urlopen(req, timeout=30) as response:
                return response.read().decode('utf-8', errors='ignore'), None
        except Exception as e:
            return None, str(e)
    
    @staticmethod
    def extract_article_content(html):
        """提取单篇文章内容"""
        result = {
            'title': '',
            'author': '',
            'publish_time': '',
            'content': '',
            'read_count': '',
            'like_count': '',
            'cover_image': ''
        }
        
        # 提取标题
        patterns = [
            r'var msg_title = [\'"](.+?)[\'"];',
            r'<h1[^>]*class="rich_media_title[^"]*"[^>]*>(.*?)</h1>',
            r'<h2[^>]*class="rich_media_title[^"]*"[^>]*>(.*?)</h2>',
        ]
        for pattern in patterns:
            match = re.search(pattern, html, re.DOTALL)
            if match:
                title = match.group(1)
                title = title.replace('\\x26', '&').replace('\\x0a', '').replace('\\x20', ' ').replace('\\x2c', ',')
                result['title'] = re.sub(r'<[^>]+>', '', title).strip()
                break
        
        # 清理标题中的JavaScript代码
        if result['title'] and "'.html" in result['title']:
            result['title'] = result['title'].split("'.html")[0]
        
        # 提取作者
        author_patterns = [
            r'var nickname = [\'"](.+?)[\'"];',
            r'<a[^>]*id="js_name"[^>]*>(.*?)</a>',
            r'<span[^>]*class="profile_nickname"[^>]*>(.*?)</span>',
            r'<a[^>]*class="profile_nickname"[^>]*>(.*?)</a>',
        ]
        for pattern in author_patterns:
            match = re.search(pattern, html, re.DOTALL)
            if match:
                result['author'] = re.sub(r'<[^>]+>', '', match.group(1)).strip()
                break
        
        # 提取发布时间
        time_patterns = [
            r'<em[^>]*id="publish_time"[^>]*>(.*?)</em>',
            r'var publish_time = [\'"](.+?)[\'"];',
            r'<span[^>]*class="publish_time"[^>]*>(.*?)</span>',
        ]
        for pattern in time_patterns:
            match = re.search(pattern, html, re.DOTALL)
            if match:
                result['publish_time'] = re.sub(r'<[^>]+>', '', match.group(1)).strip()
                break
        
        # 提取封面图
        cover_patterns = [
            r'var msg_cdn_url = [\'"](https?://[^\'"]+)[\'"];',
            r'<img[^>]*data-src="(https?://mmbiz\.qpic\.cn[^"]+)"[^>]*class="rich_media_thumb',
        ]
        for pattern in cover_patterns:
            match = re.search(pattern, html, re.DOTALL)
            if match:
                result['cover_image'] = match.group(1)
                break
        
        # 提取正文
        content_match = re.search(r'<div[^>]*id="js_content"[^>]*>(.*?)</div>\s*(?:<script|</div>|<!--)', html, re.DOTALL)
        if content_match:
            content_html = content_match.group(1)
            # 处理图片
            content_html = re.sub(r'<img[^>]*data-src="([^"]+)"[^>]*>', r'\n[图片: \1]\n', content_html)
            # 保留段落和换行
            content_html = content_html.replace('</p>', '\n\n').replace('</div>', '\n').replace('</section>', '\n').replace('<br>', '\n').replace('<br/>', '\n')
            content = re.sub(r'<[^>]+>', '', content_html)
            content = content.replace('&nbsp;', ' ').replace('&quot;', '"').replace('&amp;', '&').replace('&lt;', '<').replace('&gt;', '>')
            content = re.sub(r'\n\s*\n', '\n\n', content)
            content = re.sub(r'\n{3,}', '\n\n', content)
            result['content'] = content.strip()
        
        return result
    
    @classmethod
    def fetch_article(cls, url):
        """获取文章完整信息"""
        html, error = cls.fetch_url(url)
        if html is None:
            return None, error
        
        article = cls.extract_article_content(html)
        article['url'] = url
        
        if not article['title']:
            return None, "无法提取文章标题,文章可能需要登录或已被删除"
        
        return article, None


class APIHandler(BaseHTTPRequestHandler):
    """API请求处理器"""
    
    def log_message(self, format, *args):
        """自定义日志输出"""
        print(f"[{self.log_date_time_string()}] {args[0]}")
    
    def _send_json_response(self, status_code, data):
        """发送JSON响应"""
        self.send_response(status_code)
        self.send_header('Content-Type', 'application/json; charset=utf-8')
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
        self.send_header('Access-Control-Allow-Headers', 'Content-Type')
        self.end_headers()
        self.wfile.write(json.dumps(data, ensure_ascii=False, indent=2).encode('utf-8'))
    
    def _send_success(self, data, message="成功"):
        """发送成功响应 - 标准格式"""
        self._send_json_response(200, {
            'code': 200,
            'data': data,
            'message': message
        })
    
    def _send_error(self, status_code, message):
        """发送错误响应 - 标准格式"""
        self._send_json_response(status_code, {
            'code': status_code,
            'data': None,
            'message': message
        })
    
    def do_OPTIONS(self):
        """处理CORS预检请求"""
        self.send_response(200)
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
        self.send_header('Access-Control-Allow-Headers', 'Content-Type')
        self.end_headers()
    
    def do_GET(self):
        """处理GET请求"""
        parsed_path = urlparse(self.path)
        path = parsed_path.path
        query_params = parse_qs(parsed_path.query)
        
        # 健康检查
        if path == '/health':
            self._send_json_response(200, {'status': 'ok', 'service': 'wechat-article-api'})
            return
        
        # 获取文章 - 支持 link 或 url 参数
        if path == '/api/fetch':
            url = query_params.get('link', [''])[0] or query_params.get('url', [''])[0]
            format_type = query_params.get('format', ['text'])[0]  # 默认返回纯文本
            self._handle_fetch(url, format_type)
            return
        
        # 404
        self._send_error(404, 'Not Found')
    
    def do_POST(self):
        """处理POST请求"""
        parsed_path = urlparse(self.path)
        path = parsed_path.path
        
        # 读取请求体
        content_length = int(self.headers.get('Content-Length', 0))
        if content_length > 0:
            body = self.rfile.read(content_length).decode('utf-8')
            try:
                data = json.loads(body) if body else {}
            except json.JSONDecodeError:
                self._send_error(400, 'Invalid JSON')
                return
        else:
            data = {}
        
        # 获取文章 - 支持 link 或 url 参数
        if path == '/api/fetch':
            url = data.get('link') or data.get('url', '')
            format_type = data.get('format', 'text')  # 默认返回纯文本
            self._handle_fetch(url, format_type)
            return
        
        # 404
        self._send_error(404, 'Not Found')
    
    def _handle_fetch(self, url, format_type):
        """处理获取文章请求"""
        # 验证URL
        if not url:
            self._send_error(400, 'Missing required parameter: url')
            return
        
        if not url.startswith('https://mp.weixin.qq.com/'):
            self._send_error(400, 'Invalid URL. Only mp.weixin.qq.com URLs are supported')
            return
        
        print(f"Fetching article: {url}")
        
        # 获取文章
        article, error = WeChatArticleExtractor.fetch_article(url)
        
        if article is None:
            self._send_error(500, f'Failed to fetch article: {error}')
            return
        
        # 根据格式返回 - 统一使用标准格式 {code, data, message}
        # data 直接返回内容,不包裹在对象中
        if format_type == 'content':
            # 只返回正文内容字符串
            self._send_success(article['content'], "成功")
        elif format_type == 'text':
            # 纯文本格式
            text_output = self._format_as_text(article)
            self._send_success(text_output, "成功")
        elif format_type == 'markdown':
            # Markdown格式
            md_output = self._format_as_markdown(article)
            self._send_success(md_output, "成功")
        else:
            # 完整JSON格式 - 返回文章对象
            self._send_success(article, "成功")
    
    def _format_as_text(self, article):
        """格式化为纯文本"""
        lines = [
            '=' * 60,
            f"标题:{article['title']}",
            f"作者:{article['author']}",
            f"发布时间:{article['publish_time']}",
            f"原文链接:{article['url']}",
            '=' * 60,
            '',
            article['content'],
            '',
            '=' * 60,
        ]
        return '\n'.join(lines)
    
    def _format_as_markdown(self, article):
        """格式化为Markdown"""
        lines = [
            f"# {article['title']}",
            '',
            f"**作者:** {article['author']}",
            '',
            f"**发布时间:** {article['publish_time']}",
            '',
            f"**原文链接:** [{article['url']}]({article['url']})",
            '',
            '---',
            '',
            article['content'],
        ]
        return '\n'.join(lines)


def run_server(port=8080):
    """启动API服务器"""
    server_address = ('0.0.0.0', port)
    httpd = HTTPServer(server_address, APIHandler)
    print(f"=" * 60)
    print(f"微信公众号文章获取 API 服务")
    print(f"=" * 60)
    print(f"服务地址: http://0.0.0.0:{port}")
    print(f"")
    print(f"API 端点:")
    print(f"  GET  /health          - 健康检查")
    print(f"  GET  /api/fetch?url=<URL>&format=<format>")
    print(f"  POST /api/fetch       - 获取文章内容")
    print(f"")
    print(f"参数:")
    print(f"  url: 微信公众号文章链接 (必填)")
    print(f"  format: 输出格式 (可选: json, text, markdown, 默认: json)")
    print(f"")
    print(f"示例:")
    print(f'  curl "http://localhost:{port}/api/fetch?url=https://mp.weixin.qq.com/s/xxxxx&format=text"')
    print(f"=" * 60)
    print(f"按 Ctrl+C 停止服务")
    print(f"")
    
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        print("\n服务已停止")
        httpd.server_close()


def main():
    parser = argparse.ArgumentParser(
        description='微信公众号文章获取 API 服务',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例:
  # 启动服务 (默认端口 8080)
  python3 wechat_article_api.py

  # 指定端口
  python3 wechat_article_api.py --port 8888

  # 测试 API
  curl "http://localhost:8080/api/fetch?url=https://mp.weixin.qq.com/s/xxxxx&format=text"
        """
    )
    parser.add_argument('--port', type=int, default=8080, help='服务端口 (默认: 8080)')
    
    args = parser.parse_args()
    run_server(args.port)


if __name__ == '__main__':
    main()

使用openclaw获取微信公众号文章详情详细过程记录 获取公众号文章标题 作者 正文内容

部署教程:


好的,我来帮你整理部署文档和修改接口。 部署步骤 1. 复制API文件到新服务器 需要复制这个文件: Copy ~/.openclaw/workspace/skills/wechat-account-articles/api/wechat_article_api.py

2. 安装依赖 Copy pip3 install urllib3

3. 启动服务 Copy python3 wechat_article_api.py --port 8080

 

 

 

现在我要去对接到我的系统上了

 

代码已经长传到gitee和github

gitee地址

https://gitee.com/hhxdn/wechat-article-detail-api

使用openclaw获取微信公众号文章详情详细过程记录 获取公众号文章标题 作者 正文内容

 

github地址

https://github.com/wang1688/wechat-article-detail-api

使用openclaw获取微信公众号文章详情详细过程记录 获取公众号文章标题 作者 正文内容

 

夸克网盘地址:

https://pan.quark.cn/s/f730add345b9

使用openclaw获取微信公众号文章详情详细过程记录 获取公众号文章标题 作者 正文内容

相关软件

暂无评论

none
暂无评论...