Playwright 抓取网页

启动 docker 容器

1
2
3
4
5
# 前台运行,推出后删除容器
docker run -p 3000:3000 --rm --init -it --workdir /home/pwuser --user pwuser mcr.microsoft.com/playwright:v1.51.0-noble /bin/sh -c "npx -y playwright@1.51.0 run-server --port 3000 --host 0.0.0.0"

# 后台运行
docker run --add-host=hostmachine:host-gateway -p 3000:3000 -d --init -it --workdir /home/pwuser --user pwuser mcr.microsoft.com/playwright:v1.52.0-noble /bin/sh -c "npx -y playwright@1.52.0 run-server --port 3000 --host 0.0.0.0"
命令详解

参数说明

  • docker run: 启动一个新的容器。
  • -p 3000:3000: 将主机的端口 3000 映射到容器内的端口 3000。这允许外部网络通过主机的 IP 地址和端口 3000 访问运行在容器内的服务。
  • –rm: 当容器退出时自动删除该容器。这有助于避免未使用的容器占用系统资源。
  • –init: 在容器中使用一个小的初始化进程作为 PID 1,以确保正确处理信号并防止僵尸进程出现。
  • -it:
  • -i (interactive) 保持 STDIN 开放,即使没有附加也保持打开状态。
  • -t (tty) 分配一个伪 TTY(终端),这对于交互式命令非常有用。
  • –workdir /home/pwuser: 设置容器内的工作目录为 /home/pwuser。所有后续命令都会在这个目录下执行。
  • –user pwuser: 指定容器内运行进程的用户为 pwuser 而不是默认的 root 用户。这是为了提高安全性,尤其是在处理不受信任的内容时。
  • mcr.microsoft.com/playwright:v1.51.0-noble: 使用的 Docker 镜像的名字和标签。这里使用的是 Microsoft 提供的 Playwright 版本 1.51.0 的镜像。
  • /bin/sh -c “npx -y playwright@1.51.0 run-server –port 3000 –host 0.0.0.0”:

此命令的作用是:

  • 启动一个基于 mcr.microsoft.com/playwright:v1.51.0-noble 镜像的 Docker 容器。
  • 将主机的 3000 端口映射到容器内的 3000 端口。
  • 使用 pwuser 用户而非 root 用户来运行容器内的进程。
  • 设置容器的工作目录为 /home/pwuser。
  • 使用 /bin/sh shell 执行命令:使用 npx 工具以非交互方式安装并运行 Playwright 服务器,监听在 3000 端口且可被外部访问(通过设置 –host 0.0.0.0)。

在 python 中使用

测试用例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import unittest
from playwright.async_api import async_playwright

class TestPlaywright(unittest.IsolatedAsyncioTestCase):

    """
    获取的是整个网页的完整 HTML 内容
    """
    async def test_playwright(self):
        async with async_playwright() as p:
            browser = await p.chromium.connect("ws://ip:3000/")
            page = await browser.new_page()
            await page.goto("https://example.com")
            content = await page.content()
            print(content)
            await browser.close()

    """
    提取文本
    """
    async def test_playwright_extract_txt(self):
        async with async_playwright() as p:
            browser = await p.chromium.connect("ws://ip:3000/")
            page = await browser.new_page()
            await page.goto("https://example.com")

            # 只要纯文本,不要 HTML 标签
            # 使用 JavaScript 提取纯文本
            text_content = await page.evaluate('''() => {
                return document.body.innerText || document.documentElement.innerText;
            }''')

            # 清洗格式(空行、缩进等)
            # text_content = await page.evaluate('''() => {
            #     return document.body.innerText.replace(/\\s+/g, ' ').trim();
            # }''')

            print(text_content)

            await browser.close()


if __name__ == "__main__":
    unittest.main()

设置请求头(Headers)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
async def test_playwright():
    async with async_playwright() as p:
        # 连接远程浏览器
        browser = await p.chromium.connect("ws://ip:3000/")

        # 创建带有自定义 headers 的上下文
        context = await browser.new_context(extra_http_headers={
            "User-Agent": "MyCustomUserAgent/1.0",
            "Authorization": "Bearer YOUR_TOKEN_HERE",
            "X-Custom-Header": "SomeValue"
        })

        # 在该上下文中打开新页面
        page = await context.new_page()
        await page.goto("https://example.com")

        # 获取内容或其他操作
        content = await page.content()
        print(content)

        # 关闭 context 和 browser(注意:不要关闭远程浏览器实例)
        await context.close()
        # await browser.close()  # 不建议关闭远程浏览器连接,除非你知道自己在做什么

抓取多个网页

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import asyncio
from playwright.async_api import async_playwright

async def test_playwright():
    urls = [
        "https://example.com",
        "https://example.org",
        "https://example.net"
    ]
    
    async with async_playwright() as p:
        try:
            # 连接远程浏览器
            browser = await p.chromium.connect("ws://ip:3000/")
            
            # 创建带有自定义 headers 的上下文
            context = await browser.new_context(extra_http_headers={
                "User-Agent": "MyCustomUserAgent/1.0",
                "Authorization": "Bearer YOUR_TOKEN_HERE",
                "X-Custom-Header": "SomeValue"
            })

            for url in urls:
                try:
                    # 在该上下文中打开新页面
                    page = await context.new_page()
                    await page.goto(url)
                    
                    # 获取内容或其他操作
                    content = await page.content()
                    print(f"Content of {url}:")
                    print(content)
                
                except Exception as e:
                    print(f"Error while processing {url}: {e}")
                
                finally:
                    # 关闭当前页面
                    if 'page' in locals():
                        await page.close()

        except Exception as e:
            print(f"An error occurred: {e}")

        finally:
            # 确保上下文被关闭
            if 'context' in locals():
                await context.close()

# 运行异步函数
asyncio.run(test_playwright())

整体例子

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
"""
抓取url网址内容
"""
async def fetch_page(doc:Dict[str,Any], currLevel: int = 1, maxLevel: int = 1, kbId: str = None, white_list: List[str] = None, loadURL:Dict[str,int]={}, expression:List[str] = None, headers: Dict[str, Any] ={}):
    url = doc.get("path")
    if currLevel > maxLevel:
        return
    if loadURL.get(url) is not None:
        return
    loadURL[url] = 1
    # 检查是否有文件被上传
    # 使用 Playwright 抓取网页
    async with async_playwright() as p:
        try:
            browser = await p.chromium.connect(yamlConfig.get("playwright").get("url"))

            # 创建带有自定义 headers 的上下文
            context = await browser.new_context(extra_http_headers=headers)

            # 在该上下文中打开新页面
            page = await context.new_page()
            # networkidle:至少500毫秒没有网络连接活动时,Playwright 将认为页面已加载完毕
            await page.goto(url, wait_until='networkidle')

            raw_html = await page.content() # HTML 内容(主要用于获取页面标题)
            links = []      # 页面中的链接
            full_text = ""  # 纯文本

            # 如果有XPath表达式
            if len(expression) > 0 :
                # 处理每个 XPath 表达式
                for xpath in expression:
                    elements_html = await get_elements_by_xpath(page, xpath)
                    if not elements_html:
                        continue

                    # 每个 XPath 可能匹配多个元素
                    for html in elements_html:
                        soup = BeautifulSoup(html, 'lxml')
                        # 提取链接并加入到全局links列表中
                        links.extend([a.get('href') for a in soup.find_all('a', href=True) if a.get('href') not in links])
                        # 提取纯文本并拼接到full_text
                        full_text += soup.get_text(separator=' ', strip=True) + " "

            else:
                # 使用 BeautifulSoup 解析 HTML
                soup = BeautifulSoup(raw_html, "lxml")

                # 提取页面中的所有链接
                links = {a.get('href') for a in soup.find_all('a', href=True)}

                # 纯文本内容
                full_text = await page.evaluate('''() => {
                    return document.body.innerText.replace(/\\s+/g, ' ').trim();
                }''')

            # 关闭页面
            await page.close()

            if len(full_text) == 0:
                return ReturnDatas.ErrorResponse(message="The uploaded webpage text is empty!")

            # 查找 <title> 标签并获取其内容
            start_title = raw_html.find('<title>')
            end_title = raw_html.find('</title>')

            if start_title != -1 and end_title != -1:
                # 提取 <title> 中间的文本内容
                title = raw_html[start_title + 7:end_title].strip()
            else:
                title = "No Title Found"

            if len(title) == 0:
                return ReturnDatas.ErrorResponse(message="The title is empty!")

        except Exception as e:
            core_logger.exception(e)
            traceback.print_exc()


        finally:
            if 'context' in locals():
                await context.close()


"""
根据 XPath 表达式获取标签
"""
async def get_elements_by_xpath(page, xpath_expr):
    # 使用 page.locator() 方法结合 XPath 选择器
    locator = page.locator(f'xpath={xpath_expr}')

    # 获取所有匹配的元素数量
    count = await locator.count()
    if count == 0:
        return []

    results = []
    for i in range(count):
        # 获取每个元素的 HTML 内容
        html = await locator.nth(i).evaluate('e => e.outerHTML')
        results.append(html)

    return results

0%