"""Check a video for AI-generated content using the Sightengine API.

Small files are sent to the synchronous endpoint; files at or above
``SIZE_THRESHOLD`` (or videos the API reports as too long for sync mode) are
submitted to the asynchronous endpoint and polled until a result is ready.
Results are rendered in the terminal with Rich.
"""
import os
import sys
import time
import tkinter as tk
from contextlib import contextmanager
from tkinter import filedialog

import requests
from rich import print  # kept from the original file; shadows the builtin
from rich.console import Console
from rich.markup import escape
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn, TimeElapsedColumn
from rich.table import Table

# --- CONFIGURATION ---
# Credentials are read from the environment so that secrets never live in
# source control. When a variable is unset the placeholder is used and
# main() refuses to run.
# NOTE(review): real-looking credentials were previously hard-coded here;
# if this file was ever shared or committed, rotate that API secret.
_PLACEHOLDER = "CHANGE_ME"
API_USER = os.environ.get("SIGHTENGINE_API_USER", _PLACEHOLDER)
API_SECRET = os.environ.get("SIGHTENGINE_API_SECRET", _PLACEHOLDER)

SYNC_ENDPOINT = "https://api.sightengine.com/1.0/video/check-sync.json"
ASYNC_ENDPOINT = "https://api.sightengine.com/1.0/video/check.json"
UPLOAD_ENDPOINT = "https://api.sightengine.com/1.0/upload/create-video.json"

SIZE_THRESHOLD = 50 * 1024 * 1024  # 50 MB
POLL_INTERVAL = 5  # seconds between polls
POLL_TIMEOUT = 600  # max seconds to wait for async result

# (connect, read) timeouts in seconds. requests waits forever when no timeout
# is given, which would hang the script on a stalled connection. The read
# timeout is generous because the sync endpoint analyzes before answering.
REQUEST_TIMEOUT = (10, 600)

console = Console()


@contextmanager
def _spinner(description, *, show_elapsed=False):
    """Show a transient Rich spinner with *description* for the `with` body."""
    columns = [SpinnerColumn(), TextColumn("[progress.description]{task.description}")]
    if show_elapsed:
        columns.append(TimeElapsedColumn())
    with Progress(*columns, transient=True) as progress:
        progress.add_task(description=description, total=None)
        yield progress


def _auth_params():
    """Return the credential parameters sent with every API call."""
    return {'api_user': API_USER, 'api_secret': API_SECRET}


def _decode_json(response, context):
    """Return the JSON body of *response*, or raise RuntimeError if it is not JSON."""
    try:
        return response.json()
    except ValueError as err:
        raise RuntimeError(
            f"{context}: HTTP {response.status_code} returned a non-JSON response"
        ) from err


def _error_message(data, default='Unknown error'):
    """Extract ``error.message`` from an API response dict, or *default*."""
    return (data.get('error') or {}).get('message', default)


def select_video_file():
    """Open a Tkinter file dialog and return the chosen video path.

    Returns:
        The selected path, or a falsy value when the dialog is cancelled.
    """
    root = tk.Tk()
    root.withdraw()  # Hide the main window
    try:
        return filedialog.askopenfilename(
            title="Select a video file to analyze",
            filetypes=[
                ("Video Files", "*.mp4 *.mov *.avi *.mkv *.webm *.flv *.wmv"),
                ("All Files", "*.*"),
            ],
        )
    finally:
        # The hidden root window would otherwise stay alive until exit.
        root.destroy()


def upload_video(file_path):
    """Upload a large video via the Sightengine Upload API.

    Args:
        file_path: Path of the video file to upload.

    Returns:
        The media id reported by the upload endpoint.

    Raises:
        RuntimeError: If the upload URL cannot be obtained or the PUT fails.
    """
    # Step 1: create a signed upload URL
    with _spinner("Requesting upload URL..."):
        resp = requests.get(UPLOAD_ENDPOINT, params=_auth_params(), timeout=REQUEST_TIMEOUT)
    data = _decode_json(resp, "Upload URL error")

    if data.get('status') != 'success':
        raise RuntimeError(f"Upload URL error: {_error_message(data, 'Unknown')}")

    media = data.get('media') or {}
    media_id = media.get('id')
    upload_url = media.get('upload_url')
    if not media_id or not upload_url:
        raise RuntimeError(f"Upload URL error: incomplete response: {data}")

    # Step 2: PUT the file to the signed URL
    file_size = os.path.getsize(file_path)
    console.print(
        f"[blue]Uploading[/blue] {escape(os.path.basename(file_path))} "
        f"({file_size / 1024 / 1024:.1f} MB)..."
    )

    with _spinner("Uploading video to Sightengine...", show_elapsed=True):
        with open(file_path, 'rb') as f:
            # NOTE(review): the content type is fixed to video/mp4 for every
            # container format; confirm the signed URL accepts other values
            # before deriving it from the file extension.
            put_resp = requests.put(
                upload_url,
                data=f,
                headers={'Content-Type': 'video/mp4'},
                timeout=REQUEST_TIMEOUT,
            )

    if put_resp.status_code not in (200, 204):
        raise RuntimeError(f"Upload failed: HTTP {put_resp.status_code}")

    console.print(f"[green]Upload complete.[/green] Media ID: {media_id}")
    return media_id


def check_video_async(file_path):
    """Submit a video for async analysis and poll until results are ready.

    Files at or above ``SIZE_THRESHOLD`` are uploaded first and referenced by
    media id; smaller files are attached directly to the submission request.

    Args:
        file_path: Path of the video file to analyze.

    Returns:
        The decoded poll response whose ``status`` is ``'finished'``.

    Raises:
        RuntimeError: On submission errors, analysis errors, or when no
            result arrives within ``POLL_TIMEOUT`` seconds.
    """
    file_size = os.path.getsize(file_path)
    params = {'models': 'genai', **_auth_params()}

    # Submit the job
    if file_size >= SIZE_THRESHOLD:
        media_id = upload_video(file_path)
        console.print("[blue]Submitting async analysis job...[/blue]")
        with _spinner("Submitting job..."):
            params['media_id'] = media_id
            resp = requests.post(ASYNC_ENDPOINT, data=params, timeout=REQUEST_TIMEOUT)
    else:
        console.print("[blue]Submitting async analysis job (file upload)...[/blue]")
        with _spinner("Uploading and submitting job..."):
            with open(file_path, 'rb') as f:
                resp = requests.post(
                    ASYNC_ENDPOINT,
                    files={'media': f},
                    data=params,
                    timeout=REQUEST_TIMEOUT,
                )

    resp_data = _decode_json(resp, "Async submission error")

    if resp_data.get('status') not in ('success', 'submitted'):
        raise RuntimeError(f"Async submission error: {_error_message(resp_data)}")

    request_id = (resp_data.get('request') or {}).get('id') or (resp_data.get('media') or {}).get('id')
    if not request_id:
        raise RuntimeError(f"No request ID in response: {resp_data}")

    console.print(f"[blue]Job submitted.[/blue] Request ID: {request_id}")

    # Poll for results
    console.print(
        f"[blue]Waiting for results[/blue] "
        f"(polling every {POLL_INTERVAL}s, timeout {POLL_TIMEOUT}s)..."
    )
    poll_params = {'request_id': request_id, 'models': 'genai', **_auth_params()}
    # A monotonic clock keeps the deadline correct if the wall clock is adjusted.
    deadline = time.monotonic() + POLL_TIMEOUT

    with _spinner("Analyzing video...", show_elapsed=True):
        while time.monotonic() < deadline:
            time.sleep(POLL_INTERVAL)

            poll_resp = requests.get(ASYNC_ENDPOINT, params=poll_params, timeout=REQUEST_TIMEOUT)
            poll_data = _decode_json(poll_resp, "Async analysis error")
            status = poll_data.get('status')

            if status == 'finished':
                return poll_data
            if status in ('error', 'failure'):
                raise RuntimeError(f"Async analysis error: {_error_message(poll_data)}")
            # status == 'processing' or similar — keep polling

    raise RuntimeError(f"Timed out waiting for async results after {POLL_TIMEOUT}s")


def _run_async_and_display(file_path):
    """Run the async analysis and display its result, reporting any failure."""
    try:
        display_results(file_path, check_video_async(file_path))
    except Exception as e:  # top-level boundary: report instead of crashing
        console.print(f"[bold red]System Error:[/bold red] {escape(str(e))}")


def check_video(file_path):
    """Check the video using the Sightengine API and print the outcome.

    Large files go straight to async mode. Smaller files are tried in sync
    mode first and fall back to async mode when the API reports the video as
    too long. Errors are printed to the console rather than raised.

    Args:
        file_path: Path of the video file to analyze.
    """
    # isfile (rather than exists) also rejects directories, which cannot be opened.
    if not os.path.isfile(file_path):
        console.print(f"[bold red]Error:[/bold red] File not found: {escape(file_path)}")
        return

    file_size = os.path.getsize(file_path)

    # Large files go directly to async mode
    if file_size >= SIZE_THRESHOLD:
        console.print(
            f"[yellow]Large file ({file_size / 1024 / 1024:.1f} MB) — using async upload mode.[/yellow]"
        )
        _run_async_and_display(file_path)
        return

    # Small files: try sync first
    params = {'models': 'genai', **_auth_params()}

    try:
        # The file is closed as soon as the request completes, so the async
        # fallback below can reopen it.
        with open(file_path, 'rb') as media_file, _spinner("Uploading and analyzing video..."):
            response = requests.post(
                SYNC_ENDPOINT,
                files={'media': media_file},
                data=params,
                timeout=REQUEST_TIMEOUT,
            )

        response_data = _decode_json(response, "Sync analysis error")

        if response_data.get('status') == 'success':
            display_results(file_path, response_data)
            return

        error_msg = str(_error_message(response_data))
        lowered = error_msg.lower()

        # Fallback: video too long for sync mode
        if 'too long' in lowered or 'continuous moderation' in lowered:
            console.print("[yellow]Video too long for sync mode — switching to async mode...[/yellow]")
            _run_async_and_display(file_path)
        else:
            console.print(f"[bold red]API Error:[/bold red] {escape(error_msg)}")

    except Exception as e:  # top-level boundary: report instead of crashing
        console.print(f"[bold red]System Error:[/bold red] {escape(str(e))}")


def _frame_verdict(ai_score):
    """Return the Rich-markup verdict label for a single frame score."""
    if ai_score > 0.8:
        return "[red]FAKE[/red]"
    if ai_score > 0.4:
        return "[yellow]SUSPICIOUS[/yellow]"
    return "[green]REAL[/green]"


def display_results(file_path, data):
    """Display the analysis results using Rich.

    Prints a summary panel (request id, operations, max and average AI score,
    overall verdict) followed by a table of sampled frames. The overall
    verdict is based on the highest per-frame score.

    Args:
        file_path: Path of the analyzed file; only its base name is shown.
        data: Decoded API response; frames are read from ``data['data']['frames']``.
    """
    frames = (data.get('data') or {}).get('frames', [])

    if not frames:
        console.print("[yellow]No frames analyzed.[/yellow]")
        return

    table = Table(title="Frame Analysis (Sample)")
    table.add_column("Time (s)", justify="right", style="cyan", no_wrap=True)
    table.add_column("AI Probability", justify="right", style="magenta")
    table.add_column("Verdict", justify="center")

    total_ai_score = 0
    max_ai_score = 0

    for count, frame in enumerate(frames, start=1):
        ai_score = frame.get('type', {}).get('ai_generated', 0)
        total_ai_score += ai_score
        max_ai_score = max(max_ai_score, ai_score)

        # List every likely-AI frame plus every tenth frame as a sample.
        if ai_score > 0.5 or count % 10 == 1:
            # NOTE(review): 'position' is shown under a "Time (s)" header as-is;
            # confirm the unit the API reports.
            position = frame.get('info', {}).get('position', 0)
            table.add_row(str(position), f"{ai_score:.2%}", _frame_verdict(ai_score))

    avg_ai_score = total_ai_score / len(frames)

    if max_ai_score > 0.85:
        final_verdict, color = "AI GENERATED", "red"
    elif max_ai_score < 0.2:
        final_verdict, color = "AUTHENTIC", "green"
    else:
        final_verdict, color = "UNCERTAIN", "yellow"

    request_info = data.get('request', {})
    req_id = request_info.get('id', 'N/A')
    ops = request_info.get('operations', 0)

    # "[bold <color>]" is a valid Rich style; the previous
    # "[bold size=20 style=<color>]" tag was not, so the verdict had no styling.
    # The file name is escaped so brackets in it are not parsed as markup.
    panel_content = "\n".join([
        "",
        f"[bold]File:[/bold] {escape(os.path.basename(file_path))}",
        f"[bold]Request ID:[/bold] {req_id}",
        f"[bold]Operations Cost:[/bold] {ops}",
        "",
        f"[bold]Max AI Score:[/bold] {max_ai_score:.2%}",
        f"[bold]Average AI Score:[/bold] {avg_ai_score:.2%}",
        "",
        f"[bold {color}]VERDICT: {final_verdict}[/bold {color}]",
        "",
    ])

    console.print(Panel(panel_content, title="Analysis Result", expand=False))
    console.print(table)


def main():
    """Resolve the video path (argv or file dialog) and run the check."""
    if API_USER in ("", _PLACEHOLDER) or API_SECRET in ("", _PLACEHOLDER):
        console.print(
            "[bold red]Configuration Error:[/bold red] Please set the "
            "SIGHTENGINE_API_USER and SIGHTENGINE_API_SECRET environment variables."
        )
        return

    if len(sys.argv) > 1:
        video_path = sys.argv[1]
    else:
        console.print(Panel(
            "[bold green]AI Video Authenticity Check[/bold green]\n\n"
            "This script analyzes a video to detect AI-generated content using Sightengine.\n"
            "Please select a video file in the window that opens...",
            title="Welcome",
            border_style="green"
        ))
        video_path = select_video_file()

    if not video_path:
        console.print("[yellow]No file selected. Exiting.[/yellow]")
        return

    console.print(f"[blue]Analyzing:[/blue] {escape(video_path)}")
    check_video(video_path)


if __name__ == "__main__":
    try:
        main()
    finally:
        # Keep the console window open when the script is launched by double-click.
        input("Press Enter to continue...")