Batch Watermark Removal with Python: Process Hundreds of Videos Automatically
For developers and automation users who need to process large volumes of watermarked videos, uploading files one at a time by hand does not scale. With the API provided by sora2watermarkremover.net, a Python script can run the whole pipeline: batch upload, automatic processing, and automatic download.
This guide provides complete Python code examples to build an automated batch watermark removal workflow from scratch.
Why Use Python for Batch Watermark Removal?
Use Cases
These scenarios are ideal for batch watermark removal:
- Batch AI video generation: Videos generated in bulk from Kling AI, Runway, Pika all carry watermarks
- Media library cleanup: Large collections of watermarked historical videos that need uniform processing
- Social media batch publishing: Materials downloaded from multiple platforms need watermark removal before republishing
- Automated workflows: Integrate watermark removal into CI/CD pipelines or daily data processing
API Advantages
The API from sora2watermarkremover.net supports:
- Batch video uploads (MP4, MOV, max 200MB per file)
- Async processing: an upload returns a task ID, and you poll its status instead of blocking on the request
- Auto watermark detection (Auto mode)
- 1080p high-quality output
- Starting at $0.5/video, 10 free credits included
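The format and size limits in the list above can be checked locally before an upload is attempted. The following is a minimal sketch, assuming the limits are exactly as listed (MP4 and MOV, 200MB per file, read here as 200 × 1024 × 1024 bytes). The helper name `is_uploadable` is hypothetical and not part of the API.

```python
import os

# Limits taken from the feature list; the exact byte threshold is an assumption.
MAX_BYTES = 200 * 1024 * 1024
ALLOWED_EXTENSIONS = {".mp4", ".mov"}


def is_uploadable(file_path, size_bytes=None):
    """Return (ok, reason) for a local file without contacting the API."""
    ext = os.path.splitext(file_path)[1].lower()
    if ext not in ALLOWED_EXTENSIONS:
        return False, f"unsupported extension: {ext or '(none)'}"
    if size_bytes is None:
        # Only touch the filesystem when the caller did not supply a size.
        size_bytes = os.path.getsize(file_path)
    if size_bytes > MAX_BYTES:
        size_mb = size_bytes / 1024 / 1024
        return False, f"file is {size_mb:.1f} MB, over the 200 MB limit"
    return True, "ok"
```

Calling `is_uploadable(path)` on each file before the upload phase lets a batch script report oversized or unsupported files up front instead of discovering them through failed requests.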
Environment Setup
Requirements
- Python 3.8+
- Stable internet connection
- API Key (register at sora2watermarkremover.net to obtain)
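Rather than pasting the API Key into a script, it can be read from the environment so that it stays out of version control. This is a small sketch of that approach. The variable name `WATERMARK_API_KEY` and the helper `load_api_key` are hypothetical choices, not names the service defines.

```python
import os


def load_api_key(env_var="WATERMARK_API_KEY"):
    """Read the API key from an environment variable (name is an assumption)."""
    key = os.environ.get(env_var, "").strip()
    if not key:
        raise RuntimeError(
            f"Set the {env_var} environment variable to your API key first"
        )
    return key
```

A script would then use `API_KEY = load_api_key()` in place of a hard-coded string.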
Install Dependencies
pip install requests tqdm
- requests: HTTP request library
- tqdm: Progress bar display
Complete Python Batch Watermark Removal Code
Basic Version: Single Video Watermark Removal
import os
import requests
import time

API_KEY = "your-api-key-here"
BASE_URL = "https://api.sora2watermarkremover.net"


def upload_video(api_key, file_path):
    """Upload video file"""
    url = f"{BASE_URL}/upload"
    headers = {"Authorization": f"Bearer {api_key}"}
    with open(file_path, 'rb') as f:
        # Send only the file name, not the full local path
        files = {'file': (os.path.basename(file_path), f, 'video/mp4')}
        response = requests.post(url, headers=headers, files=files)
    result = response.json()
    if response.status_code == 200:
        print(f"✅ Upload successful, Task ID: {result['task_id']}")
        return result['task_id']
    else:
        print(f"❌ Upload failed: {result.get('error', 'Unknown error')}")
        return None


def check_status(api_key, task_id):
    """Check task status"""
    url = f"{BASE_URL}/status/{task_id}"
    headers = {"Authorization": f"Bearer {api_key}"}
    response = requests.get(url, headers=headers)
    return response.json()


def download_result(api_key, task_id, output_path):
    """Download processed result"""
    url = f"{BASE_URL}/download/{task_id}"
    headers = {"Authorization": f"Bearer {api_key}"}
    response = requests.get(url, headers=headers, stream=True)
    # Stop here on an HTTP error so an error body is not saved as a video
    response.raise_for_status()
    with open(output_path, 'wb') as f:
        for chunk in response.iter_content(chunk_size=8192):
            f.write(chunk)
    print(f"✅ Downloaded to: {output_path}")


# Usage example
task_id = upload_video(API_KEY, "video_watermarked.mp4")
if task_id:
    # Wait for processing
    while True:
        status = check_status(API_KEY, task_id)
        if status['status'] == 'completed':
            # Download the result only after processing has succeeded
            download_result(API_KEY, task_id, "video_clean.mp4")
            break
        elif status['status'] == 'failed':
            print(f"Processing failed: {status.get('error')}")
            break
        print(f"Processing... Progress: {status.get('progress', 'unknown')}")
        time.sleep(5)
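The polling loop above only ends when the API reports `completed` or `failed`, so a task that never leaves the processing state would block the script indefinitely. A bounded wait is one way to guard against that. The sketch below is an illustration, not part of the API: `wait_until_done` is a hypothetical helper, and the `timeout` status value is invented here to signal that the deadline passed.

```python
import time


def wait_until_done(check, timeout=600, interval=5,
                    sleep=time.sleep, clock=time.monotonic):
    """Poll check() until it reports 'completed' or 'failed', or until
    `timeout` seconds have passed. `check` is any callable returning a
    status dict, for example a lambda around check_status()."""
    deadline = clock() + timeout
    while True:
        status = check()
        if status.get("status") in ("completed", "failed"):
            return status
        if clock() >= deadline:
            # 'timeout' is this sketch's own marker, not an API status.
            return {"status": "timeout", "error": f"no result after {timeout}s"}
        sleep(interval)
```

With the functions from the basic version, a call would look like `status = wait_until_done(lambda: check_status(API_KEY, task_id))`, followed by a download only when `status["status"] == "completed"`.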
Batch Version: Process All Videos in a Folder
import os
import requests
import time
import glob
from tqdm import tqdm

API_KEY = "your-api-key-here"
BASE_URL = "https://api.sora2watermarkremover.net"


class BatchWatermarkRemover:
    """Batch watermark removal processor"""

    def __init__(self, api_key, input_dir, output_dir):
        self.api_key = api_key
        self.input_dir = input_dir
        self.output_dir = output_dir
        self.headers = {"Authorization": f"Bearer {api_key}"}
        os.makedirs(output_dir, exist_ok=True)

    def upload_video(self, file_path):
        """Upload single video"""
        url = f"{BASE_URL}/upload"
        with open(file_path, 'rb') as f:
            files = {'file': (os.path.basename(file_path), f, 'video/mp4')}
            response = requests.post(url, headers=self.headers, files=files)
        return response.json()

    def check_status(self, task_id):
        """Check task status"""
        url = f"{BASE_URL}/status/{task_id}"
        response = requests.get(url, headers=self.headers)
        return response.json()

    def download_result(self, task_id, output_path):
        """Download processed result"""
        url = f"{BASE_URL}/download/{task_id}"
        response = requests.get(url, headers=self.headers, stream=True)
        # Stop here on an HTTP error so an error body is not saved as a video
        response.raise_for_status()
        with open(output_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)

    def process_batch(self, file_pattern="*.mp4"):
        """Batch process all videos in folder"""
        video_files = glob.glob(os.path.join(self.input_dir, file_pattern))
        video_files.extend(glob.glob(os.path.join(self.input_dir, "*.MOV")))
        video_files.extend(glob.glob(os.path.join(self.input_dir, "*.mov")))
        video_files = list(set(video_files))
        print(f"📂 Found {len(video_files)} video files")
        tasks = []

        # Phase 1: Batch upload
        print("📤 Phase 1: Batch uploading...")
        for file_path in tqdm(video_files, desc="Uploading"):
            result = self.upload_video(file_path)
            if result.get('task_id'):
                filename = os.path.basename(file_path)
                name, ext = os.path.splitext(filename)
                output_name = f"{name}_clean{ext}"
                output_path = os.path.join(self.output_dir, output_name)
                tasks.append((result['task_id'], file_path, output_path))
                tqdm.write(f"  ✅ {filename} → Task ID: {result['task_id']}")
            else:
                tqdm.write(f"  ❌ {file_path} upload failed: {result.get('error')}")

        # Phase 2: Wait for processing
        print("⏳ Phase 2: Waiting for processing...")
        completed = []  # only tasks that finished successfully are downloaded
        for task_id, file_path, output_path in tasks:
            filename = os.path.basename(file_path)
            while True:
                status = self.check_status(task_id)
                if status['status'] == 'completed':
                    completed.append((task_id, file_path, output_path))
                    tqdm.write(f"  ✅ {filename} completed")
                    break
                elif status['status'] == 'failed':
                    tqdm.write(f"  ❌ {filename} processing failed: {status.get('error')}")
                    break
                time.sleep(5)

        # Phase 3: Batch download
        print("📥 Phase 3: Batch downloading...")
        for task_id, file_path, output_path in tqdm(completed, desc="Downloading"):
            self.download_result(task_id, output_path)
            tqdm.write(f"  ✅ Downloaded: {os.path.basename(output_path)}")

        print(f"\n🎉 Batch processing complete! {len(completed)} of {len(video_files)} videos processed")
        print(f"📁 Output directory: {self.output_dir}")


# Usage example
remover = BatchWatermarkRemover(
    api_key=API_KEY,
    input_dir="./watermarked_videos",
    output_dir="./clean_videos"
)
remover.process_batch()
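If a long batch is interrupted, running the script again would upload every file a second time. One way to make reruns cheaper is to skip inputs whose cleaned output already exists. The sketch below assumes the `{name}_clean{ext}` output naming used in the batch version. `pending_files` is a hypothetical helper, and it does not verify that an existing output file is complete.

```python
import os


def pending_files(video_files, output_dir, suffix="_clean"):
    """Return the inputs that have no cleaned output in output_dir yet."""
    pending = []
    for file_path in sorted(video_files):
        name, ext = os.path.splitext(os.path.basename(file_path))
        output_path = os.path.join(output_dir, f"{name}{suffix}{ext}")
        if not os.path.exists(output_path):
            pending.append(file_path)
    return pending
```

Inside `process_batch`, the file list could be filtered with `video_files = pending_files(video_files, self.output_dir)` before the upload phase starts.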
Advanced Version: Concurrent Processing + Retry Logic
import os
import requests
import time
import glob
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm

API_KEY = "your-api-key-here"
BASE_URL = "https://api.sora2watermarkremover.net"
MAX_CONCURRENT = 5  # Max concurrent workers
MAX_RETRIES = 3     # Max retry attempts


class AdvancedBatchRemover:
    """Advanced batch watermark removal (concurrent + retry)"""

    def __init__(self, api_key, input_dir, output_dir):
        self.api_key = api_key
        self.input_dir = input_dir
        self.output_dir = output_dir
        self.headers = {"Authorization": f"Bearer {api_key}"}
        # One session reuses connections and carries the auth header
        self.session = requests.Session()
        self.session.headers.update(self.headers)
        os.makedirs(output_dir, exist_ok=True)
        self.log_file = os.path.join(output_dir, "batch_log.json")
        self.results = {"success": [], "failed": []}

    def upload_with_retry(self, file_path, retries=MAX_RETRIES):
        """Upload with retry logic"""
        for attempt in range(1, retries + 1):
            try:
                with open(file_path, 'rb') as f:
                    files = {'file': (os.path.basename(file_path), f, 'video/mp4')}
                    response = self.session.post(
                        f"{BASE_URL}/upload",
                        files=files,
                        timeout=120
                    )
                return response.json()
            except Exception as e:
                if attempt == retries:
                    return {"error": f"Upload failed ({attempt} retries): {str(e)}"}
                # Exponential backoff: 2s, 4s, 8s, ...
                time.sleep(2 ** attempt)

    def wait_for_completion(self, task_id, timeout=600):
        """Wait for task completion"""
        start_time = time.time()
        while time.time() - start_time < timeout:
            response = self.session.get(f"{BASE_URL}/status/{task_id}", timeout=30)
            status = response.json()
            if status['status'] == 'completed':
                return True, status
            elif status['status'] == 'failed':
                return False, status
            time.sleep(5)
        return False, {"error": "Processing timed out"}

    def download_with_retry(self, task_id, output_path, retries=MAX_RETRIES):
        """Download with retry logic"""
        for attempt in range(1, retries + 1):
            try:
                response = self.session.get(
                    f"{BASE_URL}/download/{task_id}",
                    stream=True,
                    timeout=300
                )
                # Treat an HTTP error as a failed attempt instead of saving its body
                response.raise_for_status()
                with open(output_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)
                return True
            except Exception:
                if attempt == retries:
                    return False
                time.sleep(2 ** attempt)
        return False

    def process_video(self, file_path):
        """Complete workflow for single video"""
        filename = os.path.basename(file_path)
        name, ext = os.path.splitext(filename)
        output_name = f"{name}_clean{ext}"
        output_path = os.path.join(self.output_dir, output_name)

        # Upload
        result = self.upload_with_retry(file_path)
        if 'task_id' not in result:
            self.results['failed'].append({
                'file': filename,
                'error': result.get('error', 'Upload failed')
            })
            return False
        task_id = result['task_id']

        # Wait for completion
        success, status = self.wait_for_completion(task_id)
        if not success:
            self.results['failed'].append({
                'file': filename,
                'error': status.get('error', 'Processing failed')
            })
            return False

        # Download
        downloaded = self.download_with_retry(task_id, output_path)
        if not downloaded:
            self.results['failed'].append({
                'file': filename,
                'error': 'Download failed'
            })
            return False

        self.results['success'].append({
            'file': filename,
            'output': output_name,
            'task_id': task_id
        })
        return True

    def run(self):
        """Run batch processing"""
        patterns = ["*.mp4", "*.MP4", "*.mov", "*.MOV", "*.avi", "*.AVI"]
        video_files = []
        for pattern in patterns:
            video_files.extend(glob.glob(os.path.join(self.input_dir, pattern)))
        video_files = list(set(video_files))
        print(f"📂 Found {len(video_files)} video files")
        print(f"⚙️ Max concurrent: {MAX_CONCURRENT}, Max retries: {MAX_RETRIES}")

        # Concurrent processing
        with ThreadPoolExecutor(max_workers=MAX_CONCURRENT) as executor:
            futures = {
                executor.submit(self.process_video, fp): fp
                for fp in video_files
            }
            for future in tqdm(as_completed(futures), total=len(futures), desc="Progress"):
                fp = futures[future]
                try:
                    result = future.result()
                    status = "✅" if result else "❌"
                    tqdm.write(f"  {status} {os.path.basename(fp)}")
                except Exception as e:
                    tqdm.write(f"  ❌ {os.path.basename(fp)}: {str(e)}")

        # Save log
        with open(self.log_file, 'w', encoding='utf-8') as f:
            json.dump(self.results, f, indent=2, ensure_ascii=False)

        print("\n🎉 Batch processing complete!")
        print(f"✅ Success: {len(self.results['success'])}")
        print(f"❌ Failed: {len(self.results['failed'])}")
        print(f"📋 Detailed log: {self.log_file}")
        print(f"📁 Output directory: {self.output_dir}")


# Usage example
remover = AdvancedBatchRemover(
    api_key=API_KEY,
    input_dir="./watermarked_videos",
    output_dir="./clean_videos"
)
remover.run()
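The `batch_log.json` file written at the end of a run records each failure with its file name and error, which makes it a natural starting point for a retry pass. The sketch below reads that log and lists the failed files. It assumes the log layout produced by the advanced version (`success` and `failed` lists of dictionaries with a `file` key), and `failed_files` is a hypothetical helper name.

```python
import json


def failed_files(log_path):
    """Return the file names recorded under 'failed' in a batch log."""
    with open(log_path, encoding="utf-8") as f:
        log = json.load(f)
    return [entry["file"] for entry in log.get("failed", [])]
```

For example, `failed_files("./clean_videos/batch_log.json")` returns the names to feed into a second run once the cause of the failures has been addressed.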
Python Batch Watermark Removal FAQ
How to get an API Key?
Register at sora2watermarkremover.net and find your API Key in the user dashboard. New accounts come with 10 free credits.
What video formats are supported?
Common video formats, including MP4, MOV, AVI, and MKV, are supported, with a maximum size of 200MB per file.
How to handle videos larger than 200MB?
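Compress with FFmpeg before processing. The -2 in the scale filter keeps the output height even, which libx264 needs for standard yuv420p output; if the result is still over 200MB, raise the -crf value:
ffmpeg -i input.mp4 -vf "scale=1920:-2" -c:v libx264 -crf 23 compressed.mp4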
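To fold this step into a batch script, the same FFmpeg call can be issued from Python. This is a minimal sketch, assuming `ffmpeg` is installed and on the PATH. `build_compress_command` and `compress_video` are hypothetical helper names. The sketch uses `scale=1920:-2` so that the computed height stays even, which libx264 needs for yuv420p output.

```python
import subprocess


def build_compress_command(input_path, output_path, crf=23):
    """Build the FFmpeg argument list for a 1920-wide H.264 re-encode."""
    return [
        "ffmpeg",
        "-i", input_path,
        "-vf", "scale=1920:-2",   # width 1920, height rounded to an even value
        "-c:v", "libx264",
        "-crf", str(crf),         # higher CRF means a smaller file, lower quality
        output_path,
    ]


def compress_video(input_path, output_path, crf=23):
    """Run FFmpeg; raises CalledProcessError if the encode fails."""
    subprocess.run(build_compress_command(input_path, output_path, crf), check=True)
```

Passing the arguments as a list avoids shell quoting problems with file names that contain spaces.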
What's the ideal concurrency level?
Start with 3-5 concurrent workers and adjust based on API rate limits. Setting concurrency too high may trigger throttling.
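When throttling does occur, waiting longer between retries is usually more effective than retrying at once. How this API signals throttling is not documented here, so the sketch below rests on an assumption: that a throttled response might carry a standard HTTP `Retry-After` header with a number of seconds. `backoff_delay` is a hypothetical helper that prefers that value and otherwise falls back to the exponential backoff already used in the advanced version.

```python
def backoff_delay(attempt, retry_after=None, base=2.0, cap=60.0):
    """Seconds to wait before retry number `attempt` (1-based).

    `retry_after` is the raw Retry-After header value, if any. Whether the
    API sends one is an assumption of this sketch.
    """
    if retry_after is not None:
        try:
            return min(float(retry_after), cap)
        except ValueError:
            pass  # header was not a plain number of seconds; fall back
    return min(base ** attempt, cap)
```

In a retry loop this would replace the fixed `time.sleep(2 ** attempt)` with `time.sleep(backoff_delay(attempt, response.headers.get("Retry-After")))`.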
What needs to be modified in the code?
- Replace API_KEY = "your-api-key-here" with your real API Key
- Update input_dir and output_dir to your actual paths
- Adjust MAX_CONCURRENT based on your needs
Use Cases for Python Batch Watermark Removal
AI Video Creators
- Unified watermark removal for bulk-generated videos from Kling AI, Runway, Pika
- Full pipeline: daily auto-generation → auto watermark removal → auto publishing
Marketing & E-commerce Teams
- Batch cleanup of supplier materials
- Uniform processing of product videos
- Brand material library management
Education & Media
- Batch processing of teaching materials
- Historical video archive restoration
- Content archiving with watermark removal
Developers & Automation
- Integration into data processing pipelines
- Scheduled cron jobs for automated processing
- API wrapper services
Summary
By calling the sora2watermarkremover.net API through Python scripts, you can easily automate batch video watermark removal. The basic version suits small-scale processing, while the advanced version with concurrency and retry logic is designed for large-scale production environments.
Starting at $0.5/video with 10 free credits included, batch processing costs remain highly manageable.
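A rough budget for a batch can be worked out before it runs. The sketch below assumes a flat $0.5 per video and that each free credit covers one video. Neither is confirmed by the pricing line above, which only gives a starting price, so treat the result as a lower-bound estimate. `estimate_cost` is a hypothetical helper.

```python
def estimate_cost(video_count, price_per_video=0.5, free_credits=10):
    """Estimated cost in dollars after free credits are used up.

    Assumes one credit per video and a flat per-video price.
    """
    billable = max(video_count - free_credits, 0)
    return billable * price_per_video
```

Under these assumptions, a batch of 250 videos comes to 240 billable videos, or $120.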
Disclaimer: This content is for informational purposes only. When using watermark removal tools, ensure you own the video or are authorized to use it. Removing watermarks from copyrighted content owned by others may violate the DMCA (Digital Millennium Copyright Act) and other relevant copyright laws. If you are the original author of the video, removing watermarks from your own content for personal projects is reasonable. Consult legal counsel before commercial use.
