import hashlib import json import csv import os import random import re import subprocess import tkinter as tk import sys from time import time, sleep from shutil import rmtree, copyfileobj from datetime import datetime, timedelta from tkinter import filedialog from urllib.parse import urlparse from unicodedata import normalize from tempfile import NamedTemporaryFile import asyncio import grequests import aiohttp from bs4 import BeautifulSoup from seleniumbase import SB import requests from packaging import version import ffmpeg_downloader as ffdl CURRENT_VERSION = "1.3.0" SUPPORTED_FORMATS = [".mp4", ".mkv", ".mov", ".avi", ".ts"] def read_config_by_key(config_file, key): script_dir = os.path.dirname(os.path.realpath(__file__)) config_path = os.path.join(script_dir, "config", f"{config_file}.json") with open(config_path, "r", encoding="utf-8") as input_config_file: config = json.load(input_config_file) return config.get(key, None) def get_default_video_format(): default_video_format = read_config_by_key("settings", "DEFAULT_VIDEO_FORMAT") if default_video_format in SUPPORTED_FORMATS: return default_video_format return ".mp4" def get_default_directory(): default_directory = read_config_by_key("settings", "DEFAULT_DIRECTORY") if not os.path.exists(default_directory): default_directory = "~/Downloads/" if os.name == "nt" and default_directory: default_directory = default_directory.replace("/", "\\") return os.path.expanduser(default_directory) def get_default_downloader(): try: default_downloader = read_config_by_key("settings", "DEFAULT_DOWNLOADER") if default_downloader in ["ffmpeg", "yt-dlp"]: return default_downloader return "ffmpeg" except Exception: return "ffmpeg" def get_yt_dlp_custom_options(): try: custom_options = read_config_by_key("settings", "YT_DLP_OPTIONS") if custom_options: return custom_options.split() return [] except Exception: return [] def print_main_menu(): default_video_format = get_default_video_format() or "mp4" menu_options = [ "1) VOD Recovery", "2) Clip Recovery", f"3) Download VOD ({default_video_format.lstrip('.')})", "4) Unmute & Check M3U8 Availability", "5) Options", "6) Exit", ] while True: print("\n".join(menu_options)) try: choice = int(input("\nChoose an option: ")) if choice not in range(1, len(menu_options) + 1): raise ValueError("Invalid option") return choice except ValueError: print("\n✖ Invalid option! Please try again:\n") def print_video_mode_menu(): vod_type_options = [ "1) Website Video Recovery", "2) Manual Recovery", "3) Bulk Video Recovery from SullyGnome CSV Export", "4) Return", ] while True: print("\n".join(vod_type_options)) try: choice = int(input("\nSelect VOD Recovery Type: ")) if choice not in range(1, len(vod_type_options) + 1): raise ValueError("Invalid option") return choice except ValueError: print("\n✖ Invalid option! Please try again:\n") def print_video_recovery_menu(): vod_recovery_options = [ "1) Website Video Recovery", "2) Manual Recovery", "3) Return", ] while True: print("\n".join(vod_recovery_options)) try: choice = int(input("\nSelect VOD Recovery Method: ")) if choice not in range(1, len(vod_recovery_options) + 1): raise ValueError("Invalid option") return choice except ValueError: print("\n✖ Invalid option! Please try again:\n") def print_clip_type_menu(): clip_type_options = [ "1) Recover All Clips from a VOD", "2) Find Random Clips from a VOD", "3) Download Clip from Twitch URL", "4) Bulk Recover Clips from SullyGnome CSV Export", "5) Return", ] while True: print("\n".join(clip_type_options)) try: choice = int(input("\nSelect Clip Recovery Type: ")) if choice not in range(1, len(clip_type_options) + 1): raise ValueError("Invalid option") return choice except ValueError: print("\n✖ Invalid option! Please try again:\n") def print_clip_recovery_menu(): clip_recovery_options = [ "1) Website Clip Recovery", "2) Manual Clip Recovery", "3) Return", ] while True: print("\n".join(clip_recovery_options)) try: choice = int(input("\nSelect Clip Recovery Method: ")) if choice not in range(1, len(clip_recovery_options) + 1): raise ValueError("Invalid option") return choice except ValueError: print("\n✖ Invalid option! Please try again:\n") def print_bulk_clip_recovery_menu(): bulk_clip_recovery_options = [ "1) Single CSV File", "2) Multiple CSV Files", "3) Return", ] while True: print("\n".join(bulk_clip_recovery_options)) try: choice = int(input("\nSelect Bulk Clip Recovery Source: ")) if choice not in range(1, len(bulk_clip_recovery_options) + 1): raise ValueError("Invalid option") return str(choice) except ValueError: print("\n✖ Invalid option! Please try again:\n") def print_clip_format_menu(): clip_format_options = [ "1) Default Format ([VodID]-offset-[interval])", "2) Alternate Format (vod-[VodID]-offset-[interval])", "3) Legacy Format ([VodID]-index-[interval])", "4) Return", ] print() while True: print("\n".join(clip_format_options)) try: choice = int(input("\nSelect Clip URL Format: ")) if choice == 4: return run_vod_recover() if choice not in range(1, len(clip_format_options) + 1): raise ValueError("Invalid option") else: return str(choice) except ValueError: print("\n✖ Invalid option! Please try again:\n") def print_download_type_menu(): download_type_options = [ "1) From M3U8 Link", "2) From M3U8 File", "3) From Twitch URL (Only for VODs or Highlights still up on Twitch)", "4) Return", ] while True: print("\n".join(download_type_options)) try: choice = int(input("\nSelect Download Type: ")) if choice not in range(1, len(download_type_options) + 1): raise ValueError("Invalid option") return choice except ValueError: print("\n✖ Invalid option! Please try again:\n") def print_handle_m3u8_availability_menu(): handle_m3u8_availability_options = [ "1) Check if M3U8 file has muted segments", "2) Unmute & Remove invalid segments", "3) Return", ] while True: print("\n".join(handle_m3u8_availability_options)) try: choice = int(input("\nSelect Option: ")) if choice not in range(1, len(handle_m3u8_availability_options) + 1): raise ValueError("Invalid option") return choice except ValueError: print("\n✖ Invalid option! Please try again:\n") def print_options_menu(): options_menu = [ f"1) Set Default Video Format \033[94m({get_default_video_format().lstrip('.') or 'mp4'})\033[0m", f"2) Set Download Directory \033[94m({get_default_directory() or '~/Downloads/'})\033[0m", f"3) Set Default Downloader \033[94m({read_config_by_key('settings', 'DEFAULT_DOWNLOADER') or 'ffmpeg'})\033[0m", "4) Check for Updates", "5) Open settings.json File", "6) Help", "7) Return", ] while True: print("\n".join(options_menu)) try: choice = int(input("\nSelect Option: ")) if choice not in range(1, len(options_menu) + 1): raise ValueError("Invalid option") return choice except ValueError: print("\n✖ Invalid option! Please try again:\n") def print_get_m3u8_link_menu(): m3u8_url = input("Enter M3U8 Link: ").strip(" \"'") if m3u8_url.endswith(".m3u8"): return m3u8_url print("\n✖ Invalid M3U8 link! Please try again:\n") return print_get_m3u8_link_menu() def get_websites_tracker_url(): while True: tracker_url = input("Enter Twitchtracker/Streamscharts/Sullygnome url: ").strip() if re.match(r"^(https?:\/\/)?(www\.)?(twitchtracker\.com|streamscharts\.com|sullygnome\.com)\/.", tracker_url): return tracker_url print("\n✖ Invalid URL! Please enter a URL from Twitchtracker, Streamscharts, or Sullygnome.\n") def print_get_twitch_url_menu(): twitch_url = input("Enter Twitch URL: ").strip(" \"'") if "twitch.tv" in twitch_url: return twitch_url print("\n✖ Invalid Twitch URL! Please try again:\n") return print_get_twitch_url_menu() def get_twitch_or_tracker_url(): while True: url = input("Enter Twitchtracker/Streamscharts/Sullygnome/Twitch URL: ").strip() if re.match(r"^(https?:\/\/)?(www\.)?(twitchtracker\.com|streamscharts\.com|sullygnome\.com|twitch\.tv)\/.", url): return url print("\n✖ Invalid URL! Please enter a URL from Twitchtracker, Streamscharts, Sullygnome, or Twitch.\n") def get_latest_version(): try: res = requests.get("https://api.github.com/repos/MacielG1/VodRecovery/releases/latest", timeout=30) if res.status_code == 200: release_info = res.json() return release_info["tag_name"] else: return None except Exception: return None def check_for_updates(): latest_version = version.parse(get_latest_version()) current_version = version.parse(CURRENT_VERSION) if latest_version and current_version: if latest_version != current_version: print(f"\n\033[34mNew version ({latest_version}) - Download at: https://github.com/MacielG1/VodRecovery/releases/latest\033[0m") input("\nPress Enter to continue...") return run_vod_recover() else: print(f"\n\033[92m\u2713 Vod Recovery is updated to {CURRENT_VERSION}!\033[0m") input("\nPress Enter to continue...") return else: print("\n✖ Could not check for updates!") def sanitize_filename(filename, restricted=False): if filename == "": return "" def replace_insane(char): if not restricted and char == "\n": return "\0 " elif not restricted and char in '":<>?|/\\': return {"/": "\u29f8", "\\": "\u29f9"}.get(char, chr(ord(char) + 0xFEE0)) elif char == "?" or ord(char) < 32 or ord(char) == 127: return "" elif char == '"': return "" if restricted else "'" elif char == ":": return "\0_\0-" if restricted else "\0 \0-" elif char in "\\/|<>": return "\0_" if restricted and ( char in "!&'()[]{}$;`^,#" or char.isspace() or ord(char) > 127 ): return "\0_" return char if restricted: filename = normalize("NFKC", filename) filename = re.sub( r"[0-9]+(?::[0-9]+)+", lambda m: m.group(0).replace(":", "_"), filename ) result = "".join(map(replace_insane, filename)) result = re.sub(r"(\0.)(?:(?=\1)..)+", r"\1", result) strip_re = r"(?:\0.|[ _-])" result = re.sub(f"^\0.{strip_re}|{strip_re}\0.$", "", result) result = result.replace("\0", "") or "_" while "__" in result: result = result.replace("__", "_") result = result.strip("_") if restricted and result.startswith("-_"): result = result[2:] if result.startswith("-"): result = "_" + result[len("-") :] result = result.lstrip(".") if not result: result = "_" return result def read_config_file(config_file): current_dir = os.path.dirname(os.path.abspath(__file__)) config_path = os.path.join(current_dir, "config", f"{config_file}.json") with open(config_path, encoding="utf-8") as config_file: config = json.load(config_file) return config def open_file(file_path): if sys.platform.startswith("darwin"): subprocess.call(("open", file_path)) elif os.name == "nt": subprocess.Popen(["start", file_path], shell=True) elif os.name == "posix": subprocess.call(("xdg-open", file_path)) else: print(f"\nFile Location: {file_path}") def print_help(): try: help_data = read_config_file("help") print("\n--------------- Help Section ---------------") for menu, options in help_data.items(): print(f"\n{menu.replace('_', ' ').title()}:") for option, description in options.items(): print(f" {option}: {description}") print("\n --------------- End of Help Section ---------------\n") except Exception as error: print(f"An unexpected error occurred: {error}") def read_text_file(text_file_path): lines = [] with open(text_file_path, "r", encoding="utf-8") as text_file: for line in text_file: lines.append(line.rstrip()) return lines def write_text_file(input_text, destination_path): with open(destination_path, "a+", encoding="utf-8") as text_file: text_file.write(input_text + "\n") def write_m3u8_to_file(m3u8_link, destination_path): with open(destination_path, "w", encoding="utf-8") as m3u8_file: m3u8_file.write(requests.get(m3u8_link, timeout=30).text) return m3u8_file def read_csv_file(csv_file_path): with open(csv_file_path, "r", encoding="utf-8") as csv_file: return list(csv.reader(csv_file)) def get_current_version(): current_version = read_config_by_key("settings", "CURRENT_VERSION") if current_version: return current_version else: sys.exit("\033[91m \n✖ Unable to retrieve CURRENT_VERSION from the settings.json files \n\033[0m") def get_log_filepath(streamer_name, video_id): log_filename = os.path.join(get_default_directory(), f"{streamer_name}_{video_id}_log.txt") return log_filename def get_vod_filepath(streamer_name, video_id): vod_filename = os.path.join(get_default_directory(), f"{streamer_name}_{video_id}.m3u8") return vod_filename def get_script_directory(): return os.path.dirname(os.path.realpath(__file__)) def return_user_agent(): script_dir = os.path.dirname(os.path.abspath(__file__)) user_agents = read_text_file(os.path.join(script_dir, "lib", "user_agents.txt")) header = {"user-agent": random.choice(user_agents)} return header def calculate_epoch_timestamp(timestamp, seconds): try: epoch_timestamp = ((datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S") + timedelta(seconds=seconds)) - datetime(1970, 1, 1)).total_seconds() return epoch_timestamp except ValueError: return None def calculate_days_since_broadcast(start_timestamp): if start_timestamp is None: return 0 vod_age = datetime.today() - datetime.strptime(start_timestamp, "%Y-%m-%d %H:%M:%S") return max(vod_age.days, 0) def is_video_muted(m3u8_link): response = requests.get(m3u8_link, timeout=30).text return bool("unmuted" in response) def calculate_broadcast_duration_in_minutes(hours, minutes): return (int(hours) 60) + int(minutes) def calculate_max_clip_offset(video_duration): return (video_duration 60) + 2000 def parse_streamer_from_csv_filename(csv_filename): _, file_name = os.path.split(csv_filename) streamer_name = file_name.strip() return streamer_name.split()[0] def parse_streamer_from_m3u8_link(m3u8_link): indices = [i.start() for i in re.finditer("_", m3u8_link)] streamer_name = m3u8_link[indices[0] + 1 : indices[-2]] return streamer_name def parse_video_id_from_m3u8_link(m3u8_link): indices = [i.start() for i in re.finditer("_", m3u8_link)] video_id = m3u8_link[ indices[0] + len(parse_streamer_from_m3u8_link(m3u8_link)) + 2 : indices[-1] ] return video_id def parse_streamer_and_video_id_from_m3u8_link(m3u8_link): indices = [i.start() for i in re.finditer("_", m3u8_link)] streamer_name = m3u8_link[indices[0] + 1 : indices[-2]] video_id = m3u8_link[indices[0] + len(streamer_name) + 2 : indices[-1]] return f" - {streamer_name} [{video_id}]" def parse_streamscharts_url(streamscharts_url): try: streamer_name = streamscharts_url.split("/channels/", 1)[1].split("/streams/")[0] video_id = streamscharts_url.split("/streams/", 1)[1] return streamer_name, video_id except IndexError: print("\033[91m \n✖ Invalid Streamscharts URL! Please try again:\n \033[0m") input("Press Enter to continue...") return run_vod_recover() def parse_twitchtracker_url(twitchtracker_url): try: streamer_name = twitchtracker_url.split(".com/", 1)[1].split("/streams/")[0] video_id = twitchtracker_url.split("/streams/", 1)[1] return streamer_name, video_id except IndexError: print("\033[91m \n✖ Invalid Twitchtracker URL! Please try again:\n \033[0m") input("Press Enter to continue...") return run_vod_recover() def parse_sullygnome_url(sullygnome_url): try: streamer_name = sullygnome_url.split("/channel/", 1)[1].split("/")[0] video_id = sullygnome_url.split("/stream/", 1)[1] return streamer_name, video_id except IndexError: print("\033[91m \n✖ Invalid SullyGnome URL! Please try again:\n \033[0m") input("Press Enter to continue...") return run_vod_recover() def set_default_video_format(): print("\nSelect the default video format") for i, format_option in enumerate(SUPPORTED_FORMATS, start=1): print(f"{i}) {format_option.lstrip('.')}") user_option = str(input("\nChoose a video format: ")) if user_option in [str(i) for i in range(1, len(SUPPORTED_FORMATS) + 1)]: selected_format = SUPPORTED_FORMATS[int(user_option) - 1] script_dir = get_script_directory() config_file_path = os.path.join(script_dir, "config", "settings.json") try: with open(config_file_path, "r", encoding="utf-8") as config_file: config_data = json.load(config_file) if not config_data: print("Error: No config file found.") return config_data["DEFAULT_VIDEO_FORMAT"] = selected_format with open(config_file_path, "w", encoding="utf-8") as config_file: json.dump(config_data, config_file, indent=4) print(f"\n\033[92m\u2713 Default video format set to: {selected_format.lstrip('.')}\033[0m") except (FileNotFoundError, json.JSONDecodeError) as error: print(f"Error: {error}") else: print("\n✖ Invalid option! Please try again:\n") return def set_default_directory(): print("\nSelect the default directory") window = tk.Tk() window.wm_attributes("-topmost", 1) window.withdraw() file_path = filedialog.askdirectory( parent=window, initialdir=dir, title="Select A Default Directory" ) if file_path: if not file_path.endswith("/"): file_path += "/" script_dir = get_script_directory() config_file_path = os.path.join(script_dir, "config", "settings.json") try: with open(config_file_path, "r", encoding="utf-8") as config_file: config_data = json.load(config_file) config_data["DEFAULT_DIRECTORY"] = file_path with open(config_file_path, "w", encoding="utf-8") as config_file: json.dump(config_data, config_file, indent=4) print(f"\n\033[92m\u2713 Default directory set to: {file_path}\033[0m") except (FileNotFoundError, json.JSONDecodeError) as error: print(f"Error: {error}") else: print("\nNo folder selected! Returning to main menu...") window.destroy() def set_default_downloader(): # Choose between yt-dlp and ffmpeg print("\nSelect the default downloader") DOWNLOADERS = ["ffmpeg", "yt-dlp"] for i, downloader_option in enumerate(DOWNLOADERS, start=1): print(f"{i}) {downloader_option.lstrip('.')}") user_option = str(input("\nChoose a downloader: ")) if user_option in [str(i) for i in range(1, len(DOWNLOADERS) + 1)]: selected_downloader = DOWNLOADERS[int(user_option) - 1] if selected_downloader == "yt-dlp": get_yt_dlp_path() script_dir = get_script_directory() config_file_path = os.path.join(script_dir, "config", "settings.json") try: with open(config_file_path, "r", encoding="utf-8") as config_file: config_data = json.load(config_file) config_data["DEFAULT_DOWNLOADER"] = selected_downloader with open(config_file_path, "w", encoding="utf-8") as config_file: json.dump(config_data, config_file, indent=4) print(f"\n\033[92m\u2713 Default downloader set to: {selected_downloader}\033[0m") except (FileNotFoundError, json.JSONDecodeError) as error: print(f"Error: {error}") else: print("\n✖ Invalid option! Please try again:\n") return def get_m3u8_file_dialog(): window = tk.Tk() window.wm_attributes("-topmost", 1) window.withdraw() directory = get_default_directory() file_path = filedialog.askopenfilename( parent=window, initialdir=directory, title="Select A File", filetypes=(("M3U8 files", ".m3u8"), ("All files", "")), ) window.destroy() return file_path def parse_vod_filename(m3u8_video_filename): base = os.path.basename(m3u8_video_filename) streamer_name, video_id = base.split(".m3u8", 1)[0].rsplit("_", 1) return streamer_name, video_id def parse_vod_filename_with_Brackets(m3u8_video_filename): base = os.path.basename(m3u8_video_filename) streamer_name, video_id = base.split(".m3u8", 1)[0].rsplit("_", 1) return f" - {streamer_name} [{video_id}]" def remove_chars_from_ordinal_numbers(datetime_string): ordinal_numbers = ["th", "nd", "st", "rd"] for exclude_string in ordinal_numbers: if exclude_string in datetime_string: return datetime_string.replace(datetime_string.split(" ")[1], datetime_string.split(" ")[1][:-len(exclude_string)]) def generate_website_links(streamer_name, video_id, tracker_url=None): website_list = [ f"https://sullygnome.com/channel/{streamer_name}/stream/{video_id}", f"https://twitchtracker.com/{streamer_name}/streams/{video_id}", f"https://streamscharts.com/channels/{streamer_name}/streams/{video_id}", ] if tracker_url: website_list = [link for link in website_list if tracker_url not in link] return website_list def convert_url(url, target): # converts url to the specified target website patterns = { "sullygnome": "https://sullygnome.com/channel/{}/stream/{}", "twitchtracker": "https://twitchtracker.com/{}/streams/{}", "streamscharts": "https://streamscharts.com/channels/{}/streams/{}", } parsed_url = urlparse(url) streamer, video_id = None, None if "sullygnome" in url: streamer = parsed_url.path.split("/")[2] video_id = parsed_url.path.split("/")[4] elif "twitchtracker" in url: streamer = parsed_url.path.split("/")[1] video_id = parsed_url.path.split("/")[3] elif "streamscharts" in url: streamer = parsed_url.path.split("/")[2] video_id = parsed_url.path.split("/")[4] if streamer and video_id: return patterns[target].format(streamer, video_id) def extract_offset(clip_url): clip_offset = re.search(r"(?:-offset|-index)-(\d+)", clip_url) return clip_offset.group(1) def get_clip_format(video_id, offsets): default_clip_list = [f"https://clips-media-assets2.twitch.tv/{video_id}-offset-{i}.mp4" for i in range(0, offsets, 2)] alternate_clip_list = [f"https://clips-media-assets2.twitch.tv/vod-{video_id}-offset-{i}.mp4" for i in range(0, offsets, 2)] legacy_clip_list = [f"https://clips-media-assets2.twitch.tv/{video_id}-index-{i:010}.mp4" for i in range(offsets)] clip_format_dict = { "1": default_clip_list, "2": alternate_clip_list, "3": legacy_clip_list, } return clip_format_dict def get_random_clip_information(): while True: url = get_websites_tracker_url() if "streamscharts" in url: _, video_id = parse_streamscharts_url(url) break if "twitchtracker" in url: _, video_id = parse_twitchtracker_url(url) break if "sullygnome" in url: _, video_id = parse_sullygnome_url(url) break print("\n✖ Link not supported! Please try again:\n") while True: duration = get_time_input_HH_MM("Enter stream duration in (HH:MM) format: ") hours, minutes = map(int, duration.split(":")) if hours >= 0 and minutes >= 0: break return video_id, hours, minutes def manual_clip_recover(): while True: streamer_name = input("Enter the Streamer Name: ") if streamer_name.strip(): break else: print("\n✖ No streamer name! Please try again:\n") while True: video_id = input("Enter the Video ID (from: Twitchtracker/Streamscharts/Sullygnome): ") if video_id.strip(): break else: print("\n✖ No video ID! Please try again:\n") while True: duration = get_time_input_HH_MM("Enter stream duration in (HH:MM) format: ") hours, minutes = map(int, duration.split(":")) if hours >= 0 and minutes >= 0: total_minutes = hours 60 + minutes break clip_recover(streamer_name, video_id, total_minutes) def website_clip_recover(): tracker_url = get_websites_tracker_url() if not tracker_url.startswith("https://"): tracker_url = "https://" + tracker_url if "streamscharts" in tracker_url: streamer, video_id = parse_streamscharts_url(tracker_url) print("\nRetrieving stream duration from Streamscharts") duration_streamscharts = parse_duration_streamscharts(tracker_url) # print(f"Duration: {duration_streamscharts}") clip_recover(streamer, video_id, int(duration_streamscharts)) elif "twitchtracker" in tracker_url: streamer, video_id = parse_twitchtracker_url(tracker_url) print("\nRetrieving stream duration from Twitchtracker") duration_twitchtracker = parse_duration_twitchtracker(tracker_url) # print(f"Duration: {duration_twitchtracker}") clip_recover(streamer, video_id, int(duration_twitchtracker)) elif "sullygnome" in tracker_url: streamer, video_id = parse_sullygnome_url(tracker_url) print("\nRetrieving stream duration from Sullygnome") duration_sullygnome = parse_duration_sullygnome(tracker_url) if duration_sullygnome is None: print("Could not retrieve duration from Sullygnome. Try a different URL.\n") return print_main_menu() # print(f"Duration: {duration_sullygnome}") clip_recover(streamer, video_id, int(duration_sullygnome)) else: print("\n✖ Link not supported! Try again...\n") return run_vod_recover() def manual_vod_recover(): while True: streamer_name = input("Enter the Streamer Name: ") if streamer_name.lower().strip(): break print("\n✖ No streamer name! Please try again:\n") while True: video_id = input("Enter the Video ID (from: Twitchtracker/Streamscharts/Sullygnome): ") if video_id.strip(): break else: print("\n✖ No video ID! Please try again:\n") timestamp = get_time_input_YYYY_MM_DD_HH_MM_SS("Enter VOD Datetime (YYYY-MM-DD HH:MM:SS): ") m3u8_link = vod_recover(streamer_name, video_id, timestamp) if m3u8_link is None: sys.exit("No M3U8 link found! Exiting...") m3u8_source = process_m3u8_configuration(m3u8_link) handle_download_menu(m3u8_source) def handle_vod_recover(url, url_parser, datetime_parser, website_name): streamer, video_id = url_parser(url) print(f"Checking {streamer} VOD ID: {video_id}") stream_datetime, source_duration = datetime_parser(url) m3u8_link = vod_recover(streamer, video_id, stream_datetime, url) if m3u8_link is None: input(f"\nNo M3U8 link found from {website_name}! Press Enter to return...") return run_vod_recover() m3u8_source = process_m3u8_configuration(m3u8_link) m3u8_duration = return_m3u8_duration(m3u8_link) if source_duration and int(source_duration) >= m3u8_duration + 10: print(f"The duration from {website_name} exceeds the M3U8 duration. This may indicate a split stream, try checking Streamscharts for another URL.") return m3u8_source, stream_datetime def website_vod_recover(): url = get_twitch_or_tracker_url() if not url.startswith("https://"): url = "https://" + url if "streamscharts" in url: return handle_vod_recover(url, parse_streamscharts_url, parse_datetime_streamscharts, "Streamscharts") if "twitchtracker" in url: return handle_vod_recover(url, parse_twitchtracker_url, parse_datetime_twitchtracker, "Twitchtracker") if "sullygnome" in url: new_tracker_url = re.sub(r"/\d+/", "/", url) return handle_vod_recover(new_tracker_url, parse_sullygnome_url, parse_datetime_sullygnome, "Sullygnome") twitch_recover(url) print("\n✖ Link not supported! Returning to main menu...") return run_vod_recover() def get_all_clip_urls(clip_format_dict, clip_format_list): combined_clip_format_list = [] for key, value in clip_format_dict.items(): if key in clip_format_list: combined_clip_format_list += value return combined_clip_format_list async def fetch_status(session, url, retries=3, timeout=30): for attempt in range(retries): try: async with session.get(url, timeout=timeout) as response: if response.status == 200: return url except (aiohttp.ClientError, asyncio.TimeoutError): if attempt < retries - 1: await asyncio.sleep(2) return None async def get_vod_urls(streamer_name, video_id, start_timestamp): m3u8_link_list = [] script_dir = get_script_directory() domains = read_text_file(os.path.join(script_dir, "lib", "domains.txt")) print("\nSearching for M3U8 URL...") for seconds in range(60): base_url = f"{streamer_name}_{video_id}_{int(calculate_epoch_timestamp(start_timestamp, seconds))}" hashed_base_url = str(hashlib.sha1(base_url.encode("utf-8")).hexdigest())[:20] for domain in domains: if domain.strip(): m3u8_link_list.append(f"{domain.strip()}{hashed_base_url}_{base_url}/chunked/index-dvr.m3u8") successful_url = None async with aiohttp.ClientSession() as session: tasks = [fetch_status(session, url) for url in m3u8_link_list] for index, task in enumerate(asyncio.as_completed(tasks), 1): url = await task if url: successful_url = url print(f"\n\n\033[92m\u2713 Found URL: {successful_url}\033[0m") break print(f"\rSearching {index} out of {len(m3u8_link_list)} URLs", end="", flush=True) return successful_url def return_supported_qualities(m3u8_link): if m3u8_link is None: return None always_best_quality = read_config_by_key("settings", "ALWAYS_BEST_QUALITY") if always_best_quality is True and "chunked" in m3u8_link: return m3u8_link print("\nChecking for available qualities...") resolutions = [ "chunked", "1080p60", "1080p30", "720p60", "720p30", "480p60", "480p30", ] request_list = [ grequests.get(m3u8_link.replace("chunked", resolution)) for resolution in resolutions ] responses = grequests.map(request_list) valid_resolutions = [ resolution for resolution, response in zip(resolutions, responses) if response and response.status_code == 200 ] if not valid_resolutions: return None valid_resolutions.sort(key=resolutions.index) if always_best_quality: return m3u8_link.replace("chunked", valid_resolutions[0]) print("\nQuality Options:") for idx, resolution in enumerate(valid_resolutions, 1): if "chunked" in resolution: print(f"{idx}. {resolution.replace('chunked', 'Chunked (Best Quality)')}") else: print(f"{idx}. {resolution}") user_option = get_user_resolution_choice(m3u8_link, valid_resolutions) return user_option def get_user_resolution_choice(m3u8_link, valid_resolutions): try: choice = int(input("Choose a quality: ")) if 1 <= choice <= len(valid_resolutions): quality = valid_resolutions[choice - 1] user_option = m3u8_link.replace("chunked", quality) return user_option print("\n✖ Invalid option! Please try again:\n") return get_user_resolution_choice(m3u8_link, valid_resolutions) except ValueError: print("\n✖ Invalid option! Please try again:\n") return get_user_resolution_choice(m3u8_link, valid_resolutions) def parse_website_duration(duration_string): if isinstance(duration_string, list): duration_string = " ".join(duration_string) if not isinstance(duration_string, str): try: duration_string = str(duration_string) except Exception: return 0 pattern = r"(\d+)\s(h(?:ou)?r?s?|m(?:in)?(?:ute)?s?)" matches = re.findall(pattern, duration_string, re.IGNORECASE) if not matches: try: minutes = int(duration_string) return calculate_broadcast_duration_in_minutes(0, minutes) except ValueError: return 0 time_units = {"h": 0, "m": 0} for value, unit in matches: time_units[unit[0].lower()] = int(value) return calculate_broadcast_duration_in_minutes(time_units["h"], time_units["m"]) def handle_cloudflare(sb): try: sb.uc_gui_handle_captcha() except Exception: pass finally: # delete folder generated by selenium browser if os.path.exists("downloaded_files"): rmtree("downloaded_files") def parse_streamscharts_duration_data(bs): streamscharts_duration = bs.find_all("div", {"class": "text-xs font-bold"})[3].text streamscharts_duration_in_minutes = parse_website_duration(streamscharts_duration) return streamscharts_duration_in_minutes def parse_duration_streamscharts(streamscharts_url): try: # Method 1: Using requests response = requests.get(streamscharts_url, headers=return_user_agent(), timeout=10) if response.status_code == 200: bs = BeautifulSoup(response.content, "html.parser") return parse_streamscharts_duration_data(bs) # Method 2: Using grequests retries = 10 reqs = [grequests.get(streamscharts_url, headers=return_user_agent()) for _ in range(retries)] for response in grequests.imap(reqs, size=100): if response.status_code == 200: bs = BeautifulSoup(response.content, "html.parser") return parse_streamscharts_duration_data(bs) # Method 3: Using Selenium print("Opening Streamcharts with browser...") with SB(uc=True) as sb: sb.uc_open_with_reconnect(streamscharts_url, reconnect_time=3) handle_cloudflare(sb) bs = BeautifulSoup(sb.driver.page_source, "html.parser") return parse_streamscharts_duration_data(bs) except Exception: pass sullygnome_url = convert_url(streamscharts_url, "sullygnome") if sullygnome_url: return parse_duration_sullygnome(sullygnome_url) return None def parse_twitchtracker_duration_data(bs): twitchtracker_duration = bs.find_all("div", {"class": "g-x-s-value"})[0].text twitchtracker_duration_in_minutes = parse_website_duration(twitchtracker_duration) return twitchtracker_duration_in_minutes def parse_duration_twitchtracker(twitchtracker_url, try_alternative=True): try: # Method 1: Using requests response = requests.get(twitchtracker_url, headers=return_user_agent(), timeout=10) if response.status_code == 200: bs = BeautifulSoup(response.content, "html.parser") return parse_twitchtracker_duration_data(bs) # Method 2: Using grequests retries = 10 reqs = [grequests.get(twitchtracker_url, headers=return_user_agent()) for _ in range(retries)] for response in grequests.imap(reqs, size=100): if response.status_code == 200: bs = BeautifulSoup(response.content, "html.parser") return parse_twitchtracker_duration_data(bs) # Method 3: Using Selenium print("Opening Twitchtracker with browser...") with SB(uc=True) as sb: sb.uc_open_with_reconnect(twitchtracker_url, reconnect_time=3) handle_cloudflare(sb) bs = BeautifulSoup(sb.driver.page_source, "html.parser") return parse_twitchtracker_duration_data(bs) except Exception: pass if try_alternative: sullygnome_url = convert_url(twitchtracker_url, "sullygnome") if sullygnome_url: return parse_duration_sullygnome(sullygnome_url) return None def parse_sullygnome_duration_data(bs): sullygnome_duration = bs.find_all("div", {"class": "MiddleSubHeaderItemValue"})[7].text.split(",") sullygnome_duration_in_minutes = parse_website_duration(sullygnome_duration) return sullygnome_duration_in_minutes def parse_duration_sullygnome(sullygnome_url): try: # Method 1: Using requests response = requests.get(sullygnome_url, headers=return_user_agent(), timeout=10) if response.status_code == 200: bs = BeautifulSoup(response.content, "html.parser") return parse_sullygnome_duration_data(bs) # Method 2: Using grequests retries = 10 reqs = [grequests.get(sullygnome_url, headers=return_user_agent()) for _ in range(retries)] for response in grequests.imap(reqs, size=10): if response.status_code == 200: bs = BeautifulSoup(response.content, "html.parser") return parse_sullygnome_duration_data(bs) # Method 3: Using Selenium print("Opening Sullygnome with browser...") with SB(uc=True) as sb: sb.uc_open_with_reconnect(sullygnome_url, reconnect_time=3) handle_cloudflare(sb) bs = BeautifulSoup(sb.driver.page_source, "html.parser") return parse_sullygnome_duration_data(bs) except Exception: pass sullygnome_url = convert_url(sullygnome_url, "twitchtracker") if sullygnome_url: return parse_duration_twitchtracker(sullygnome_url, try_alternative=False) return None def parse_streamscharts_datetime_data(bs): stream_date = ( bs.find_all("time", {"class": "ml-2 font-bold"})[0] .text.strip() .replace(",", "") + ":00" ) stream_datetime = datetime.strptime(stream_date, "%d %b %Y %H:%M:%S").strftime("%Y-%m-%d %H:%M:%S") streamcharts_duration = bs.find_all("div", {"class": "text-xs font-bold"})[3].text streamcharts_duration_in_minutes = parse_website_duration(streamcharts_duration) print(f"Datetime: {stream_datetime}") return stream_datetime, streamcharts_duration_in_minutes def parse_datetime_streamscharts(streamscharts_url): print("\nFetching data from Streamscharts...") try: # Method 1: Using requests response = requests.get( streamscharts_url, headers=return_user_agent(), timeout=10 ) if response.status_code == 200: bs = BeautifulSoup(response.content, "html.parser") return parse_streamscharts_datetime_data(bs) # Method 2: Using grequests retries = 10 reqs = [grequests.get(streamscharts_url, headers=return_user_agent()) for _ in range(retries)] for response in grequests.imap(reqs, size=100): if response.status_code == 200: bs = BeautifulSoup(response.content, "html.parser") return parse_streamscharts_datetime_data(bs) # Method 3: Using Selenium print("Opening Streamscharts with browser...") with SB(uc=True) as sb: sb.uc_open_with_reconnect(streamscharts_url, reconnect_time=3) handle_cloudflare(sb) bs = BeautifulSoup(sb.driver.page_source, "html.parser") return parse_streamscharts_datetime_data(bs) except Exception: pass return None, None def parse_twitchtracker_datetime_data(bs): twitchtracker_datetime = bs.find_all("div", {"class": "stream-timestamp-dt"})[0].text twitchtracker_duration = bs.find_all("div", {"class": "g-x-s-value"})[0].text twitchtracker_duration_in_minutes = parse_website_duration(twitchtracker_duration) print(f"Datetime: {twitchtracker_datetime}") return twitchtracker_datetime, twitchtracker_duration_in_minutes def parse_datetime_twitchtracker(twitchtracker_url): print("\nFetching data from Twitchtracker...") try: # Method 1: Using requests response = requests.get(twitchtracker_url, headers=return_user_agent(), timeout=10) if response.status_code == 200: bs = BeautifulSoup(response.content, "html.parser") return parse_twitchtracker_datetime_data(bs) # Method 2: Using grequests retries = 10 reqs = [grequests.get(twitchtracker_url, headers=return_user_agent()) for _ in range(retries)] for response in grequests.imap(reqs, size=100): if response.status_code == 200: bs = BeautifulSoup(response.content, "html.parser") return parse_twitchtracker_datetime_data(bs) # Method 3: Using Selenium print("Opening Twitchtracker with browser...") with SB(uc=True) as sb: sb.uc_open_with_reconnect(twitchtracker_url, reconnect_time=3) handle_cloudflare(sb) bs = BeautifulSoup(sb.driver.page_source, "html.parser") description_meta = bs.find("meta", {"name": "description"}) twitchtracker_datetime = None if description_meta: description_content = description_meta.get("content") match = re.search(r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}", description_content) if match: twitchtracker_datetime = match.group(0) print(f"Datetime: {twitchtracker_datetime}") twitchtracker_duration = bs.find_all("div", {"class": "g-x-s-value"})[0].text twitchtracker_duration_in_minutes = parse_website_duration(twitchtracker_duration) return twitchtracker_datetime, twitchtracker_duration_in_minutes except Exception: pass return None, None def parse_sullygnome_datetime_data(bs): stream_date = bs.find_all("div", {"class": "MiddleSubHeaderItemValue"})[6].text modified_stream_date = remove_chars_from_ordinal_numbers(stream_date) formatted_stream_date = datetime.strptime(modified_stream_date, "%A %d %B %I:%M%p").strftime("%m-%d %H:%M:%S") sullygnome_datetime = str(datetime.now().year) + "-" + formatted_stream_date sullygnome_duration = bs.find_all("div", {"class": "MiddleSubHeaderItemValue"})[7].text.split(",") sullygnome_duration_in_minutes = parse_website_duration(sullygnome_duration) print(f"Datetime: {sullygnome_datetime}") return sullygnome_datetime, sullygnome_duration_in_minutes def parse_datetime_sullygnome(sullygnome_url): print("\nFetching data from Sullygnome...") try: # Method 1: Using requests response = requests.get(sullygnome_url, headers=return_user_agent(), timeout=10) if response.status_code == 200: bs = BeautifulSoup(response.content, "html.parser") return parse_sullygnome_datetime_data(bs) # Method 2: Using grequests retries = 10 reqs = [grequests.get(sullygnome_url, headers=return_user_agent()) for _ in range(retries)] for response in grequests.imap(reqs, size=100): if response.status_code == 200: bs = BeautifulSoup(response.content, "html.parser") return parse_sullygnome_datetime_data(bs) # Method 3: Using Selenium print("Opening Sullygnome with browser...") with SB(uc=True) as sb: sb.uc_open_with_reconnect(sullygnome_url, reconnect_time=3) handle_cloudflare(sb) bs = BeautifulSoup(sb.driver.page_source, "html.parser") return parse_sullygnome_datetime_data(bs) except Exception: pass return None, None def unmute_vod(m3u8_link): counter = 0 video_filepath = get_vod_filepath(parse_streamer_from_m3u8_link(m3u8_link), parse_video_id_from_m3u8_link(m3u8_link)) write_m3u8_to_file(m3u8_link, video_filepath) file_contents = read_text_file(video_filepath) if is_video_muted(m3u8_link): with open(video_filepath, "w", encoding="utf-8") as video_file: for segment in file_contents: m3u8_link = m3u8_link.replace("index-dvr.m3u8", "") if "-unmuted" in segment and not segment.startswith("#"): counter += 1 video_file.write(f"{m3u8_link}{counter - 1}-muted.ts\n") elif "-unmuted" not in segment and not segment.startswith("#"): counter += 1 video_file.write(f"{m3u8_link}{counter - 1}.ts\n") else: video_file.write(f"{segment}\n") print(f"{os.path.normpath(video_filepath)} has been unmuted!\n") else: with open(video_filepath, "w", encoding="utf-8") as video_file: for segment in file_contents: m3u8_link = m3u8_link.replace("index-dvr.m3u8", "") if not segment.startswith("#"): video_file.write(f"{m3u8_link}{counter}.ts\n") counter += 1 else: video_file.write(f"{segment}\n") def mark_invalid_segments_in_playlist(m3u8_link): print() unmute_vod(m3u8_link) vod_file_path = get_vod_filepath(parse_streamer_from_m3u8_link(m3u8_link),parse_video_id_from_m3u8_link(m3u8_link)) with open(vod_file_path, "r", encoding="utf-8") as f: lines = f.read().splitlines() print("Checking for invalid segments...") segments = asyncio.run(validate_playlist_segments(get_all_playlist_segments(m3u8_link))) if not segments: if "/highlight" not in m3u8_link: print("No segments are valid. Cannot generate M3U8! Returning to main menu.") os.remove(vod_file_path) return playlist_segments = [segment for segment in segments if segment in lines] modified_playlist = [] for line in lines: if line in playlist_segments: modified_playlist.append(line) elif line.startswith("#"): modified_playlist.append(line) elif line.endswith(".ts"): modified_playlist.append("#" + line) else: modified_playlist.append(line) with open(vod_file_path, "w", encoding="utf-8") as f: f.write("\n".join(modified_playlist)) input("Press Enter to continue...") def return_m3u8_duration(m3u8_link): total_duration = 0 file_contents = requests.get(m3u8_link, stream=True, timeout=30).text.splitlines() for line in file_contents: if line.startswith("#EXTINF:"): segment_duration = float(line.split(":")[1].split(",")[0]) total_duration += segment_duration total_minutes = int(total_duration // 60) return total_minutes def process_m3u8_configuration(m3u8_link, skip_check=False): playlist_segments = get_all_playlist_segments(m3u8_link) check_segments = read_config_by_key("settings", "CHECK_SEGMENTS") and not skip_check print() m3u8_source = None if is_video_muted(m3u8_link): print("Video contains muted segments") if read_config_by_key("settings", "UNMUTE_VIDEO"): unmute_vod(m3u8_link) m3u8_source = get_vod_filepath(parse_streamer_from_m3u8_link(m3u8_link),parse_video_id_from_m3u8_link(m3u8_link),) else: # print("Video doesn't contain muted segments") m3u8_source = m3u8_link os.remove(get_vod_filepath(parse_streamer_from_m3u8_link(m3u8_link), parse_video_id_from_m3u8_link(m3u8_link))) if check_segments: print("Checking valid segments...") asyncio.run(validate_playlist_segments(playlist_segments)) return m3u8_source def get_all_playlist_segments(m3u8_link): counter = 0 segment_list = [] video_file_path = get_vod_filepath(parse_streamer_from_m3u8_link(m3u8_link), parse_video_id_from_m3u8_link(m3u8_link)) write_m3u8_to_file(m3u8_link, video_file_path) file_contents = read_text_file(video_file_path) with open(video_file_path, "w", encoding="utf-8") as video_file: for segment in file_contents: m3u8_link = m3u8_link.replace("index-dvr.m3u8", "") if "-unmuted" in segment and not segment.startswith("#"): counter += 1 new_segment = f"{m3u8_link}{counter - 1}-muted.ts" video_file.write(f"{new_segment}\n") segment_list.append(new_segment) elif "-unmuted" not in segment and not segment.startswith("#"): counter += 1 new_segment = f"{m3u8_link}{counter - 1}.ts" video_file.write(f"{new_segment}\n") segment_list.append(new_segment) else: video_file.write(f"{segment}\n") video_file.close() return segment_list async def validate_playlist_segments(segments): valid_segments = [] all_segments = [url.strip() for url in segments] available_segment_count = 0 async with aiohttp.ClientSession() as session: tasks = [fetch_status(session, url) for url in all_segments] for index, task in enumerate(asyncio.as_completed(tasks)): url = await task if url: available_segment_count += 1 valid_segments.append(url) print(f"\rChecking segments {index + 1} / {len(all_segments)}", end="") print() if available_segment_count == len(all_segments) or available_segment_count == 0: print("All Segments are Available\n") elif available_segment_count < len(all_segments): print(f"{available_segment_count} out of {len(all_segments)} Segments are Available. To recheck the segments select option 4 from the menu.\n") return valid_segments def vod_recover(streamer_name, video_id, timestamp, tracker_url=None): vod_age = calculate_days_since_broadcast(timestamp) if vod_age > 60: print("Video is older than 60 days. Chances of recovery are very slim.") vod_url = None if timestamp: vod_url = return_supported_qualities(asyncio.run(get_vod_urls(streamer_name, video_id, timestamp))) if vod_url is None: alternate_websites = generate_website_links(streamer_name, video_id, tracker_url) print("\nUnable to recover video! Trying alternate sources...") all_timestamps = [timestamp] # check if any of the alternate websites have a different timestamp, if so try to recover the video for website in alternate_websites: parsed_timestamp = None if "streamscharts" in website: parsed_timestamp, _ = parse_datetime_streamscharts(website) elif "twitchtracker" in website: parsed_timestamp, _ = parse_datetime_twitchtracker(website) elif "sullygnome" in website: # if timestamp contains a year that differs from current year, skip because sullygnome doesn't provide year if timestamp and datetime.now().year != int(timestamp.split("-")[0]): continue parsed_timestamp, _ = parse_datetime_sullygnome(website) if (parsed_timestamp and parsed_timestamp != timestamp and parsed_timestamp not in all_timestamps): all_timestamps.append(parsed_timestamp) vod_url = return_supported_qualities(asyncio.run(get_vod_urls(streamer_name, video_id, parsed_timestamp))) if vod_url: return vod_url if not parsed_timestamp: print("\033[91m \n✖ Unable to get the datetime, try inputting the datetime manually, using the manual recovery option. \033[0m") input("\nPress Enter to continue...") run_vod_recover() if not vod_url: print("\033[91m \n✖ Unable to recover the video! \033[0m") input("\nPress Enter to continue...") run_vod_recover() return vod_url def bulk_vod_recovery(): csv_file_path = get_and_validate_csv_filename() streamer_name = parse_streamer_from_csv_filename(csv_file_path) csv_file = parse_vod_csv_file(csv_file_path) print() all_m3u8_links = [] for timestamp, video_id in csv_file.items(): print("Recovering Video: ", video_id) m3u8_link = asyncio.run(get_vod_urls(streamer_name.lower(), video_id, timestamp)) if m3u8_link is not None: process_m3u8_configuration(m3u8_link) all_m3u8_links.append(m3u8_link) else: print("No VODs found using the current domain list.") if all_m3u8_links: print("All M3U8 Links found:") for link in all_m3u8_links: print(f"\033[92m{link}\033[0m") input("\nPress Enter to continue...") def clip_recover(streamer, video_id, duration): iteration_counter, valid_counter = 0, 0 valid_url_list = [] clip_format = print_clip_format_menu().split(" ") print("Searching...") full_url_list = get_all_clip_urls(get_clip_format(video_id, calculate_max_clip_offset(duration)), clip_format) request_session = requests.Session() rs = [grequests.head(u, session=request_session) for u in full_url_list] for response in grequests.imap(rs, size=100): iteration_counter += 1 print(f"\rSearching for clips... {iteration_counter} of {len(full_url_list)}", end=" ", flush=True) if response.status_code == 200: valid_counter += 1 valid_url_list.append(response.url) print(f"- {valid_counter} Clip(s) Found", end=" ") else: print(f"- {valid_counter} Clip(s) Found", end=" ") print() if valid_url_list: for url in valid_url_list: write_text_file(url, get_log_filepath(streamer, video_id)) if (read_config_by_key("settings", "AUTO_DOWNLOAD_CLIPS") or input("\nDo you want to download the recovered clips (Y/N): ").upper() == "Y"): download_clips(get_default_directory(), streamer, video_id) if read_config_by_key("settings", "REMOVE_LOG_FILE"): os.remove(get_log_filepath(streamer, video_id)) else: keep_log_option = input("Do you want to remove the log file? ") if keep_log_option.upper() == "Y": os.remove(get_log_filepath(streamer, video_id)) else: print("No clips found! Returning to main menu.\n") def get_and_validate_csv_filename(): window = tk.Tk() window.wm_attributes("-topmost", 1) window.withdraw() file_path = filedialog.askopenfilename(parent=window, title="Select The CSV File", filetypes=(("CSV files", ".csv"), ("all files", "."))) if not file_path: print("\nNo file selected! Returning to main menu.") return run_vod_recover() window.destroy() csv_filename = os.path.basename(file_path) pattern = r"^[a-zA-Z0-9_]{4,25} - Twitch stream stats" if bool(re.match(pattern, csv_filename)): return file_path print("The CSV filename MUST be the original filename that was downloaded from sullygnome!") def parse_clip_csv_file(file_path): vod_info_dict = {} lines = read_csv_file(file_path)[1:] for line in lines: if line: stream_date = remove_chars_from_ordinal_numbers(line[1].replace('"', "")) modified_stream_date = datetime.strptime(stream_date, "%A %d %B %Y %H:%M").strftime("%d-%B-%Y") video_id = line[2].partition("stream/")[2].replace('"', "") duration = line[3] if video_id != "0": max_clip_offset = calculate_max_clip_offset(int(duration)) vod_info_dict.update({video_id: (modified_stream_date, max_clip_offset)}) return vod_info_dict def parse_vod_csv_file(file_path): vod_info_dict = {} lines = read_csv_file(file_path)[1:] for line in lines: if line: stream_date = remove_chars_from_ordinal_numbers(line[1].replace('"', "")) modified_stream_date = datetime.strptime(stream_date, "%A %d %B %Y %H:%M").strftime("%Y-%m-%d %H:%M:%S") video_id = line[2].partition("stream/")[2].split(",")[0].replace('"', "") vod_info_dict.update({modified_stream_date: video_id}) return vod_info_dict def merge_csv_files(csv_filename, directory_path): csv_list = [file for file in os.listdir(directory_path) if file.endswith(".csv")] header_saved = False with open(os.path.join(directory_path, f"{csv_filename.title()}_MERGED.csv"), "w", newline="", encoding="utf-8") as output_file: writer = csv.writer(output_file) for file in csv_list: reader = read_csv_file(os.path.join(directory_path, file)) header = reader[0] if not header_saved: writer.writerow(header) header_saved = True for row in reader[1:]: writer.writerow(row) print("CSV files merged successfully!") def random_clip_recovery(video_id, hours, minutes): counter = 0 display_limit = 5 clip_format = print_clip_format_menu().split(" ") full_url_list = get_all_clip_urls(get_clip_format(video_id, calculate_max_clip_offset(calculate_broadcast_duration_in_minutes(hours, minutes))), clip_format) random.shuffle(full_url_list) request_session = requests.Session() print("Searching...") rs = (grequests.head(url, session=request_session) for url in full_url_list) responses = grequests.imap(rs, size=100) for response in responses: if counter < display_limit: if response.status_code == 200: counter += 1 print(response.url) if counter == display_limit: user_option = input("Do you want to search more URLs (Y/N): ") if user_option.upper() == "Y": display_limit += 3 else: break async def validate_clip(session, url, streamer_name, video_id): try: async with session.head(url, timeout=30) as response: if response.status == 200: write_text_file(response.url, get_log_filepath(streamer_name, video_id)) return response.url except Exception: return None async def bulk_clip_recovery(): vod_counter, total_counter, valid_counter, iteration_counter = 0, 0, 0, 0 streamer_name, csv_file_path = "", "" bulk_recovery_option = print_bulk_clip_recovery_menu() if bulk_recovery_option == "1": csv_file_path = get_and_validate_csv_filename() streamer_name = parse_streamer_from_csv_filename(csv_file_path) elif bulk_recovery_option == "2": csv_directory = input("Enter the full path where the sullygnome csv files exist: ").replace('"', "") streamer_name = input("Enter the streamer's name: ") merge_files = input("Do you want to merge the CSV files in the directory? (Y/N): ") if merge_files.upper() == "Y": merge_csv_files(streamer_name, csv_directory) csv_file_path = os.path.join(csv_directory, f"{streamer_name.title()}_MERGED.csv") else: csv_file_path = get_and_validate_csv_filename() csv_file_path = csv_file_path.replace('"', "") elif bulk_recovery_option == "3": return run_vod_recover() clip_format = print_clip_format_menu().split(" ") stream_info_dict = parse_clip_csv_file(csv_file_path) async with aiohttp.ClientSession() as session: for video_id, values in stream_info_dict.items(): vod_counter += 1 print(f"\nProcessing Past Broadcast:\n" f"Stream Date: {values[0].replace('-', ' ')}\n" f"Vod ID: {video_id}\n" f"Vod Number: {vod_counter} of {len(stream_info_dict)}\n") original_vod_url_list = get_all_clip_urls(get_clip_format(video_id, values[1]), clip_format) print("Searching...") tasks = [validate_clip(session, url, streamer_name, video_id) for url in original_vod_url_list] for task in asyncio.as_completed(tasks): total_counter += 1 iteration_counter += 1 print(f"\rSearching for clips... {iteration_counter} of {len(original_vod_url_list)}", end=" ", flush=True) result = await task if result: valid_counter += 1 print(f"\n\033[92m{valid_counter} Clip(s) Found\033[0m\n") if valid_counter != 0: user_option = input("Do you want to download all clips recovered (Y/N)? ") if user_option.upper() == "Y": download_clips(get_default_directory(), streamer_name, video_id) os.remove(get_log_filepath(streamer_name, video_id)) else: choice = input("\nWould you like to keep the log file containing links to the recovered clips (Y/N)? ") if choice.upper() == "N": os.remove(get_log_filepath(streamer_name, video_id)) else: print("\nRecovered links saved to " + get_log_file
Views: 4