发布于 

TikTok 获取粉丝数量

spider_followers

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
import re
import asyncio
import random
import pandas as pd
from playwright.async_api import async_playwright

# Bounds, in seconds, of the random pause taken before each page visit.
MIN_DELAY = 2
MAX_DELAY = 8

# Pool of desktop browser user-agent strings; one is picked at random
# for every new browser context.
USER_AGENTS = [
    (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/91.0.4472.124 Safari/537.36"
    ),
    (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/91.0.4472.124 Safari/537.36"
    ),
    (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) "
        "Gecko/20100101 Firefox/89.0"
    ),
    (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
        "AppleWebKit/605.1.15 (KHTML, like Gecko) "
        "Version/14.1.1 Safari/605.1.15"
    ),
]

def parse_follower_count(text):
    """Convert a displayed follower count such as ``"1.2K"`` into an integer.

    The text is stripped, upper-cased and has its commas removed before
    parsing, so ``"1,234"``, ``"3.4m"`` and ``" 2B "`` are all accepted.
    A trailing ``K``, ``M`` or ``B`` multiplies the number by 10**3, 10**6
    or 10**9. Any fractional remainder is truncated toward zero.

    Args:
        text: Raw text read from the page.

    Returns:
        The follower count as an ``int``, or ``None`` when ``text`` is not a
        string or is not a plain non-negative decimal number with an
        optional unit suffix (labels, signed values and exponent notation
        are rejected).
    """
    # Function-scope import: decimal is not among the module-level imports.
    from decimal import Decimal

    if not isinstance(text, str):
        return None

    cleaned = text.strip().upper().replace(",", "")
    match = re.fullmatch(r"([0-9]+(?:\.[0-9]*)?|\.[0-9]+)\s*([KMB])?", cleaned)
    if match is None:
        return None

    number, unit = match.groups()
    multiplier = {"K": 10**3, "M": 10**6, "B": 10**9}.get(unit, 1)
    # Decimal keeps values like "1.005K" exact; float("1.005") * 1000 is
    # 1004.9999999999999 and would truncate to 1004.
    return int(Decimal(number) * multiplier)

async def _find_follower_text(page, timeout_ms):
    """Return the follower-count text found on ``page``, or ``None``.

    Each known selector is tried in order; the first one that yields
    non-empty text wins. If none does, a ``<strong>`` element located near
    the text "Followers" is tried as a last resort. ``timeout_ms`` is the
    per-attempt wait in milliseconds.
    """
    selectors = (
        '[data-e2e="followers-count"]',
        '[data-e2e="followers-number"]',
        'strong[data-e2e="followers-count"]',
        'strong[title*="Followers"]',
        '[title*="Followers"]',
    )

    for selector in selectors:
        try:
            await page.wait_for_selector(selector, timeout=timeout_ms)
            text = await page.locator(selector).first.inner_text()
        except Exception:
            # Best effort: a missing selector just means "try the next one".
            # `except Exception` (not a bare except) lets task cancellation
            # and KeyboardInterrupt propagate.
            continue
        if text:
            return text

    try:
        nearby = page.locator('strong:near(:text("Followers"))')
        return await nearby.first.inner_text(timeout=timeout_ms)
    except Exception:
        return None


async def crawl_followers(index, homepage):
    """Scrape the follower count shown on a single profile page.

    Sleeps for a random MIN_DELAY..MAX_DELAY seconds, opens ``homepage`` in
    a fresh headless Chromium instance with a random user agent, optionally
    scrolls, then looks for the follower-count element.

    Args:
        index: Row identifier, returned unchanged so the caller can match
            the result to its row.
        homepage: URL of the profile page to open.

    Returns:
        ``(index, count)`` with an ``int`` count on success, otherwise
        ``(index, message)`` where ``message`` is a string describing the
        failure. Errors from the browser are reported through the message
        rather than raised; cancellation is not suppressed.
    """
    result_msg = "未抓取到粉丝数"  # default status

    try:
        # Random pause before the visit.
        await asyncio.sleep(random.uniform(MIN_DELAY, MAX_DELAY))

        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            context = await browser.new_context(
                user_agent=random.choice(USER_AGENTS)
            )
            page = await context.new_page()

            try:
                await page.goto(homepage, timeout=30000, wait_until="domcontentloaded")

                # Scroll one viewport down on roughly half of the visits.
                if random.random() > 0.5:
                    await page.evaluate("window.scrollBy(0, window.innerHeight)")
                    await asyncio.sleep(random.uniform(0.5, 2))

                try:
                    follower_text = await _find_follower_text(page, 5000)

                    if follower_text:
                        count = parse_follower_count(follower_text)
                        if count is not None:
                            return index, count  # success
                        result_msg = f"解析失败: {follower_text}"
                    else:
                        result_msg = "页面元素未找到"

                except Exception as e:
                    result_msg = f"页面元素加载失败: {str(e)[:30]}"

            except Exception as e:
                result_msg = f"页面加载失败: {str(e)[:30]}"

            finally:
                # Runs on success, failure and cancellation alike.
                await browser.close()

    except Exception as e:
        result_msg = f"浏览器启动失败: {str(e)[:30]}..."  # truncated error text

    return index, result_msg

async def main():
    """Read profile URLs from Excel, scrape follower counts, save and report.

    Reads ``./target/tiktok_links_table.xlsx``, crawls the URL in column
    ``tiktok_url`` of every row in batches, writes the int count or a
    failure message into column ``粉丝数``, saves the table to
    ``./result/tiktok_links_table_followers.xlsx`` and prints a summary.
    Rows whose URL cell is not a string are marked ``无效链接`` and skipped.
    """
    # Function-scope import: pathlib is not among the module-level imports.
    from pathlib import Path

    # One value drives both the loop step and the slice width. The previous
    # code stepped by 5 but sliced 3, so two of every five URLs were never
    # crawled.
    batch_size = 5

    file_name = "tiktok_links_table"
    column_name = "tiktok_url"
    file_path = "./target/" + file_name + ".xlsx"
    output_path = "./result/" + file_name + "_followers.xlsx"

    df = pd.read_excel(file_path)

    # Create the output directory up front so a path problem surfaces
    # before the crawl instead of after it.
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)

    # The column holds int counts as well as string status messages, so it
    # is kept as object dtype with string starting values.
    if "粉丝数" in df.columns:
        df["粉丝数"] = df["粉丝数"].astype(str).astype(object)
    else:
        df["粉丝数"] = pd.Series("", index=df.index, dtype=object)

    # Collect (row label, url) pairs; coroutines are created per batch so
    # none is left un-awaited.
    jobs = []
    for i, row in df.iterrows():
        homepage = row.get(column_name)
        if not isinstance(homepage, str):  # also covers NaN / missing cells
            df.at[i, "粉丝数"] = "无效链接"
            continue
        jobs.append((i, homepage))

    for start in range(0, len(jobs), batch_size):
        batch = jobs[start:start + batch_size]
        results = await asyncio.gather(
            *(crawl_followers(i, url) for i, url in batch)
        )

        for index, result in results:
            df.at[index, "粉丝数"] = result
            status = "✅" if isinstance(result, int) else "❌"
            print(f"{status} 索引:{index} {df.at[index,column_name]} -> {result}")

    df.to_excel(output_path, index=False)
    print(f"\n已保存到 {output_path}")
    print("统计结果:")

    # A row counts as a success when its cell holds a number, not a message.
    is_count = df["粉丝数"].apply(
        lambda value: isinstance(value, (int, float))
    ).astype(bool)
    counts = pd.to_numeric(df.loc[is_count, "粉丝数"])
    success_count = len(counts)
    print(f"- 成功抓取:{success_count}条")
    print(f"- 失败数量:{len(df) - success_count}条")

    if success_count > 0:
        print(f"- 平均粉丝数:{int(counts.mean())}人")
        print(f"- 最高粉丝数:{int(counts.max())}人")
        print(f"- 最低粉丝数:{int(counts.min())}人")

# Script entry point: run the async main() coroutine only when this file is
# executed directly, not when it is imported.
if __name__ == "__main__":
    asyncio.run(main())

spider_followers_retry

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
import re
import asyncio
import random
import pandas as pd
from playwright.async_api import async_playwright

# Tunable settings.
CONCURRENCY = 5  # number of pages crawled per batch
MIN_DELAY = 2  # lower bound of the random pre-visit pause, in seconds
MAX_DELAY = 8  # upper bound of the random pre-visit pause, in seconds

# Pool of desktop browser user-agent strings; one is picked at random
# for every new browser context.
USER_AGENTS = [
    (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/91.0.4472.124 Safari/537.36"
    ),
    (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/91.0.4472.124 Safari/537.36"
    ),
    (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) "
        "Gecko/20100101 Firefox/89.0"
    ),
    (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
        "AppleWebKit/605.1.15 (KHTML, like Gecko) "
        "Version/14.1.1 Safari/605.1.15"
    ),
]

def parse_follower_count(text):
    """Convert a displayed follower count such as ``"1.2K"`` into an integer.

    The text is stripped, upper-cased and has its commas removed before
    parsing, so ``"1,234"``, ``"3.4m"`` and ``" 2B "`` are all accepted.
    A trailing ``K``, ``M`` or ``B`` multiplies the number by 10**3, 10**6
    or 10**9. Any fractional remainder is truncated toward zero.

    Args:
        text: Raw text read from the page.

    Returns:
        The follower count as an ``int``, or ``None`` when ``text`` is not a
        string or is not a plain non-negative decimal number with an
        optional unit suffix (labels, signed values and exponent notation
        are rejected).
    """
    # Function-scope import: decimal is not among the module-level imports.
    from decimal import Decimal

    if not isinstance(text, str):
        return None

    cleaned = text.strip().upper().replace(",", "")
    match = re.fullmatch(r"([0-9]+(?:\.[0-9]*)?|\.[0-9]+)\s*([KMB])?", cleaned)
    if match is None:
        return None

    number, unit = match.groups()
    multiplier = {"K": 10**3, "M": 10**6, "B": 10**9}.get(unit, 1)
    # Decimal keeps values like "1.005K" exact; float("1.005") * 1000 is
    # 1004.9999999999999 and would truncate to 1004.
    return int(Decimal(number) * multiplier)

async def _find_follower_text(page, timeout_ms):
    """Return the follower-count text found on ``page``, or ``None``.

    Each known selector is tried in order; the first one that yields
    non-empty text wins. If none does, a ``<strong>`` element located near
    the text "Followers" is tried as a last resort. ``timeout_ms`` is the
    per-attempt wait in milliseconds.
    """
    selectors = (
        '[data-e2e="followers-count"]',
        '[data-e2e="followers-number"]',
        'strong[data-e2e="followers-count"]',
        'strong[title*="Followers"]',
        '[title*="Followers"]',
    )

    for selector in selectors:
        try:
            await page.wait_for_selector(selector, timeout=timeout_ms)
            text = await page.locator(selector).first.inner_text()
        except Exception:
            # Best effort: a missing selector just means "try the next one".
            # `except Exception` (not a bare except) lets task cancellation
            # and KeyboardInterrupt propagate.
            continue
        if text:
            return text

    try:
        nearby = page.locator('strong:near(:text("Followers"))')
        return await nearby.first.inner_text(timeout=timeout_ms)
    except Exception:
        return None


async def crawl_followers(index, homepage):
    """Scrape the follower count shown on a single profile page.

    Sleeps for a random MIN_DELAY..MAX_DELAY seconds, opens ``homepage`` in
    a fresh headless Chromium instance (random user agent, 1920x1080
    viewport), usually moves the mouse and scrolls, then looks for the
    follower-count element.

    Args:
        index: Row identifier, returned unchanged so the caller can match
            the result to its row.
        homepage: URL of the profile page to open.

    Returns:
        ``(index, count)`` with an ``int`` count on success, otherwise
        ``(index, message)`` where ``message`` is a string describing the
        failure. Errors from the browser are reported through the message
        rather than raised; cancellation is not suppressed.
    """
    result_msg = "未抓取到粉丝数"  # default status

    try:
        await asyncio.sleep(random.uniform(MIN_DELAY, MAX_DELAY))

        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            context = await browser.new_context(
                user_agent=random.choice(USER_AGENTS),
                viewport={"width": 1920, "height": 1080},  # explicit viewport size
            )
            page = await context.new_page()

            try:
                await page.goto(homepage, timeout=30000, wait_until="domcontentloaded")

                # On roughly 70% of visits, move the mouse and scroll half
                # a viewport before reading the page.
                if random.random() > 0.3:
                    await page.mouse.move(
                        random.randint(0, 500),
                        random.randint(0, 500),
                    )
                    await page.evaluate("window.scrollBy(0, window.innerHeight/2)")
                    await asyncio.sleep(random.uniform(0.5, 1.5))

                try:
                    follower_text = await _find_follower_text(page, 8000)

                    if follower_text:
                        count = parse_follower_count(follower_text)
                        if count is not None:
                            return index, count
                        result_msg = f"解析失败: {follower_text}"
                    else:
                        result_msg = "页面元素未找到"

                except Exception:
                    result_msg = "页面元素加载失败"

            except Exception as e:
                result_msg = f"页面加载失败: {str(e)[:50]}"

            finally:
                # Runs on success, failure and cancellation alike.
                await browser.close()

    except Exception as e:
        result_msg = f"浏览器错误: {str(e)[:50]}"

    return index, result_msg

async def main():
    """Re-crawl the rows of a previous run that did not yield a count.

    Reads ``./result/tiktok_links_table_followers.xlsx``, selects the rows
    whose ``粉丝数`` cell is empty or holds a failure message, crawls their
    ``tiktok_url`` again in batches of ``CONCURRENCY``, writes the new
    results back and saves the whole table to
    ``./retry/tiktok_links_table_followers.xlsx``.

    Cells that already hold a number are left untouched (and stay numeric
    in the saved file). The closing statistics cover every numeric value in
    the column, i.e. earlier successes as well as the ones from this run.
    """
    # Function-scope import: pathlib is not among the module-level imports.
    from pathlib import Path

    print(f"启动粉丝数重试爬虫,并发数: {CONCURRENCY}")

    file_name = "tiktok_links_table_followers"
    column_name = "tiktok_url"
    file_path = "./result/" + file_name + ".xlsx"
    output_path = "./retry/" + file_name + ".xlsx"

    df = pd.read_excel(file_path)

    # Create the output directory up front so a path problem surfaces
    # before the crawl instead of after it.
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)

    # Keep the real column as object dtype (ints and messages side by side)
    # and use a separate string view only for matching failure markers.
    df["粉丝数"] = df["粉丝数"].astype(object)
    status_text = df["粉丝数"].astype(str)

    # A row is retried when its cell is missing/blank or carries one of the
    # failure markers written by the crawler.
    needs_retry = (
        status_text.str.contains("失败|未抓取到|无效链接|元素|浏览器", na=False)
        | status_text.str.lower().isin(["nan", "none", "null", ""])
        | df["粉丝数"].isna()
    )
    retry_total = int(needs_retry.sum())

    if retry_total == 0:
        print("没有需要重试的记录")
        return

    print(f"发现 {retry_total} 条需要重试的记录")

    # Build the (row label, url) work list.
    jobs = []
    for index, homepage in df.loc[needs_retry, column_name].items():
        # Strip before validating so padded URLs are not rejected.
        url = homepage.strip() if isinstance(homepage, str) else ""
        if not url.startswith(("http", "www")):
            df.at[index, "粉丝数"] = "无效链接格式"
            continue
        if url.startswith("www"):
            # Add a scheme so the browser receives an absolute URL.
            url = "https://" + url
        jobs.append((index, url))

    total = len(jobs)
    total_batches = (total + CONCURRENCY - 1) // CONCURRENCY
    success_count = 0
    for i in range(0, total, CONCURRENCY):
        batch = jobs[i:i + CONCURRENCY]

        batch_num = i // CONCURRENCY + 1
        print(f"\n处理批次 {batch_num}/{total_batches} (任务 {i+1}-{min(i+CONCURRENCY, total)})")

        results = await asyncio.gather(
            *(crawl_followers(idx, url) for idx, url in batch)
        )

        for index, result in results:
            original_status = df.at[index, "粉丝数"]
            df.at[index, "粉丝数"] = result

            if isinstance(result, int):
                status = "✅ 成功"
                success_count += 1
            else:
                status = "❌ 失败"

            # +2 turns the 0-based row label into the spreadsheet row
            # (header row plus 1-based numbering); assumes the default
            # integer index from read_excel.
            print(f"行{index+2}: {status} | 原状态: {original_status} | 新结果: {result}")

    df.to_excel(output_path, index=False)

    print("\n" + "="*50)
    print(f"任务完成,文件已保存: {output_path}")
    print(f"总记录数: {len(df)}")
    print(f"本次处理: {retry_total} 条")
    print(f"成功抓取: {success_count} 条")
    print(f"仍失败: {retry_total - success_count} 条")

    if success_count > 0:
        # Computed on a fresh Series, so nothing is assigned into a
        # filtered slice of df.
        counts = pd.to_numeric(df["粉丝数"], errors="coerce").dropna()
        if not counts.empty:
            print(f"- 平均粉丝数: {int(counts.mean())}人")
            print(f"- 最高粉丝数: {int(counts.max())}人")
            print(f"- 最低粉丝数: {int(counts.min())}人")

    print("="*50)

# Script entry point: run the async main() coroutine only when this file is
# executed directly, not when it is imported.
if __name__ == "__main__":
    asyncio.run(main())