Louis Mylle ea4cab15c3 feat: Add installation scripts for Windows and Unix-based systems
- Created `install_and_run.bat` for installation and setup on Windows.
- Created `install_and_run.sh` for installation and setup on Unix-based systems.
- Removed `main.py` as it is no longer needed.
- Updated `requirements.txt` to specify package versions and added PyQt5.
- Deleted `start.bat` as it is redundant.
- Added unit tests for core functionality and scraping modes.
- Implemented input validation utilities in `utils/validators.py`.
- Added support for dual scraping modes in the scraper.
2026-01-10 14:45:00 +01:00


"""
Core scraper functionality extracted from main.py with callback support for GUI integration.
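
The scraper reports progress through an optional callback. Event types emitted
below include, among others: scraper_initialized, navigation_started,
login_started, login_success, login_failed, scraping_started, page_started,
comic_started, download_started, comic_completed, page_completed,
scraping_completed, and scraper_closed.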
"""
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import NoSuchElementException
import time
import random
from pathlib import Path

# Silence urllib3's InsecureRequestWarning noise (this only suppresses the
# warning; it does not itself disable certificate verification)
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


class Scraper:
"""
EBoek.info web scraper with GUI callback support.
This class handles the core scraping functionality while providing
callback mechanisms for progress updates to a GUI application.
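
    Example (illustrative sketch; on_progress is a placeholder name):

        def on_progress(event_type, data):
            print(f"[{event_type}] {data}")

        scraper = Scraper(headless=True, progress_callback=on_progress)
        try:
            scraper.scrape(start_page=1, end_page=1)
        finally:
            scraper.close()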
"""
def __init__(self, headless=False, progress_callback=None, scraping_mode=0):
"""
Initialize the scraper with optional GUI callback support.
Args:
headless (bool): Whether to run Chrome in headless mode
progress_callback (callable): Optional callback function for progress updates
Callback signature: callback(event_type: str, data: dict)
scraping_mode (int): Scraping mode (0=All Comics, 1=Latest Comics)
"""
self.progress_callback = progress_callback
self._stop_requested = False
self.scraping_mode = scraping_mode
# Set up Chrome options with anti-detection measures
chrome_options = Options()
if headless:
chrome_options.add_argument('--headless')
# Fix SSL and certificate issues
chrome_options.add_argument('--ignore-ssl-errors')
chrome_options.add_argument('--ignore-certificate-errors')
chrome_options.add_argument('--disable-web-security')
chrome_options.add_argument('--allow-running-insecure-content')
chrome_options.add_argument('--disable-extensions')
# Fix DevTools connection issues
chrome_options.add_argument('--remote-debugging-port=0')
chrome_options.add_argument('--disable-dev-shm-usage')
chrome_options.add_argument('--no-sandbox')
        # Make it look more human and suppress Chrome's own logging.
        # Note: add_experimental_option overwrites any earlier value for the
        # same key, so both switches must go in a single excludeSwitches list.
        chrome_options.add_argument('--disable-blink-features=AutomationControlled')
        chrome_options.add_experimental_option('excludeSwitches', ['enable-automation', 'enable-logging'])
        chrome_options.add_experimental_option('useAutomationExtension', False)
        chrome_options.add_argument('user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36')
        chrome_options.add_argument('--disable-logging')
        chrome_options.add_argument('--log-level=3')
# Set cross-platform download directory
downloads_path = str(Path.home() / "Downloads")
prefs = {
"download.default_directory": downloads_path,
"download.prompt_for_download": False,
"download.directory_upgrade": True,
"safebrowsing.enabled": True
}
chrome_options.add_experimental_option("prefs", prefs)
self.driver = webdriver.Chrome(options=chrome_options)
self.driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
self._emit_progress("scraper_initialized", {"headless": headless, "downloads_path": downloads_path})
def _emit_progress(self, event_type, data):
"""
Internal method to emit progress updates via callback.
Args:
event_type (str): Type of event (e.g., 'page_started', 'comic_completed')
data (dict): Event data
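
        Example (illustrative; real events carry more keys than shown):
            self._emit_progress("page_started", {"page_number": 3})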
"""
if self.progress_callback:
try:
self.progress_callback(event_type, data)
            except Exception:
                # Don't let callback errors crash the scraper
                pass
def request_stop(self):
"""Request the scraper to stop gracefully at the next opportunity."""
self._stop_requested = True
self._emit_progress("stop_requested", {})
def human_delay(self, min_sec=0.5, max_sec=2):
"""
Simulate human-like delay with cancellation support.
Args:
            min_sec (float): Minimum delay in seconds
            max_sec (float): Maximum delay in seconds
"""
if self._stop_requested:
return
delay_time = random.uniform(min_sec, max_sec)
self._emit_progress("delay_started", {"duration": delay_time})
time.sleep(delay_time)
def human_type(self, element, text):
"""
Type text character by character with human-like delays.
Args:
element: Selenium web element to type into
text (str): Text to type
"""
for char in text:
if self._stop_requested:
return
element.send_keys(char)
time.sleep(random.uniform(0.05, 0.15))
def navigate(self, url):
"""
Navigate to a URL with human-like delay.
Args:
url (str): URL to navigate to
"""
if self._stop_requested:
return False
self._emit_progress("navigation_started", {"url": url})
self.driver.get(url)
self.human_delay(1, 3)
self._emit_progress("navigation_completed", {"url": url})
return True
def login(self, username, password):
"""
Login to EBoek.info with provided credentials.
Args:
username (str): Username for login
password (str): Password for login
Returns:
bool: True if login successful, False otherwise
"""
if self._stop_requested:
return False
self._emit_progress("login_started", {"username": username})
try:
self.driver.get("https://eboek.info/komerin")
self.human_delay(2, 4)
if self._stop_requested:
return False
# Find and fill username field
username_field = self.driver.find_element(By.CSS_SELECTOR, "input[type='text']")
self.human_type(username_field, username)
self.human_delay(0.5, 1)
if self._stop_requested:
return False
# Find and fill password field
password_field = self.driver.find_element(By.CSS_SELECTOR, "input[type='password']")
self.human_type(password_field, password)
self.human_delay(0.5, 1.5)
if self._stop_requested:
return False
# Submit the form
submit_button = self.driver.find_element(By.CSS_SELECTOR, "input[type='submit']")
submit_button.click()
self.human_delay(2, 4)
            # Basic success check: a successful login navigates away from the
            # /komerin login page. A more robust check would look for an
            # element that only appears after login.
            current_url = self.driver.current_url
            login_successful = "komerin" not in current_url
if login_successful:
self._emit_progress("login_success", {"username": username})
else:
self._emit_progress("login_failed", {"username": username, "error": "Login appears to have failed"})
return login_successful
except Exception as e:
self._emit_progress("login_failed", {"username": username, "error": str(e)})
return False
def trigger_download(self, url):
"""
Open URL in new tab to trigger browser download.
Args:
url (str): URL of file to download
Returns:
bool: True if download triggered successfully
"""
if self._stop_requested:
return False
try:
# Store current window handle
current_window = self.driver.current_window_handle
            # Use JavaScript to open the URL in a new tab with the same session;
            # passing the URL as a script argument avoids quoting issues
            self.driver.execute_script("window.open(arguments[0], '_blank');", url)
            # Give the download time to start (for direct file downloads the
            # new tab usually closes itself)
            self.human_delay(3, 5)
# Switch back to original window
self.driver.switch_to.window(current_window)
self._emit_progress("download_triggered", {"url": url})
return True
except Exception as e:
self._emit_progress("download_failed", {"url": url, "error": str(e)})
return False
def scrape(self, start_page=1, end_page=1):
"""
Scrape comics from specified page range.
Args:
start_page (int): Starting page number
end_page (int): Ending page number
Returns:
dict: Summary of scraping results
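
        Example summary (values illustrative; keys as built below):
            {"success": True, "total_pages_processed": 1,
             "total_comics_processed": 12, "total_downloads_triggered": 12,
             "errors": [], "cancelled": False}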
"""
if self._stop_requested:
return {"success": False, "reason": "Cancelled before starting"}
# Determine base URL and URL pattern based on scraping mode
if self.scraping_mode == 1: # Latest Comics
base_url = "https://eboek.info/laatste"
mode_name = "Latest Comics"
else: # All Comics (default)
base_url = "https://eboek.info/stripverhalen-alle"
mode_name = "All Comics"
        total_pages = end_page - start_page + 1
        pages_processed = 0
        total_comics_processed = 0
        total_downloads_triggered = 0
        errors = []
self._emit_progress("scraping_started", {
"start_page": start_page,
"end_page": end_page,
"total_pages": total_pages,
"mode": mode_name
})
        for page_num in range(start_page, end_page + 1):
            if self._stop_requested:
                break
            pages_processed += 1
# Construct page URL based on scraping mode
if self.scraping_mode == 1: # Latest Comics
page_url = f"{base_url}?_page={page_num}&ref=dw"
else: # All Comics
if page_num == 1:
page_url = base_url
else:
page_url = f"{base_url}/page/{page_num}/"
current_page_index = page_num - start_page + 1
self._emit_progress("page_started", {
"page_number": page_num,
"page_index": current_page_index,
"total_pages": total_pages,
"url": page_url
})
# Navigate to the page
if not self.navigate(page_url):
continue
# Scroll down a bit like a human would to see content
self.driver.execute_script("window.scrollTo(0, 300)")
self.human_delay(1, 2)
if self._stop_requested:
break
try:
# Find all comic strip links using mode-specific CSS selectors
if self.scraping_mode == 1: # Latest Comics page
# For "laatste" page - target only title links to avoid duplicates
comic_links = self.driver.find_elements(By.CSS_SELECTOR, '.pt-cv-wrapper .pt-cv-ifield h5.pt-cv-title a')
else: # All Comics page (default)
# For "stripverhalen-alle" page - original selector
comic_links = self.driver.find_elements(By.CSS_SELECTOR, 'h2.post-title a')
comic_count = len(comic_links)
self._emit_progress("page_comics_found", {
"page_number": page_num,
"comic_count": comic_count
})
# Store URLs first to avoid stale element issues
comic_urls = [link.get_attribute('href') for link in comic_links]
                # Take a break between pages (usually a long one, otherwise a short pause)
if page_num > start_page:
if random.random() < 0.7: # 70% chance of break
break_time = random.uniform(15, 45) # 15-45 seconds
self._emit_progress("page_break_started", {
"duration": break_time,
"page_number": page_num
})
time.sleep(break_time)
else:
# Even if no long break, always pause a bit
short_break = random.uniform(5, 10)
self._emit_progress("short_break", {
"duration": short_break,
"page_number": page_num
})
time.sleep(short_break)
# Process all comics on this page
for i, url in enumerate(comic_urls, 1):
if self._stop_requested:
break
self._emit_progress("comic_started", {
"page_number": page_num,
"comic_index": i,
"total_comics": comic_count,
"url": url
})
# Random chance to scroll on main page before clicking
if random.random() < 0.4:
scroll_amount = random.randint(100, 500)
self.driver.execute_script(f"window.scrollBy(0, {scroll_amount})")
self.human_delay(0.5, 1.5)
# Open in new tab to keep main page
self.driver.execute_script("window.open('');")
self.driver.switch_to.window(self.driver.window_handles[-1])
try:
self.driver.get(url)
self.human_delay(2, 4)
if self._stop_requested:
break
# Sometimes scroll down to see the content
if random.random() < 0.6:
self.driver.execute_script("window.scrollTo(0, 400)")
self.human_delay(0.5, 1.5)
# Extract title
                        try:
                            title = self.driver.find_element(By.CSS_SELECTOR, 'h1.entry-title').text
                        except NoSuchElementException:
                            title = f"Comic {i} on page {page_num}"
self._emit_progress("comic_title_extracted", {
"title": title,
"url": url
})
# Small delay before clicking download
self.human_delay(0.8, 2)
if self._stop_requested:
break
# Execute the downloadLinks() JavaScript function
self.driver.execute_script("downloadLinks()")
self.human_delay(1.5, 3)
# Find all download links in the table
download_links = self.driver.find_elements(By.CSS_SELECTOR, 'table a')
download_count = len(download_links)
self._emit_progress("download_links_found", {
"title": title,
"download_count": download_count
})
# Trigger download for each file
for j, link in enumerate(download_links):
if self._stop_requested:
break
file_url = link.get_attribute('href')
file_name = link.text.strip()
self._emit_progress("download_started", {
"file_name": file_name,
"url": file_url,
"index": j + 1,
"total": download_count
})
if self.trigger_download(file_url):
total_downloads_triggered += 1
# Human-like delay between downloads
if j < len(download_links) - 1:
delay_time = random.uniform(2, 5)
self._emit_progress("download_delay", {
"duration": delay_time,
"remaining": len(download_links) - j - 1
})
time.sleep(delay_time)
total_comics_processed += 1
self._emit_progress("comic_completed", {
"title": title,
"downloads_triggered": download_count,
"page_number": page_num,
"comic_index": i
})
# Take a longer break every 5 comics
if i % 5 == 0 and i < len(comic_urls):
break_time = random.uniform(3, 7)
self._emit_progress("comic_batch_break", {
"duration": break_time,
"comics_processed": i
})
time.sleep(break_time)
except Exception as e:
error_msg = f"Error processing {url}: {e}"
errors.append(error_msg)
self._emit_progress("comic_error", {
"url": url,
"error": str(e)
})
# Human would pause after an error
self.human_delay(2, 4)
# Close tab and switch back
try:
self.driver.close()
self.driver.switch_to.window(self.driver.window_handles[0])
                    except Exception:
# Handle case where tab might have closed itself
if len(self.driver.window_handles) > 0:
self.driver.switch_to.window(self.driver.window_handles[0])
# Vary the delay between comics
self.human_delay(1, 3)
self._emit_progress("page_completed", {
"page_number": page_num,
"comics_processed": len(comic_urls)
})
except Exception as e:
error_msg = f"Error processing page {page_num}: {e}"
errors.append(error_msg)
self._emit_progress("page_error", {
"page_number": page_num,
"error": str(e)
})
        # Generate summary
        summary = {
            "success": not self._stop_requested,
            "total_pages_processed": pages_processed,
            "total_comics_processed": total_comics_processed,
            "total_downloads_triggered": total_downloads_triggered,
            "errors": errors,
            "cancelled": self._stop_requested
        }
self._emit_progress("scraping_completed", summary)
return summary
def close(self):
"""Close the browser and clean up resources."""
try:
self.driver.quit()
self._emit_progress("scraper_closed", {})
except Exception as e:
self._emit_progress("scraper_close_error", {"error": str(e)})