python爬取网页图片
· Python · 暂无标签
注意:以下代码所需要部分依赖,含有基础依赖和需要联网下载的第三方库依赖。
  1. 基础依赖(Python自带,通常无需安装):

    • tkinter (GUI界面)osiosocketrandomtimeconcurrent.futures 等

    • 需要pip安装的第三方库

    • 复制pip install requests beautifulsoup4 pillow selenium webdriver-manager
    • 安装Python包依赖
    • 基础依赖
    • pip install requests beautifulsoup4 pillow selenium
    • 可选:自动管理浏览器驱动
    • pip install webdriver-manager
    2e2ff852d910899825308c89bb8cbfa.png
import os
import requests
import ctypes
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
from PIL import Image
from io import BytesIO
import concurrent.futures
import random
import time
import base64
import socket
from tkinter import Tk, Label, Button, Frame, PhotoImage, messagebox
from tkinter.ttk import Combobox
import webbrowser
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

# 配置常量
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
]

SUPPORTED_EXTENSIONS = ['.jpg', '.jpeg',
                        '.png', '.bmp', '.gif', '.tiff', '.webp']
HEADERS = {
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
    'Accept-Language': 'en-US,en;q=0.5',
    'Connection': 'keep-alive'
}

# 代理配置 (示例,使用时请替换为实际可用的代理)
PROXIES = {
    'http': 'http://proxy.example.com:8080',
    'https': 'http://proxy.example.com:8080'
}


class ImageScraperApp:
    def __init__(self, root):
        self.root = root
        self.root.title("高级图片爬取工具")
        self.root.geometry("1000x800")

        # 初始化变量
        self.image_data = []
        self.current_page = 0
        self.images_per_page = 4
        self.protected_sites = []
        self.driver = None  # Selenium驱动

        # 创建UI
        self.create_widgets()

        # 检查网络连接
        if not self.check_network():
            messagebox.showerror("网络错误", "无法连接到互联网,请检查网络连接")
            return

        # 开始爬取
        self.start_scraping()

    def create_widgets(self):
        """创建图形界面"""
        # 主框架
        main_frame = Frame(self.root)
        main_frame.pack(fill="both", expand=True, padx=15, pady=15)

        # 状态区域
        status_frame = Frame(main_frame)
        status_frame.pack(fill="x", pady=5)

        self.status_label = Label(
            status_frame, text="准备爬取图片...", wraplength=900, anchor="w")
        self.status_label.pack(side="left", fill="x", expand=True)

        # 受保护网站提示
        self.protected_label = Label(
            main_frame, text="", fg="red", wraplength=900)
        self.protected_label.pack(fill="x", pady=5)

        # 图片显示区域
        self.image_frame = Frame(main_frame)
        self.image_frame.pack(fill="both", expand=True)

        # 分页控制
        page_frame = Frame(main_frame)
        page_frame.pack(pady=10)

        self.prev_btn = Button(page_frame, text="上一页",
                               command=self.prev_page, width=10)
        self.prev_btn.pack(side="left", padx=10)

        self.page_label = Label(page_frame, text="0/0", width=10)
        self.page_label.pack(side="left", padx=10)

        self.next_btn = Button(page_frame, text="下一页",
                               command=self.next_page, width=10)
        self.next_btn.pack(side="left", padx=10)

        # 操作按钮
        btn_frame = Frame(main_frame)
        btn_frame.pack(pady=15)

        buttons = [
            ("下载选中", self.download_selected),
            ("设为壁纸", self.set_selected_as_wallpaper),
            ("浏览器打开", self.open_in_browser),
            ("重新加载", self.start_scraping),
            ("退出", self.root.quit)
        ]

        for text, cmd in buttons:
            Button(btn_frame, text=text, command=cmd,
                   width=12).pack(side="left", padx=5)

        # 图片选择框
        self.selection_var = Combobox(btn_frame, state="readonly", width=15)
        self.selection_var.pack(side="left", padx=5)
        self.selection_var.bind("<<ComboboxSelected>>", self.on_image_selected)

        # 初始禁用按钮
        self.toggle_buttons(False)

    def init_selenium(self):
        """初始化Selenium驱动"""
        if self.driver is not None:
            return True

        try:
            options = Options()
            options.add_argument("--headless")
            options.add_argument("--disable-gpu")
            options.add_argument(f"user-agent={random.choice(USER_AGENTS)}")
            options.add_argument("--window-size=1920,1080")

            # 设置代理 (如果需要)
            # options.add_argument(f"--proxy-server={PROXIES['http']}")

            self.driver = webdriver.Chrome(options=options)
            self.driver.set_page_load_timeout(30)
            return True
        except Exception as e:
            self.status_label.config(text=f"初始化浏览器失败: {str(e)}")
            return False

    def close_selenium(self):
        """关闭Selenium驱动"""
        if self.driver:
            try:
                self.driver.quit()
            except:
                pass
            self.driver = None

    def check_network(self):
        """检查网络连接"""
        try:
            socket.create_connection(("8.8.8.8", 53), timeout=3)
            return True
        except OSError:
            return False

    def get_random_headers(self):
        """生成随机请求头"""
        headers = HEADERS.copy()
        headers['User-Agent'] = random.choice(USER_AGENTS)
        headers['Referer'] = 'https://www.google.com/'
        return headers

    def check_robots_txt(self, url):
        """检查robots.txt"""
        try:
            parsed = urlparse(url)
            robots_url = f"{parsed.scheme}://{parsed.netloc}/robots.txt"
            response = requests.get(
                robots_url, headers=self.get_random_headers(), timeout=5)
            if response.status_code == 200:
                return response.text
        except:
            return None

    def extract_with_selenium(self, url):
        # """使用Selenium提取图片"""
        if not self.init_selenium():
            return []

        try:
            self.driver.get(url)
            WebDriverWait(self.driver, 10).until(
                EC.presence_of_all_elements_located((By.TAG_NAME, "img"))
            )

            # 等待页面完全加载
            time.sleep(random.uniform(1, 3))

            # 获取所有图片元素
            img_elements = self.driver.find_elements(By.TAG_NAME, "img")
            img_urls = []

            for img in img_elements:
                try:
                    src = img.get_attribute(
                        "src") or img.get_attribute("data-src")
                    if src:
                        img_url = urljoin(url, src)
                        parsed = urlparse(img_url)
                        if any(parsed.path.lower().endswith(ext) for ext in SUPPORTED_EXTENSIONS):
                            img_urls.append(img_url)
                except:
                    continue

            return list(set(img_urls))
        except Exception as e:
            print(f"Selenium提取失败: {str(e)}")
            return []
        finally:
            # 不关闭驱动以便复用
            pass

    def extract_with_requests(self, url):
        """使用requests提取图片"""
        try:
            # 随机延迟
            time.sleep(random.uniform(0.5, 2.5))

            response = requests.get(
                url,
                headers=self.get_random_headers(),
                # proxies=PROXIES,  # 取消注释以使用代理
                timeout=10,
                allow_redirects=True
            )

            # 检查防爬虫
            if response.status_code in [403, 429] or "captcha" in response.url.lower():
                self.protected_sites.append(url)
                self.update_protected_label()
                return []

            soup = BeautifulSoup(response.text, 'html.parser')
            img_tags = soup.find_all('img')
            img_urls = []

            for img in img_tags:
                src = img.get('src') or img.get(
                    'data-src') or img.get('data-original')
                if src:
                    img_url = urljoin(url, src)
                    parsed = urlparse(img_url)
                    if any(parsed.path.lower().endswith(ext) for ext in SUPPORTED_EXTENSIONS):
                        img_urls.append(img_url)

            return list(set(img_urls))
        except Exception as e:
            print(f"Requests提取失败: {str(e)}")
            return []

    def download_image(self, img_url):
        """下载图片"""
        try:
            response = requests.get(
                img_url,
                headers=self.get_random_headers(),
                # proxies=PROXIES,  # 取消注释以使用代理
                timeout=15,
                stream=True
            )

            if response.status_code == 200:
                content_type = response.headers.get('content-type', '')
                if 'image' in content_type:
                    img_data = BytesIO(response.content)
                    try:
                        img = Image.open(img_data)
                        return img, img_url
                    except:
                        return None, None
            return None, None
        except Exception as e:
            print(f"下载失败: {str(e)}")
            return None, None

    def start_scraping(self):
        """开始爬取流程"""
        self.image_data = []
        self.protected_sites = []
        self.current_page = 0
        self.toggle_buttons(False)

        # 获取脚本目录
        script_dir = os.path.dirname(os.path.abspath(__file__))
        url_file = os.path.join(script_dir, 'url.txt')

        # 读取URL文件
        try:
            with open(url_file, 'r', encoding='utf-8') as f:
                urls = [line.strip() for line in f if line.strip()]
                urls = [u for u in urls if self.is_valid_url(u)]
        except Exception as e:
            self.status_label.config(text=f"读取URL文件失败: {str(e)}")
            return

        if not urls:
            self.status_label.config(text="没有找到有效的URL")
            return

        self.status_label.config(text=f"开始从 {len(urls)} 个网站爬取图片...")
        self.root.update()

        # 第一阶段:收集所有图片URL
        all_img_urls = []
        for url in urls:
            # 检查robots.txt
            robots = self.check_robots_txt(url)
            if robots and "Disallow: /" in robots:
                self.protected_sites.append(url)
                continue

            self.status_label.config(text=f"正在处理: {url}")
            self.root.update()

            # 尝试使用requests提取
            img_urls = self.extract_with_requests(url)

            # 如果requests失败,尝试使用selenium
            if not img_urls:
                img_urls = self.extract_with_selenium(url)

            if img_urls:
                all_img_urls.extend(img_urls)
                self.status_label.config(
                    text=f"从 {url} 找到 {len(img_urls)} 张图片 (总计: {len(all_img_urls)})")
                self.root.update()

        if not all_img_urls:
            self.status_label.config(text="没有找到任何图片")
            self.close_selenium()
            return

        # 第二阶段:下载图片
        self.status_label.config(text=f"开始下载 {len(all_img_urls)} 张图片...")
        self.root.update()

        downloaded_images = []
        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            futures = {executor.submit(
                self.download_image, url): url for url in all_img_urls}
            for i, future in enumerate(concurrent.futures.as_completed(futures)):
                url = futures[future]
                try:
                    img, _ = future.result()
                    if img:
                        downloaded_images.append((img, url))
                        self.status_label.config(
                            text=f"已下载 {i+1}/{len(all_img_urls)} 张图片")
                        self.root.update()
                except:
                    pass

        self.close_selenium()

        if not downloaded_images:
            self.status_label.config(text="没有成功下载任何图片")
            return

        # 准备图片数据
        self.prepare_image_data(downloaded_images)

        if not self.image_data:
            self.status_label.config(text="没有有效的图片数据")
            return

        # 更新UI
        self.update_ui_after_scraping()

    def prepare_image_data(self, downloaded_images):
        """处理下载的图片数据"""
        self.image_data = []
        for img, img_url in downloaded_images:
            try:
                # 调整预览大小
                img.thumbnail((450, 450))

                # 转换为base64
                buffered = BytesIO()
                img.save(buffered, format="PNG")
                img_str = base64.b64encode(buffered.getvalue()).decode("utf-8")

                # 保存原始数据
                buffered = BytesIO()
                img.save(buffered, format=img.format if img.format else "PNG")
                original_data = buffered.getvalue()

                self.image_data.append({
                    "preview": img_str,
                    "url": img_url,
                    "original_data": original_data,
                    "format": img.format.lower() if img.format else "png"
                })
            except Exception as e:
                print(f"处理图片失败: {str(e)}")

    def update_ui_after_scraping(self):
        """爬取完成后更新UI"""
        self.selection_var["values"] = [
            f"图片 {i+1}" for i in range(len(self.image_data))]
        if self.image_data:
            self.selection_var.current(0)

        self.show_page(0)
        self.toggle_buttons(True)
        self.status_label.config(text=f"完成! 共找到 {len(self.image_data)} 张可用图片")

    # 其余UI相关方法保持不变...
    # (包括show_page, prev_page, next_page, on_image_selected,
    # get_selected_image, download_selected, set_selected_as_wallpaper,
    # open_in_browser, toggle_buttons等方法)


if __name__ == "__main__":
    root = Tk()
    app = ImageScraperApp(root)
    root.mainloop()


关于合法使用网络爬虫的重要声明



本声明旨在明确使用Python爬虫技术时应当遵守的法律法规和道德准则。网络爬虫作为一项技术工具,其本身是中性的,但使用方式可能涉及法律风险。请在使用本爬虫技术前仔细阅读并理解本声明内容。

一、基本法律原则

  1. 遵守《数据安全法》和《个人信息保护法》

      • 禁止爬取任何涉及国家秘密、商业秘密或个人隐私的数据

      • 爬取个人信息必须获得明确授权,并遵循最小必要原则

  2. 尊重《反不正当竞争法》

      • 不得通过爬虫技术实施不正当竞争行为

      • 禁止爬取后对原网站内容进行实质性替代

  3. 遵守《计算机信息系统安全保护条例》

      • 不得对目标网站造成技术干扰或破坏

      • 禁止绕过技术保护措施获取数据

二、合规使用指南

  1. robots.txt协议遵守

      • 必须检查目标网站的robots.txt文件

      • 严格遵循其中定义的爬取规则和限制

  2. 访问频率控制

      • 设置合理的请求间隔(建议≥3秒/请求)

      • 禁止使用多线程/分布式爬虫对单一网站造成负担

  3. 数据使用限制

      • 仅可爬取公开可用且无特殊声明的数据

      • 商业性使用前必须获得明确授权

  4. 版权声明保留

      • 保留原始数据的所有版权声明

      • 禁止删除或修改数据来源标识

三、禁止行为清单

  1. 绝对禁止的爬取行为包括但不限于:

      • 爬取用户密码、支付信息等敏感数据

      • 绕过登录验证获取非公开内容

      • 破解反爬机制或使用自动化工具注册账号

      • 对政府/金融机构等敏感网站进行爬取

  2. 高风险爬取行为:

      • 爬取社交媒体的用户关系网络

      • 大规模爬取用户生成内容(UGC)

      • 爬取后用于商业牟利而未获授权

四、法律风险提示

  1. 可能涉及的违法情形:

      • 非法获取计算机信息系统数据罪

      • 侵犯公民个人信息罪

      • 破坏计算机信息系统罪

      • 不正当竞争行为

  2. 民事责任风险:

      • 网站运营方可能提起民事诉讼

      • 面临停止侵害、赔偿损失等责任

五、最佳实践建议

  1. 爬取前必须:

      • 详细阅读目标网站的服务条款

      • 联系网站管理员获取书面授权(商业用途)

      • 进行法律合规性评估

  2. 技术实现要求:

      • 设置明显的User-Agent标识

      • 实现请求频率自动调节机制

      • 建立完整的数据溯源记录

  3. 数据存储与使用:

      • 建立数据分级保护制度

      • 定期清理不再需要的数据

      • 商业使用时进行数据脱敏处理

六、免责声明

本工具/代码仅限用于合法学习与研究目的,开发者不承担因使用者违反相关法律法规而产生的任何法律责任。使用者应当自行确保其爬虫行为符合所有适用的法律法规。

任何违反本声明及法律法规的使用行为,均与工具/代码开发者无关,由使用者承担全部法律责任。

本文最后更新时间 2025-04-13
文章链接地址:
https://xzlo.blog/index.php/2025/04/05/36.html
本站文章除注明[转载|引用|原文]出处外,均为本站原生内容,转载前请注明出处


留言