# Playwright 使用


<!--more-->




## Docker部署

**前台运行（调试模式）**

前台运行模式适合开发和调试，退出容器时会被自动删除：

```bash
docker run --rm -it --init \
           --user pwuser --workdir /home/pwuser \
           -p 3000:3000 \
           mcr.microsoft.com/playwright:v1.51.0-noble \
           /bin/sh -c "npx -y playwright@1.51.0 run-server --port 3000 --host 0.0.0.0"
```

<hr>

**后台运行（生产模式）**

后台运行模式适合生产环境，容器会持续运行：

```bash
docker run -d -it --init \
           --user pwuser --workdir /home/pwuser \
           --add-host hostmachine:host-gateway \
           -p 3000:3000 \
           mcr.microsoft.com/playwright:v1.52.0-noble \
           /bin/sh -c "npx -y playwright@1.52.0 run-server --port 3000 --host 0.0.0.0"
```

> **💡 使用提示**: 如果 CI/脚本环境不支持 `\` 续行符，可以直接删除换行和 `\` 符号，合并成单行命令。

<hr>

**关键参数说明**

| 参数 | 说明 |
|------|------|
| `--rm` | 容器退出时自动删除，避免资源占用 |
| `-it` | `-i` 保持输入流开放，`-t` 分配伪终端 |
| `--init` | 使用轻量级初始化进程处理信号，防止僵尸进程 |
| `-p 3000:3000` | 映射主机端口到容器端口 |
| `--user pwuser` | 使用非 root 用户运行，提高安全性 |
| `--workdir /home/pwuser` | 设置容器工作目录 |

<hr>

**命令执行流程**

该命令将执行以下操作：

1. **启动容器**: 基于 `mcr.microsoft.com/playwright:v1.52.0-noble` 镜像
2. **端口映射**: 将主机 3000 端口映射到容器 3000 端口
3. **用户权限**: 使用 `pwuser` 用户而非 root 运行
4. **环境配置**: 设置工作目录为 `/home/pwuser`
5. **服务启动**: 通过 `npx` 启动 Playwright 服务器，监听所有网络接口



## Python示例


**测试用例**

``` python {data-open=true,title="测试用例"}
import unittest
from playwright.async_api import async_playwright

class TestPlaywright(unittest.IsolatedAsyncioTestCase):

    """
    获取的是整个网页的完整 HTML 内容
    """
    async def test_playwright(self):
        async with async_playwright() as p:
            browser = await p.chromium.connect("ws://ip:3000/")
            page = await browser.new_page()
            await page.goto("https://example.com")
            content = await page.content()
            print(content)
            await browser.close()

    """
    提取文本
    """
    async def test_playwright_extract_txt(self):
        async with async_playwright() as p:
            browser = await p.chromium.connect("ws://ip:3000/")
            page = await browser.new_page()
            await page.goto("https://example.com")

            # 只要纯文本，不要 HTML 标签
            # 使用 JavaScript 提取纯文本
            text_content = await page.evaluate('''() => {
                return document.body.innerText || document.documentElement.innerText;
            }''')

            # 清洗格式（空行、缩进等）
            # text_content = await page.evaluate('''() => {
            #     return document.body.innerText.replace(/\\s+/g, ' ').trim();
            # }''')

            print(text_content)

            await browser.close()


if __name__ == "__main__":
    unittest.main()
```

<br>

**设置请求头**

``` python {data-open=true,title="设置请求头（Headers）"}
async def test_playwright():
    async with async_playwright() as p:
        # 连接远程浏览器
        browser = await p.chromium.connect("ws://ip:3000/")

        # 创建带有自定义 headers 的上下文
        context = await browser.new_context(extra_http_headers={
            "User-Agent": "MyCustomUserAgent/1.0",
            "Authorization": "Bearer YOUR_TOKEN_HERE",
            "X-Custom-Header": "SomeValue"
        })

        # 在该上下文中打开新页面
        page = await context.new_page()
        await page.goto("https://example.com")

        # 获取内容或其他操作
        content = await page.content()
        print(content)

        # 关闭 context 和 browser（注意：不要关闭远程浏览器实例）
        await context.close()
        # await browser.close()  # 不建议关闭远程浏览器连接，除非你知道自己在做什么
```

<br>

**抓取多个网页**

``` python {data-open=true,title="抓取多个网页"}
import asyncio
from playwright.async_api import async_playwright

async def test_playwright():
    urls = [
        "https://example.com",
        "https://example.org",
        "https://example.net"
    ]
    
    async with async_playwright() as p:
        try:
            # 连接远程浏览器
            browser = await p.chromium.connect("ws://ip:3000/")
            
            # 创建带有自定义 headers 的上下文
            context = await browser.new_context(extra_http_headers={
                "User-Agent": "MyCustomUserAgent/1.0",
                "Authorization": "Bearer YOUR_TOKEN_HERE",
                "X-Custom-Header": "SomeValue"
            })

            for url in urls:
                try:
                    # 在该上下文中打开新页面
                    page = await context.new_page()
                    await page.goto(url)
                    
                    # 获取内容或其他操作
                    content = await page.content()
                    print(f"Content of {url}:")
                    print(content)
                
                except Exception as e:
                    print(f"Error while processing {url}: {e}")
                
                finally:
                    # 关闭当前页面
                    if 'page' in locals():
                        await page.close()

        except Exception as e:
            print(f"An error occurred: {e}")

        finally:
            # 确保上下文被关闭
            if 'context' in locals():
                await context.close()

# 运行异步函数
asyncio.run(test_playwright())
```

<br>

**完整样例**

``` python {data-open=true,title="例子"}
"""
抓取url网址内容
"""
async def fetch_page(doc:Dict[str,Any], currLevel: int = 1, maxLevel: int = 1, kbId: str = None, white_list: List[str] = None, loadURL:Dict[str,int]={}, expression:List[str] = None, headers: Dict[str, Any] ={}):
    url = doc.get("path")
    if currLevel > maxLevel:
        return
    if loadURL.get(url) is not None:
        return
    loadURL[url] = 1
    # 检查是否有文件被上传
    # 使用 Playwright 抓取网页
    async with async_playwright() as p:
        try:
            browser = await p.chromium.connect(yamlConfig.get("playwright").get("url"))

            # 创建带有自定义 headers 的上下文
            context = await browser.new_context(extra_http_headers=headers)

            # 在该上下文中打开新页面
            page = await context.new_page()
            # networkidle:至少500毫秒没有网络连接活动时，Playwright 将认为页面已加载完毕
            await page.goto(url, wait_until='networkidle')

            raw_html = await page.content() # HTML 内容(主要用于获取页面标题)
            links = []      # 页面中的链接
            full_text = ""  # 纯文本

            # 如果有XPath表达式
            if len(expression) > 0 :
                # 处理每个 XPath 表达式
                for xpath in expression:
                    elements_html = await get_elements_by_xpath(page, xpath)
                    if not elements_html:
                        continue

                    # 每个 XPath 可能匹配多个元素
                    for html in elements_html:
                        soup = BeautifulSoup(html, 'lxml')
                        # 提取链接并加入到全局links列表中
                        links.extend([a.get('href') for a in soup.find_all('a', href=True) if a.get('href') not in links])
                        # 提取纯文本并拼接到full_text
                        full_text += soup.get_text(separator=' ', strip=True) + " "

            else:
                # 使用 BeautifulSoup 解析 HTML
                soup = BeautifulSoup(raw_html, "lxml")

                # 提取页面中的所有链接
                links = {a.get('href') for a in soup.find_all('a', href=True)}

                # 纯文本内容
                full_text = await page.evaluate('''() => {
                    return document.body.innerText.replace(/\\s+/g, ' ').trim();
                }''')

            # 关闭页面
            await page.close()

            if len(full_text) == 0:
                return ReturnDatas.ErrorResponse(message="The uploaded webpage text is empty!")

            # 查找 <title> 标签并获取其内容
            start_title = raw_html.find('<title>')
            end_title = raw_html.find('</title>')

            if start_title != -1 and end_title != -1:
                # 提取 <title> 中间的文本内容
                title = raw_html[start_title + 7:end_title].strip()
            else:
                title = "No Title Found"

            if len(title) == 0:
                return ReturnDatas.ErrorResponse(message="The title is empty!")

        except Exception as e:
            core_logger.exception(e)
            traceback.print_exc()


        finally:
            if 'context' in locals():
                await context.close()


"""
根据 XPath 表达式获取标签
"""
async def get_elements_by_xpath(page, xpath_expr):
    # 使用 page.locator() 方法结合 XPath 选择器
    locator = page.locator(f'xpath={xpath_expr}')

    # 获取所有匹配的元素数量
    count = await locator.count()
    if count == 0:
        return []

    results = []
    for i in range(count):
        # 获取每个元素的 HTML 内容
        html = await locator.nth(i).evaluate('e => e.outerHTML')
        results.append(html)

    return results
```




---
{.awesome-hr}

---

> 作者: [piliqiu](https://piliqiu.com/)  
> URL: https://piliqiu.com/posts/playwright%E4%BD%BF%E7%94%A8/  

