"""
抓取url网址内容
"""
async def fetch_page(doc:Dict[str,Any], currLevel: int = 1, maxLevel: int = 1, kbId: str = None, white_list: List[str] = None, loadURL:Dict[str,int]={}, expression:List[str] = None, headers: Dict[str, Any] ={}):
    url = doc.get("path")
    # Avoid the mutable-default-argument pitfall: give each call fresh containers
    if loadURL is None:
        loadURL = {}
    if headers is None:
        headers = {}
    if currLevel > maxLevel:
        return
    # Skip URLs that have already been fetched
    if url in loadURL:
        return
    loadURL[url] = 1
    # Fetch the page with Playwright
async with async_playwright() as p:
try:
browser = await p.chromium.connect(yamlConfig.get("playwright").get("url"))
            # Create a browser context with the custom headers
            context = await browser.new_context(extra_http_headers=headers)
            # Open a new page in that context
            page = await context.new_page()
            # networkidle: Playwright treats the page as loaded once there has
            # been no network activity for at least 500 ms
            await page.goto(url, wait_until='networkidle')
            raw_html = await page.content()  # full HTML (used below for the page title)
            links = []  # links found on the page
            full_text = ""  # extracted plain text
            # If XPath expressions were provided, extract only the matching elements
            if expression:
                # Process each XPath expression
                for xpath in expression:
                    elements_html = await get_elements_by_xpath(page, xpath)
                    if not elements_html:
                        continue
                    # Each XPath may match multiple elements
                    for html in elements_html:
                        soup = BeautifulSoup(html, 'lxml')
                        # Collect links not seen before into the shared list
                        links.extend([a.get('href') for a in soup.find_all('a', href=True) if a.get('href') not in links])
                        # Append the element's plain text to full_text
                        full_text += soup.get_text(separator=' ', strip=True) + " "
            else:
                # Parse the full page HTML with BeautifulSoup
                soup = BeautifulSoup(raw_html, "lxml")
                # Extract every link on the page (deduplicated, kept as a list
                # for consistency with the XPath branch above)
                links = list({a.get('href') for a in soup.find_all('a', href=True)})
                # Plain-text content of the page body
                full_text = await page.evaluate('''() => {
                    return document.body.innerText.replace(/\\s+/g, ' ').trim();
                }''')
            # Close the page; the context is closed in the finally block
            await page.close()
if len(full_text) == 0:
return ReturnDatas.ErrorResponse(message="The uploaded webpage text is empty!")
            # Locate the <title> tag and extract its text
            start_title = raw_html.find('<title>')
            end_title = raw_html.find('</title>')
            if start_title != -1 and end_title != -1:
                # Text between <title> and </title> (len('<title>') == 7)
                title = raw_html[start_title + 7:end_title].strip()
            else:
                title = "No Title Found"
if len(title) == 0:
return ReturnDatas.ErrorResponse(message="The title is empty!")
except Exception as e:
core_logger.exception(e)
traceback.print_exc()
        finally:
            # Close the context if it was created; this also closes its pages
            if 'context' in locals():
                await context.close()
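
# Hypothetical helper, not part of the original snippet: fetch_page accepts a
# white_list parameter but never uses it in the code shown. A minimal sketch,
# assuming white_list holds allowed host fragments, of how the collected links
# could be filtered before any recursive fetch:
from urllib.parse import urljoin, urlparse

def should_follow(base_url: str, link: str, white_list: Optional[List[str]]) -> bool:
    # Resolve relative links against the current page URL
    absolute = urljoin(base_url, link)
    host = urlparse(absolute).netloc
    # With no whitelist, follow everything; otherwise require a host match
    return not white_list or any(allowed in host for allowed in white_list)
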
"""
根据 XPath 表达式获取标签
"""
async def get_elements_by_xpath(page, xpath_expr):
    # Use page.locator() with an XPath selector
    locator = page.locator(f'xpath={xpath_expr}')
    # Count the matching elements
    count = await locator.count()
if count == 0:
return []
results = []
    for i in range(count):
        # Capture each element's outer HTML
        html = await locator.nth(i).evaluate('e => e.outerHTML')
        results.append(html)
return results
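
# Note: on recent Playwright versions the loop above can be collapsed into one
# round-trip with locator.evaluate_all('els => els.map(e => e.outerHTML)').

# Usage sketch (assumptions, not part of the original code: a Playwright
# browser server is running and yamlConfig["playwright"]["url"] points at its
# ws:// endpoint; `doc` only needs the "path" key used above):
import asyncio

async def main():
    doc = {"path": "https://example.com"}
    await fetch_page(
        doc,
        currLevel=1,
        maxLevel=1,
        expression=[],  # no XPath filters: extract the whole page text
        headers={"User-Agent": "Mozilla/5.0"},
    )

if __name__ == "__main__":
    asyncio.run(main())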