What can you gain from processing the WooYun veterans' reports with Dify and building them into a knowledge base?

A reminder up front: when you drink the water, don't forget who dug the well. Most of the vulnerabilities we can dig up today rest on the countless techniques and tricks left behind by our predecessors. That hard-won experience not only shortens the learning path for those who come after, it also lets us stand on the shoulders of giants and see further across the security boundary when facing increasingly complex defense mechanisms.

Back when Dify first came out, I was already trying to build a RAG out of my own reports, but I had so few of them at the time that the result was basically useless. So in early 2025 I hacked together a bot from the veterans' WooYun reports, and here is a rundown for all the masters of how it has felt to use it over the past year.

Before getting started, you need to get your hands on the WooYun reports. Quite a while ago I wrote a script that crawls the WooYun report articles and converts them to PDF, shown below. Back then it was used to crawl the publicly available WooYun reports online; later, once I had a Baidu Netdisk membership, I downloaded a WooYun mirror and imported the reports locally.

# Dependencies: requests, beautifulsoup4, lxml, tqdm, pdfkit (plus the wkhtmltopdf binary)
import os
import re
import time
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
from tqdm import tqdm
from pathlib import Path
import hashlib
import pdfkit
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading


class WooYunCrawler:
    def __init__(self, base_url="http://192.168.50.103", output_dir="output"):
        self.base_url = base_url
        self.output_dir = output_dir
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        })

        Path(self.output_dir).mkdir(parents=True, exist_ok=True)
        self.images_dir = os.path.join(self.output_dir, 'images')
        Path(self.images_dir).mkdir(parents=True, exist_ok=True)
        self.wkhtmltopdf_path = self._find_wkhtmltopdf()
        self.file_lock = threading.Lock()

    def _find_wkhtmltopdf(self):
        # Locate the wkhtmltopdf binary that pdfkit needs for HTML-to-PDF conversion
        import shutil
        common_paths = [
            '/usr/local/bin/wkhtmltopdf',
            '/usr/bin/wkhtmltopdf',
            '/opt/homebrew/bin/wkhtmltopdf',
            shutil.which('wkhtmltopdf')
        ]

        for path in common_paths:
            if path and os.path.exists(path):
                return path
        return None

    def get_page(self, url, retry=3):
        # Fetch a page with a simple retry loop; returns the HTML text or None
        for i in range(retry):
            try:
                response = self.session.get(url, timeout=30)
                response.raise_for_status()
                response.encoding = 'utf-8'
                return response.text
            except Exception as e:
                if i == retry - 1:
                    print(f"Failed to fetch page {url}: {e}")
                    return None
                time.sleep(2)
        return None

    def get_total_pages(self):
        # Try several heuristics to detect how many listing pages the mirror has
        url = f"{self.base_url}/bugs.php?page=1500"
        html = self.get_page(url)
        if not html:
            return 1

        soup = BeautifulSoup(html, 'lxml')
        max_page = 1

        # 1) Look for a pagination container (by class, or by the Chinese prev/next/last links)
        pagination = soup.find('div', class_=re.compile('page|pagination', re.I))
        if not pagination:
            pagination = soup.find(string=re.compile('末页|下一页|上一页', re.I))
            if pagination:
                pagination = pagination.find_parent()

        if pagination:
            page_links = pagination.find_all('a')
            for link in page_links:
                href = link.get('href', '')
                if 'page=' in href:
                    try:
                        page_num = int(re.search(r'page=(\d+)', href).group(1))
                        max_page = max(max_page, page_num)
                    except Exception:
                        pass

        # 2) Fall back to the "total records / pages" summary text on the page
        if max_page == 1:
            page_text = soup.get_text()
            page_match = re.search(r'共\s*\d+\s*条记录[,,]\s*(\d+)\s*页', page_text)
            if page_match:
                max_page = int(page_match.group(1))

        # 3) Fall back to an explicit "last page" link
        if max_page == 1:
            last_page_link = soup.find('a', string=re.compile('末页|最后一页', re.I))
            if not last_page_link:
                for link in soup.find_all('a'):
                    if '末页' in link.get_text() or '最后一页' in link.get_text():
                        last_page_link = link
                        break

            if last_page_link:
                href = last_page_link.get('href', '')
                match = re.search(r'page=(\d+)', href)
                if match:
                    max_page = int(match.group(1))

        # 4) Last resort: probe page numbers incrementally until they stop returning report links
        if max_page == 1:
            print("Could not detect the total page count automatically, probing incrementally...")
            for page in range(2, 100):
                test_url = f"{self.base_url}/bugs.php?page={page}"
                html = self.get_page(test_url)
                if html:
                    test_soup = BeautifulSoup(html, 'lxml')
                    test_links = test_soup.find_all('a', href=re.compile(r'bug_detail\.php.*wybug_id='))
                    if test_links:
                        max_page = page
                    else:
                        if page > 5:
                            break
                else:
                    break
                time.sleep(0.2)

        # Sanity-check suspiciously large page counts by sampling a few pages
        if max_page > 1000:
            print(f"Detected {max_page} pages, verifying...")
            test_pages = [1, 100, 500, 1000, max_page]
            actual_max = 1
            for test_page in test_pages:
                if test_page > max_page:
                    break
                test_url = f"{self.base_url}/bugs.php?page={test_page}"
                html = self.get_page(test_url)
                if html:
                    test_soup = BeautifulSoup(html, 'lxml')
                    test_links = test_soup.find_all('a', href=re.compile(r'bug_detail\.php.*wybug_id='))
                    if test_links:
                        actual_max = test_page
                        print(f"  page {test_page} has content")
                    else:
                        print(f"  page {test_page} is empty, stopping verification")
                        break
                else:
                    break
                time.sleep(0.1)

            if actual_max >= 100:
                print(f"Verification passed, will crawl all {max_page} pages")
            else:
                print(f"Verification failed, will crawl the first {actual_max * 2} pages")
                max_page = actual_max * 2

        return max(1, max_page)

    def get_article_links_from_page(self, page_num):
        # Collect {url, title, id} for every report linked from one listing page
        url = f"{self.base_url}/bugs.php?page={page_num}"
        html = self.get_page(url)
        if not html:
            return []

        soup = BeautifulSoup(html, 'lxml')
        links = []
        seen_urls = set()

        article_links = soup.find_all('a', href=re.compile(r'bug_detail\.php\?wybug_id='))

        if not article_links:
            article_links = soup.find_all('a', href=re.compile(r'bug_detail\.php'))

        if not article_links:
            article_links = []
            all_links = soup.find_all('a', href=True)
            for link in all_links:
                href = link.get('href', '')
                if 'bug_detail.php' in href and 'wybug_id=' in href:
                    article_links.append(link)

        for link in article_links:
            href = link.get('href', '')
            if not href.startswith('http'):
                href = urljoin(url, href)

            id_match = re.search(r'wybug_id=([^&]+)', href)
            if id_match:
                article_id = id_match.group(1)
                full_url = urljoin(self.base_url, href)

                if full_url in seen_urls:
                    continue
                seen_urls.add(full_url)

                # The link text is sometimes empty; fall back to the parent or sibling node
                title = link.get_text(strip=True)
                if not title or len(title) < 3:
                    parent = link.parent
                    if parent:
                        parent_text = parent.get_text(strip=True)
                        if parent_text and parent_text != title:
                            title = parent_text

                if not title or len(title) < 3:
                    next_sibling = link.find_next_sibling()
                    if next_sibling:
                        title = next_sibling.get_text(strip=True)

                if title:
                    title = re.sub(r'\s+', ' ', title).strip()

                # Skip navigation labels (home / login / register / pagination / search)
                if title and len(title) > 3 and title not in ['首页', '登录', '注册', '上一页', '下一页', '末页', '搜索']:
                    links.append({
                        'url': full_url,
                        'title': title,
                        'id': article_id
                    })

        # Dump the raw HTML of the first few empty pages to help debug selector issues
        if not links and page_num <= 5:
            debug_file = os.path.join(self.output_dir, f'debug_page_{page_num}.html')
            with open(debug_file, 'w', encoding='utf-8') as f:
                f.write(html)

            all_links = soup.find_all('a', href=True)
            print(f"\nDebug info for page {page_num}:")
            print(f"  total links on the page: {len(all_links)}")
            print(f"  links containing 'bugs.php': {len([l for l in all_links if 'bugs.php' in l.get('href', '')])}")
            print(f"  HTML saved to: {debug_file}")

            print("  first 10 links:")
            for i, link in enumerate(all_links[:10], 1):
                href = link.get('href', '')
                text = link.get_text(strip=True)[:30]
                print(f"    {i}. {href} - {text}")

        return links

    def get_all_article_links(self):
        print("Detecting total page count...")
        total_pages = self.get_total_pages()
        print(f"Found {total_pages} pages")

        all_links = []

        print("Collecting all article links...")
        for page in tqdm(range(1, total_pages + 1), desc="listing pages"):
            links = self.get_article_links_from_page(page)
            all_links.extend(links)
            time.sleep(0.5)

        print(f"Found {len(all_links)} articles")
        return all_links

    def get_article_content(self, url, article_id, report_name):
        html = self.get_page(url)
        if not html:
            return None

        soup = BeautifulSoup(html, 'lxml')

        title = soup.find('title')
        if title:
            title = title.get_text(strip=True)
        else:
            title = report_name

        processed_html = self.prepare_html_with_local_images(html, url, article_id)

        return {
            'title': title,
            'report_name': report_name,
            'url': url,
            'html': processed_html
        }

    def sanitize_filename(self, filename):
        filename = re.sub(r'[<>:"/\\|?*]', '_', filename)
        filename = filename.strip('. ')
        if len(filename) > 200:
            filename = filename[:200]
        return filename

    def download_image(self, img_url, article_id):
        # Download an image into the local images dir; the filename embeds an md5 of the URL
        try:
            if not img_url.startswith('http'):
                img_url = urljoin(self.base_url, img_url)

            url_hash = hashlib.md5(img_url.encode()).hexdigest()[:8]
            ext = os.path.splitext(urlparse(img_url).path)[1] or '.jpg'
            if ext not in ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp']:
                ext = '.jpg'

            filename = f"{article_id}_{url_hash}{ext}"
            filepath = os.path.join(self.images_dir, filename)

            if os.path.exists(filepath):
                return filepath

            response = self.session.get(img_url, timeout=30, stream=True)
            response.raise_for_status()

            with open(filepath, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)

            return filepath
        except Exception:
            return None

    def prepare_html_with_local_images(self, html, url, article_id):
        # Strip scripts/navigation and rewrite <img> tags to point at locally downloaded copies
        if not html:
            return None

        soup = BeautifulSoup(html, 'lxml')

        for script in soup.find_all('script'):
            script.decompose()

        for nav in soup.find_all(['nav', 'header', 'footer', 'aside']):
            nav.decompose()

        for form in soup.find_all('form'):
            if 'search' in str(form).lower():
                form.decompose()

        img_tags = soup.find_all('img')
        for img in img_tags:
            img_src = img.get('src') or img.get('data-src') or img.get('data-original')
            if img_src:
                img_src_lower = img_src.lower()
                # Skip decorative images (logos, icons, avatars, QR codes, ...)
                if any(skip in img_src_lower for skip in ['logo', 'icon', 'avatar', 'button', 'bg', 'background', 'favicon', 'ewm', 'weixin']):
                    continue

                if not img_src.startswith('http'):
                    img_src = urljoin(url, img_src)

                local_path = self.download_image(img_src, article_id)
                if local_path:
                    rel_path = os.path.relpath(local_path, self.output_dir)
                    img['src'] = rel_path

        return str(soup)

    def save_to_pdf(self, article):
        # Render the cleaned HTML to PDF via wkhtmltopdf, deduplicating filenames
        url = article.get('url', '')
        report_name = article.get('report_name', '')
        html = article.get('html', '')

        if not html:
            return None

        safe_filename = self.sanitize_filename(report_name)
        filename = f"{safe_filename}.pdf"
        filepath = os.path.join(self.output_dir, filename)

        if os.path.exists(filepath):
            counter = 1
            while os.path.exists(filepath):
                filename = f"{safe_filename}_{counter}.pdf"
                filepath = os.path.join(self.output_dir, filename)
                counter += 1

        try:
            temp_html = os.path.join(self.output_dir, f'temp_{hashlib.md5(url.encode()).hexdigest()}.html')
            with open(temp_html, 'w', encoding='utf-8') as f:
                f.write(html)

            options = {
                'page-size': 'A4',
                'margin-top': '10mm',
                'margin-right': '10mm',
                'margin-bottom': '10mm',
                'margin-left': '10mm',
                'encoding': "UTF-8",
                'no-outline': None,
                'enable-local-file-access': None,
                'print-media-type': None,
            }

            if self.wkhtmltopdf_path:
                pdfkit.from_file(temp_html, filepath, options=options, configuration=pdfkit.configuration(wkhtmltopdf=self.wkhtmltopdf_path))
            else:
                pdfkit.from_file(temp_html, filepath, options=options)

            return filepath

        except Exception as e:
            print(f"Failed to generate PDF {filename}: {e}")
            return None
        finally:
            # Always clean up the temporary HTML file
            if 'temp_html' in locals():
                try:
                    os.remove(temp_html)
                except OSError:
                    pass

    def crawl_all(self):
        print("Detecting total page count...")
        total_pages = self.get_total_pages()
        print(f"Found {total_pages} pages")

        links_file = os.path.join(self.output_dir, 'article_links.txt')
        links_fp = open(links_file, 'w', encoding='utf-8')

        success_count = 0
        fail_count = 0
        total_articles = 0

        print("\nCrawling articles and generating PDFs...")

        empty_page_count = 0
        max_empty_pages = 100

        # Note: the start page is hard-coded to 1500 to resume a previous run;
        # change it to 1 to crawl the whole mirror from the beginning
        for page in range(1500, total_pages + 1):
            print(f"\nProcessing page {page}/{total_pages}...")
            article_links = self.get_article_links_from_page(page)

            if not article_links:
                empty_page_count += 1
                if empty_page_count >= max_empty_pages:
                    break
                continue

            empty_page_count = 0

            def process_article(link):
                try:
                    with self.file_lock:
                        links_fp.write(f"{link['url']}\t{link['title']}\n")
                        links_fp.flush()

                    article = self.get_article_content(link['url'], link.get('id', ''), link['title'])

                    if article:
                        filepath = self.save_to_pdf(article)
                        if filepath:
                            return (True, f"✓ saved: {link['title'][:50]}...")
                        else:
                            return (False, f"✗ save failed: {link['title'][:50]}...")
                    else:
                        return (False, f"✗ crawl failed: {link['title'][:50]}...")
                except Exception as e:
                    return (False, f"✗ error on {link['title'][:50]}: {str(e)[:50]}")

            # Fetch and convert the reports of one listing page concurrently
            max_workers = 10
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                futures = {executor.submit(process_article, link): link for link in article_links}

                with tqdm(total=len(futures), desc=f"page {page}") as pbar:
                    for future in as_completed(futures):
                        total_articles += 1
                        try:
                            success, message = future.result()
                            if success:
                                success_count += 1
                            else:
                                fail_count += 1
                                print(message)
                        except Exception:
                            fail_count += 1
                        pbar.update(1)

            if page < total_pages:
                time.sleep(0.5)

        links_fp.close()
        print("\nDone!")


def main():
    crawler = WooYunCrawler(
        base_url="http://192.168.50.103",
        output_dir="output"
    )
    crawler.crawl_all()


if __name__ == "__main__":
    main()

The WooYun community has roughly 80k reports in total, plus some WooYun Drops and material from the big security conferences. With all of those reports organized, the next step was deploying Dify and importing the knowledge base. I'll skip the Dify setup itself and share how I configured the knowledge base instead: I first used each PDF's size plus its title to judge whether an article was likely to be a gem, dropped the candidates into my own pre-training-set folder, and then, after a manual review pass, ended up with around 10k reports (the ones that reward rereading).
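That first pass was essentially a small script along the lines of the sketch below. The size threshold and keyword list here are illustrative placeholders, not the exact rules I used; tune them to your own corpus.

import shutil
from pathlib import Path

# Rough first-pass filter: keep PDFs that are large enough (likely a full write-up
# with screenshots) or whose title hits a keyword. Threshold and keywords are
# illustrative placeholders only; manual review happens afterwards.
MIN_SIZE = 300 * 1024  # 300 KB
KEYWORDS = ["getshell", "注入", "任意文件", "内网", "支付"]

def select_candidates(src="output", dst="pretrain_set"):
    Path(dst).mkdir(parents=True, exist_ok=True)
    kept = 0
    for pdf in Path(src).glob("*.pdf"):
        title = pdf.stem.lower()
        big_enough = pdf.stat().st_size >= MIN_SIZE
        keyword_hit = any(k.lower() in title for k in KEYWORDS)
        if big_enough or keyword_hit:
            shutil.copy2(pdf, Path(dst) / pdf.name)
            kept += 1
    print(f"kept {kept} candidate reports")

if __name__ == "__main__":
    select_candidates()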

Before importing them into the knowledge base, a few problems had to be solved:

  1. Token consumption
  2. The images inside the PDFs
  3. Getting the LLM to actually understand the material

All of my reports are currently stored as PDFs, which does a better job of preserving the information in the images; the catch is getting the LLM to understand what those images contain. So I ran OCR on every report and, at the end of each report, added paper-style #1, #2 markers that tie the image content back to where it appears in the text.
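As a minimal sketch of that per-report step, the following uses PyMuPDF plus pytesseract purely for illustration (my actual OCR stack and marker layout may differ, and the Chinese traineddata for Tesseract is assumed to be installed):

import io

import fitz  # PyMuPDF
import pytesseract
from PIL import Image

# Illustrative sketch: extract the report text, OCR every embedded image, and
# append the OCR results as numbered "#n" footnotes so the LLM can relate them
# back to where the screenshots appear.
def pdf_to_annotated_text(pdf_path):
    doc = fitz.open(pdf_path)
    body, footnotes = [], []
    counter = 0
    for page in doc:
        body.append(page.get_text())
        for img in page.get_images(full=True):
            counter += 1
            xref = img[0]
            data = doc.extract_image(xref)["image"]
            text = pytesseract.image_to_string(
                Image.open(io.BytesIO(data)), lang="chi_sim+eng"
            ).strip()
            body.append(f"[image #{counter}]")       # approximate in-text anchor, appended after the page's text
            footnotes.append(f"#{counter}: {text}")  # OCR result collected at the end
    return "\n".join(body) + "\n\n--- image OCR ---\n" + "\n".join(footnotes)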

For the settings, I use Parent-Child mode, with child chunks at 300 tokens and parent chunks at 1,500 tokens. Retrieval is Hybrid Search with the weights set to 0.7 semantic (vector) : 0.3 keyword (full-text). I set Top K to 6: security reports tend to be long, and handing the model too many documents just overwhelms it, while six chunks are enough to cover the reproduction steps of most vulnerabilities. The score threshold is 0.5, and the embedding model is Qwen's text-embedding-v4. Note that this is the configuration of just one of my knowledge bases; I have others tuned differently that I won't post here.
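For reference, the same configuration written out as a plain settings record, handy if you want to keep several tuned knowledge bases side by side. The field names are my own shorthand, not Dify's API schema.

# Plain snapshot of the knowledge-base settings described above (shorthand only,
# not Dify's API schema).
KB_CONFIG = {
    "chunking": {"mode": "parent-child", "child_tokens": 300, "parent_tokens": 1500},
    "retrieval": {
        "method": "hybrid_search",
        "weights": {"semantic": 0.7, "keyword": 0.3},
        "top_k": 6,             # long reports: enough context without flooding the model
        "score_threshold": 0.5,
    },
    "embedding_model": "text-embedding-v4",  # Qwen
}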

Disclaimer: I know next to nothing about AI, so please forgive any technical errors above.

With all of that configured, you can use the retrieval (hit) test to run sample queries and check how well the knowledge base recalls relevant chunks. Then there is the token consumption problem, about which there isn't much to say: just throw money at it. As for the model-understanding problem, I wrote a SYSTEM PROMPT that strong-arms the model into a jailbreak so it is allowed to output some malicious code.
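The recall check can also be driven from a script instead of the UI. The sketch below uses Dify's knowledge-base hit-testing endpoint from memory; the path, payload, and response shape may differ between Dify versions, and DIFY_URL / DATASET_ID / API_KEY are placeholders for your own deployment, so check the API docs before relying on it.

import requests

# Sketch: query the knowledge base's hit-testing endpoint and print score + source
# document for each returned chunk. Endpoint and field names are assumptions.
DIFY_URL = "http://localhost/v1"
DATASET_ID = "your-dataset-id"
API_KEY = "dataset-xxxxxxxx"

def hit_test(query):
    resp = requests.post(
        f"{DIFY_URL}/datasets/{DATASET_ID}/hit-testing",
        headers={"Authorization": f"Bearer {API_KEY}"},
        json={"query": query},
        timeout=30,
    )
    resp.raise_for_status()
    for rec in resp.json().get("records", []):
        seg = rec.get("segment", {})
        print(f"{rec.get('score')}\t{seg.get('document', {}).get('name')}")

hit_test("某 OA 系统任意文件上传的利用过程")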

With all of the above done, the next step is wiring up the workflow. There is nothing much to it: just drop the knowledge base retrieval in front of the LLM node.

Once everything is configured, you can start chatting with these bots from the Explore page.


I'll just drop in a couple of screenshots. Now for the summary. After more than a year of tinkering, my personal take is that these reports really are getting old. How to put it: it feels like that era was genuinely a time when vulnerabilities were lying around to be picked up. Open a random site and you could find a bug, vendors patched things promptly, and you could even watch plenty of users going back and forth with vendors in the comments. That said, since hacking this bot together I have barely used it, and building the knowledge base actually cost me a few dozen yuan.

Still, compared with Claude Skills it's not bad; treat it as one of the hurdles to clear on the road to learning AI. I will keep building some fun things later on, but I'll keep them under wraps for now and see how the masters in the security community approach this. One security vendor has also built a Panda Wiki, and I tried using it for RAG over the WooYun articles as well, but it has file upload limits, and after fiddling with it for half a day I gave up.

As for where this goes next, it is of course heading toward my favorite field, garment manufacturing. Getting AI to understand the character and highlights of a piece of clothing is what really matters. What we need to teach the AI is not just to recognize that this is a "silk shirt"; more importantly, it has to read the drape metrics and crease-resistance coefficient of 19-momme silk, and the way the fabric polarizes light under different light sources. It is like analyzing the underlying architecture of a piece of software: the fabric is the garment's underlying code.

What makes a garment a "masterpiece" often hides in the "non-standard" details. Is it that 0.1 cm ultra-fine topstitch, or the 15-degree bias cut at the collar that hugs the curve of the neck? AI should capture these easter eggs left behind by the designer as precisely as it scans a PoC (proof of vulnerability), and turn that abstract sense of beauty into structured, core selling points.