
Extracting emails from TikTok bios

Notes

Both the target xlsx file and the spider's output have the same five columns: 红人id (influencer ID), 主页链接 (profile link), 粉丝数 (follower count), 类型 (category), and 邮箱 (email). For example:

红人id:xxx
主页链接:https://www.tiktok.com/@xxx
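
One caveat: the scripts below read the profile link from a column literally named 主页地址, so whatever the header is called in your file, it has to match column_name in the script. If you need to bootstrap a correctly shaped target file, here is a minimal pandas sketch (the sample row is a placeholder, not real data):

import pandas as pd

# Hypothetical sample row; only the column headers matter.
df = pd.DataFrame({
    "红人id": ["xxx"],
    "主页地址": ["https://www.tiktok.com/@xxx"],
    "粉丝数": [0],
    "类型": [""],
    "邮箱": [""],
})
df.to_excel("./target/stickertok.xlsx", index=False)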

spider.py
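
The main scraper: it reads the target spreadsheet, visits each profile with Playwright, pulls the bio text, and writes any email it finds back into the 邮箱 column.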

import re
import asyncio
import random
import pandas as pd
from playwright.async_api import async_playwright

# Email regex
EMAIL_REGEX = r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}"

# Random delay range (seconds)
MIN_DELAY = 2
MAX_DELAY = 8

# How many pages to scrape concurrently per batch
BATCH_SIZE = 5

# User-agent pool
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15"
]

async def crawl_email(index, homepage):
    """Scrape the email from a single profile page.

    The status strings written into the 邮箱 column stay in Chinese
    because spider_retry.py matches on them (e.g. "加载失败").
    """
    result_msg = "未抓取到邮箱"  # default status: no email found

    try:
        # Random delay so requests are not fired in lockstep
        await asyncio.sleep(random.uniform(MIN_DELAY, MAX_DELAY))

        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            context = await browser.new_context(
                user_agent=random.choice(USER_AGENTS)
            )
            page = await context.new_page()

            try:
                # Navigate with a 30s timeout
                await page.goto(homepage, timeout=30000, wait_until="domcontentloaded")

                # Randomly scroll to look less bot-like
                if random.random() > 0.5:
                    await page.evaluate("window.scrollBy(0, window.innerHeight)")
                    await asyncio.sleep(random.uniform(0.5, 2))

                # Try to read the bio element
                try:
                    await page.wait_for_selector('[data-e2e="user-bio"]', timeout=5000)
                    bio_text = await page.locator('[data-e2e="user-bio"]').inner_text()

                    # Extract the first email from the bio
                    emails = re.findall(EMAIL_REGEX, bio_text)
                    if emails:
                        return index, emails[0]

                except Exception:
                    result_msg = "页面元素加载失败"  # bio element did not load

            except Exception:
                result_msg = "页面加载失败"  # page navigation failed

            finally:
                await browser.close()

    except Exception as e:
        result_msg = f"浏览器启动失败: {str(e)[:30]}..."  # browser failed to launch; truncated error

    return index, result_msg

async def main():
    file_name = "stickertok"
    column_name = "主页地址"

    # Read the input spreadsheet
    file_path = "./target/" + file_name + ".xlsx"
    df = pd.read_excel(file_path)

    output_path = "./result/" + file_name + ".xlsx"

    # Make sure the email column exists and holds strings
    if "邮箱" in df.columns:
        df["邮箱"] = df["邮箱"].astype(str)
    else:
        df["邮箱"] = ""  # a freshly created column is already string-typed

    # Build the task list
    tasks = []
    for i, row in df.iterrows():
        homepage = row.get(column_name)
        if pd.isna(homepage) or not isinstance(homepage, str):
            df.at[i, "邮箱"] = "无效链接"  # invalid link
            continue
        tasks.append(crawl_email(i, homepage))

    # Run the tasks in batches of BATCH_SIZE
    for i in range(0, len(tasks), BATCH_SIZE):
        batch = tasks[i:i + BATCH_SIZE]
        results = await asyncio.gather(*batch)

        # Write results back
        for index, result in results:
            df.at[index, "邮箱"] = result
            status = "✅" if "@" in str(result) else "❌"
            print(f"{status} index {index}: {df.at[index, column_name]} -> {result}")

    # Save the results
    df.to_excel(output_path, index=False)
    success = len(df[df["邮箱"].str.contains("@", na=False)])
    print(f"\nSaved to {output_path}")
    print("Summary:")
    print(f"- emails found: {success}")
    print(f"- failed: {len(df) - success}")

if __name__ == "__main__":
    asyncio.run(main())
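
A design note: crawl_email starts and tears down a full Chromium instance for every profile, which keeps failures isolated but is expensive. If that overhead matters, one possible variation (a sketch under my own assumptions, not part of the original script) is to share a single browser, give each task its own context, and cap concurrency with a semaphore instead of manual batching:

import asyncio
from playwright.async_api import async_playwright

# Sketch: one shared browser, per-task contexts, semaphore-limited concurrency.
# run_all and fetch_bio are hypothetical names, not from the original script.
async def run_all(urls, limit=5):
    sem = asyncio.Semaphore(limit)

    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)

        async def fetch_bio(url):
            async with sem:  # at most `limit` pages in flight
                context = await browser.new_context()
                page = await context.new_page()
                try:
                    await page.goto(url, timeout=30000, wait_until="domcontentloaded")
                    return await page.locator('[data-e2e="user-bio"]').inner_text()
                except Exception:
                    return None
                finally:
                    await context.close()

        results = await asyncio.gather(*(fetch_bio(u) for u in urls))
        await browser.close()
        return results

Per-task contexts keep cookies and cache separate, which gives roughly the isolation the original gets from separate browsers at a fraction of the startup cost.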

Running

My environment is Python 3. Install the dependencies and run as follows:

pip3 install playwright pandas openpyxl
python3 -m playwright install
python3 spider.py
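
Both scripts only ever launch Chromium, so python3 -m playwright install chromium is enough if you would rather skip downloading the other browsers.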

spider_retry.py

A rescan script for rows whose page failed to load on the first pass.
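
Before running the full script below, you can preview how many rows match its retry condition with a quick dry run (a sketch assuming the same file layout as above):

import pandas as pd

# Hypothetical dry run: count the rows the retry script would pick up.
df = pd.read_excel("./result/3dsticker_result.xlsx")
df["邮箱"] = df["邮箱"].astype(str)
retry = (
    df["邮箱"].str.contains("加载失败", na=False)
    | df["邮箱"].str.lower().isin(["nan", "none", "null", ""])
)
print(f"{retry.sum()} of {len(df)} rows would be retried")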

import re
import asyncio
import random
import pandas as pd
from playwright.async_api import async_playwright

# Configuration
CONCURRENCY = 5  # tasks per batch
EMAIL_REGEX = r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}"
MIN_DELAY = 2  # minimum delay (seconds)
MAX_DELAY = 8  # maximum delay (seconds)

# User-agent pool
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15"
]

async def crawl_email(index, homepage):
    """Scrape the email from a single profile page."""
    result_msg = "未抓取到邮箱"  # default status: no email found

    try:
        await asyncio.sleep(random.uniform(MIN_DELAY, MAX_DELAY))

        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            context = await browser.new_context(
                user_agent=random.choice(USER_AGENTS),
                viewport={"width": 1920, "height": 1080}  # explicit viewport size
            )
            page = await context.new_page()

            try:
                await page.goto(homepage, timeout=30000, wait_until="domcontentloaded")

                # Randomly simulate human behaviour
                if random.random() > 0.3:
                    await page.mouse.move(
                        random.randint(0, 500),
                        random.randint(0, 500)
                    )
                    await page.evaluate("window.scrollBy(0, window.innerHeight/2)")
                    await asyncio.sleep(random.uniform(0.5, 1.5))

                try:
                    # Longer wait than spider.py, since these pages already failed once
                    await page.wait_for_selector('[data-e2e="user-bio"]', timeout=8000)
                    bio_text = await page.locator('[data-e2e="user-bio"]').inner_text()

                    emails = re.findall(EMAIL_REGEX, bio_text)
                    if emails:
                        return index, emails[0]

                except Exception:
                    result_msg = "页面元素加载失败"  # bio element did not load

            except Exception as e:
                result_msg = f"页面加载失败: {str(e)[:50]}"  # navigation failed; truncated error

            finally:
                await browser.close()

    except Exception as e:
        result_msg = f"浏览器错误: {str(e)[:50]}"  # browser-level error

    return index, result_msg

async def main():
    print(f"Starting retry run, concurrency: {CONCURRENCY}")

    # Read the previous result file
    file_name = "3dsticker_result"
    column_name = "主页地址"

    file_path = "./result/" + file_name + ".xlsx"
    df = pd.read_excel(file_path)
    df["邮箱"] = df["邮箱"].astype(str)

    output_path = "./retry/" + file_name + ".xlsx"

    # Retry rows whose status mentions a load failure, or whose email cell is empty
    retry_condition = (
        df["邮箱"].str.contains("加载失败", na=False) |
        df["邮箱"].str.lower().isin(["nan", "none", "null", ""])
    )
    retry_df = df[retry_condition]

    if len(retry_df) == 0:
        print("Nothing to retry")
        return

    print(f"Found {len(retry_df)} rows to retry")

    # Build the task queue
    tasks = []
    for index, row in retry_df.iterrows():
        homepage = row[column_name]
        if pd.isna(homepage) or not isinstance(homepage, str) or not homepage.startswith(('http', 'www')):
            df.at[index, "邮箱"] = "无效链接格式"  # malformed link
            continue
        tasks.append((index, homepage.strip()))

    # Process the queue in batches
    total = len(tasks)
    success_count = 0
    for i in range(0, total, CONCURRENCY):
        batch = tasks[i:i + CONCURRENCY]
        batch_tasks = [crawl_email(idx, url) for idx, url in batch]

        # Batch progress
        batch_num = i // CONCURRENCY + 1
        total_batches = (total + CONCURRENCY - 1) // CONCURRENCY
        print(f"\nBatch {batch_num}/{total_batches} (tasks {i+1}-{min(i + CONCURRENCY, total)})")

        results = await asyncio.gather(*batch_tasks)

        # Write results back
        for index, result in results:
            original_status = df.at[index, "邮箱"]
            df.at[index, "邮箱"] = result

            if "@" in str(result):
                status = "✅ ok"
                success_count += 1
            else:
                status = "❌ failed"

            # index + 2 = Excel row number (1-based data plus header row)
            print(f"row {index + 2}: {status} | was: {original_status} | now: {result}")

    # Save and report
    df.to_excel(output_path, index=False)

    print("\n" + "=" * 50)
    print(f"Done, saved to: {output_path}")
    print(f"Total rows: {len(df)}")
    print(f"Retried this run: {len(retry_df)}")
    print(f"Emails found: {success_count}")
    print(f"Still failing: {len(retry_df) - success_count}")
    print("=" * 50)

if __name__ == "__main__":
    asyncio.run(main())
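
One gotcha when running either script: pandas will not create missing output directories, so ./result and ./retry must exist before to_excel is called. A one-line guard at the top of main() covers it:

import os
os.makedirs("./retry", exist_ok=True)  # to_excel fails if the directory is missing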