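"""Retry pass for the profile-email crawler.

Re-reads ./result/<file_name>.xlsx, selects rows whose "邮箱" (email) column is
empty or marked as a load failure, re-visits those profile pages with
Playwright, and writes the updated workbook to ./retry/<file_name>.xlsx.
"""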
import re
import asyncio
import random

import pandas as pd
from playwright.async_api import async_playwright
CONCURRENCY = 5  # pages crawled in parallel per batch
EMAIL_REGEX = r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}"
MIN_DELAY = 2    # random pre-request delay range, in seconds
MAX_DELAY = 8
# Rotated user agents so requests look less uniform
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15",
]
async def crawl_email(index, homepage):
    """Asynchronously scrape the email address from a single profile page."""
    result_msg = "未抓取到邮箱"  # "no email found"
    try:
        # Random delay so requests are not fired at a fixed rhythm
        await asyncio.sleep(random.uniform(MIN_DELAY, MAX_DELAY))
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            context = await browser.new_context(
                user_agent=random.choice(USER_AGENTS),
                viewport={"width": 1920, "height": 1080},
            )
            page = await context.new_page()
            try:
                await page.goto(homepage, timeout=30000, wait_until="domcontentloaded")
                # Light human-like interaction: occasional mouse move, then a scroll
                if random.random() > 0.3:
                    await page.mouse.move(
                        random.randint(0, 500), random.randint(0, 500)
                    )
                await page.evaluate("window.scrollBy(0, window.innerHeight/2)")
                await asyncio.sleep(random.uniform(0.5, 1.5))
                try:
                    await page.wait_for_selector('[data-e2e="user-bio"]', timeout=8000)
                    bio_text = await page.locator('[data-e2e="user-bio"]').inner_text()
                    emails = re.findall(EMAIL_REGEX, bio_text)
                    if emails:
                        return index, emails[0]
                except Exception:
                    result_msg = "页面元素加载失败"  # bio element failed to load
            except Exception as e:
                result_msg = f"页面加载失败: {str(e)[:50]}"  # page load failed
            finally:
                await browser.close()
    except Exception as e:
        result_msg = f"浏览器错误: {str(e)[:50]}"  # browser error
    return index, result_msg
async def main():
    print(f"Starting crawler, concurrency: {CONCURRENCY}")
    file_name = "3dsticker_result"
    column_name = "主页地址"  # column holding the profile URL ("homepage address")
    file_path = "./result/" + file_name + ".xlsx"
    df = pd.read_excel(file_path)
    df["邮箱"] = df["邮箱"].astype(str)  # "邮箱" = email/status column; cast to str for the filters below
    output_path = "./retry/" + file_name + ".xlsx"
    # Rows to retry: previous status mentions "加载失败" (load failed) or the cell is empty/NaN
    retry_condition = (
        df["邮箱"].str.contains("加载失败", na=False)
        | df["邮箱"].str.lower().isin(["nan", "none", "null", ""])
    )
    retry_df = df[retry_condition]
    if len(retry_df) == 0:
        print("No records need retrying")
        return
    print(f"Found {len(retry_df)} records to retry")

    tasks = []
    for index, row in retry_df.iterrows():
        homepage = row[column_name]
        if pd.isna(homepage) or not isinstance(homepage, str) or not homepage.startswith(("http", "www")):
            df.at[index, "邮箱"] = "无效链接格式"  # invalid URL format
            continue
        tasks.append((index, homepage.strip()))

    total = len(tasks)
    success_count = 0
    for i in range(0, total, CONCURRENCY):
        batch = tasks[i:i + CONCURRENCY]
        batch_tasks = [crawl_email(idx, url) for idx, url in batch]
        batch_num = i // CONCURRENCY + 1
        total_batches = (total + CONCURRENCY - 1) // CONCURRENCY
        print(f"\nProcessing batch {batch_num}/{total_batches} (tasks {i+1}-{min(i + CONCURRENCY, total)})")
        results = await asyncio.gather(*batch_tasks)
        for index, result in results:
            original_status = df.at[index, "邮箱"]
            df.at[index, "邮箱"] = result
            if "@" in str(result):
                status = "✅ success"
                success_count += 1
            else:
                status = "❌ failed"
            # index + 2: DataFrame index 0 corresponds to Excel row 2 (row 1 is the header)
            print(f"Row {index + 2}: {status} | previous: {original_status} | new: {result}")

    df.to_excel(output_path, index=False)
    print("\n" + "=" * 50)
    print(f"Done, file saved: {output_path}")
    print(f"Total records: {len(df)}")
    print(f"Processed this run: {len(retry_df)}")
    print(f"Emails scraped: {success_count}")
    print(f"Still failing: {len(retry_df) - success_count}")
    print("=" * 50)
if __name__ == "__main__":
    asyncio.run(main())