- 博客
- Python 批量去水印脚本教程:自动化处理上百个视频
Python 批量去水印脚本教程:自动化处理上百个视频
15 days ago
Python 批量去水印脚本教程:自动化处理上百个视频
对于需要频繁处理大量视频水印的开发者和自动化用户来说,手动逐个上传去水印显然效率太低。通过 sora2watermarkremover.net 的 API 接口,你可以编写 Python 脚本实现批量上传、自动处理、自动下载的完整流水线。
本文提供完整的 Python 代码示例,从零开始搭建批量去水印自动化流程。
为什么用 Python 批量去水印?
场景需求
以下场景特别适合批量去水印:
- AI 视频批量生成:使用 Kling AI、Runway、Pika 等工具批量生成的视频,每个都带有水印
- 素材库整理:公司素材库中有大量带水印的历史视频需要统一处理
- 社交媒体批量发布:从多个平台下载的素材需要统一清理水印后重新发布
- 自动化工作流:将去水印集成到 CI/CD 或日常数据处理流水线中
API 优势
sora2watermarkremover.net 的 API 支持:
- 批量上传视频文件(支持 MP4、MOV,最大 200MB/文件)
- 异步处理,无需等待即可查询状态
- 自动识别水印位置(Auto 模式)
- 1080p 高质量输出
- $0.5/视频起,10 条免费额度
环境准备
系统要求
- Python 3.8+
- 稳定网络连接
- API Key(在 sora2watermarkremover.net 注册并获取)
安装依赖
pip install requests tqdm
requests:HTTP 请求库tqdm:进度条显示
完整 Python 批量去水印代码
基础版本:单个视频去水印
import requests
import time
API_KEY = "your-api-key-here"
BASE_URL = "https://api.sora2watermarkremover.net"
def upload_video(api_key, file_path):
"""上传视频文件"""
url = f"{BASE_URL}/upload"
headers = {"Authorization": f"Bearer {api_key}"}
with open(file_path, 'rb') as f:
files = {'file': (file_path, f, 'video/mp4')}
response = requests.post(url, headers=headers, files=files)
result = response.json()
if response.status_code == 200:
print(f"✅ 上传成功,任务ID: {result['task_id']}")
return result['task_id']
else:
print(f"❌ 上传失败: {result.get('error', '未知错误')}")
return None
def check_status(api_key, task_id):
"""查询任务状态"""
url = f"{BASE_URL}/status/{task_id}"
headers = {"Authorization": f"Bearer {api_key}"}
response = requests.get(url, headers=headers)
return response.json()
def download_result(api_key, task_id, output_path):
"""下载处理结果"""
url = f"{BASE_URL}/download/{task_id}"
headers = {"Authorization": f"Bearer {api_key}"}
response = requests.get(url, headers=headers, stream=True)
with open(output_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
print(f"✅ 已下载至: {output_path}")
# 使用示例
task_id = upload_video(API_KEY, "video_watermarked.mp4")
if task_id:
# 等待处理完成
while True:
status = check_status(API_KEY, task_id)
if status['status'] == 'completed':
break
elif status['status'] == 'failed':
print(f"处理失败: {status.get('error')}")
break
print(f"处理中... 进度: {status.get('progress', 'unknown')}")
time.sleep(5)
# 下载结果
download_result(API_KEY, task_id, "video_clean.mp4")
批量版本:处理文件夹中所有视频
import os
import requests
import time
import glob
from tqdm import tqdm
API_KEY = "your-api-key-here"
BASE_URL = "https://api.sora2watermarkremover.net"
class BatchWatermarkRemover:
"""批量去水印处理器"""
def __init__(self, api_key, input_dir, output_dir):
self.api_key = api_key
self.input_dir = input_dir
self.output_dir = output_dir
self.headers = {"Authorization": f"Bearer {api_key}"}
os.makedirs(output_dir, exist_ok=True)
def upload_video(self, file_path):
"""上传单个视频"""
url = f"{BASE_URL}/upload"
with open(file_path, 'rb') as f:
files = {'file': (os.path.basename(file_path), f, 'video/mp4')}
response = requests.post(url, headers=self.headers, files=files)
return response.json()
def check_status(self, task_id):
"""查询任务状态"""
url = f"{BASE_URL}/status/{task_id}"
response = requests.get(url, headers=self.headers)
return response.json()
def download_result(self, task_id, output_path):
"""下载处理结果"""
url = f"{BASE_URL}/download/{task_id}"
response = requests.get(url, headers=self.headers, stream=True)
with open(output_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
def process_batch(self, file_pattern="*.mp4"):
"""批量处理文件夹中所有视频"""
video_files = glob.glob(os.path.join(self.input_dir, file_pattern))
video_files.extend(glob.glob(os.path.join(self.input_dir, "*.MOV")))
video_files.extend(glob.glob(os.path.join(self.input_dir, "*.mov")))
video_files = list(set(video_files)) # 去重
print(f"📂 找到 {len(video_files)} 个视频文件")
tasks = []
# 第一阶段:批量上传
print("📤 阶段 1: 批量上传...")
for file_path in tqdm(video_files, desc="上传中"):
result = self.upload_video(file_path)
if result.get('task_id'):
filename = os.path.basename(file_path)
name, ext = os.path.splitext(filename)
output_name = f"{name}_clean{ext}"
output_path = os.path.join(self.output_dir, output_name)
tasks.append((result['task_id'], file_path, output_path))
tqdm.write(f" ✅ {filename} → 任务ID: {result['task_id']}")
else:
tqdm.write(f" ❌ {file_path} 上传失败: {result.get('error')}")
# 第二阶段:等待处理
print("⏳ 阶段 2: 等待处理完成...")
for task_id, file_path, output_path in tasks:
filename = os.path.basename(file_path)
while True:
status = self.check_status(task_id)
if status['status'] == 'completed':
break
elif status['status'] == 'failed':
tqdm.write(f" ❌ {filename} 处理失败: {status.get('error')}")
break
time.sleep(5)
tqdm.write(f" ✅ {filename} 处理完成")
# 第三阶段:批量下载
print("📥 阶段 3: 批量下载...")
for task_id, file_path, output_path in tqdm(tasks, desc="下载中"):
self.download_result(task_id, output_path)
tqdm.write(f" ✅ 已下载: {os.path.basename(output_path)}")
print(f"\n🎉 批量处理完成!共处理 {len(tasks)} 个视频")
print(f"📁 输出目录: {self.output_dir}")
# 使用示例
remover = BatchWatermarkRemover(
api_key=API_KEY,
input_dir="./watermarked_videos",
output_dir="./clean_videos"
)
remover.process_batch()
高级版本:并发处理 + 错误重试
import os
import requests
import time
import glob
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
API_KEY = "your-api-key-here"
BASE_URL = "https://api.sora2watermarkremover.net"
MAX_CONCURRENT = 5 # 最大并发数
MAX_RETRIES = 3 # 最大重试次数
class AdvancedBatchRemover:
"""高级批量去水印处理器(支持并发和重试)"""
def __init__(self, api_key, input_dir, output_dir):
self.api_key = api_key
self.input_dir = input_dir
self.output_dir = output_dir
self.headers = {"Authorization": f"Bearer {api_key}"}
self.session = requests.Session()
self.session.headers.update(self.headers)
os.makedirs(output_dir, exist_ok=True)
self.log_file = os.path.join(output_dir, "batch_log.json")
self.results = {"success": [], "failed": []}
def upload_with_retry(self, file_path, retries=MAX_RETRIES):
"""带重试的上传"""
for attempt in range(1, retries + 1):
try:
with open(file_path, 'rb') as f:
files = {'file': (os.path.basename(file_path), f, 'video/mp4')}
response = self.session.post(
f"{BASE_URL}/upload",
files=files,
timeout=120
)
return response.json()
except Exception as e:
if attempt == retries:
return {"error": f"上传失败 ({attempt}次重试): {str(e)}"}
time.sleep(2 ** attempt) # 指数退避
def wait_for_completion(self, task_id, timeout=600):
"""等待任务完成"""
start_time = time.time()
while time.time() - start_time < timeout:
response = self.session.get(f"{BASE_URL}/status/{task_id}")
status = response.json()
if status['status'] == 'completed':
return True, status
elif status['status'] == 'failed':
return False, status
time.sleep(5)
return False, {"error": "处理超时"}
def download_with_retry(self, task_id, output_path, retries=MAX_RETRIES):
"""带重试的下载"""
for attempt in range(1, retries + 1):
try:
response = self.session.get(
f"{BASE_URL}/download/{task_id}",
stream=True,
timeout=300
)
with open(output_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
return True
except Exception as e:
if attempt == retries:
return False
time.sleep(2 ** attempt)
return False
def process_video(self, file_path):
"""处理单个视频的完整流程"""
filename = os.path.basename(file_path)
name, ext = os.path.splitext(filename)
output_name = f"{name}_clean{ext}"
output_path = os.path.join(self.output_dir, output_name)
# 上传
result = self.upload_with_retry(file_path)
if 'task_id' not in result:
self.results['failed'].append({
'file': filename,
'error': result.get('error', '上传失败')
})
return False
task_id = result['task_id']
# 等待完成
success, status = self.wait_for_completion(task_id)
if not success:
self.results['failed'].append({
'file': filename,
'error': status.get('error', '处理失败')
})
return False
# 下载
downloaded = self.download_with_retry(task_id, output_path)
if not downloaded:
self.results['failed'].append({
'file': filename,
'error': '下载失败'
})
return False
self.results['success'].append({
'file': filename,
'output': output_name,
'task_id': task_id
})
return True
def run(self, file_pattern="*.mp4"):
"""运行批量处理"""
# 收集所有视频文件
patterns = ["*.mp4", "*.MP4", "*.mov", "*.MOV", "*.avi", "*.AVI"]
video_files = []
for pattern in patterns:
video_files.extend(glob.glob(os.path.join(self.input_dir, pattern)))
video_files = list(set(video_files))
print(f"📂 找到 {len(video_files)} 个视频文件")
print(f"🔧 最大并发: {MAX_CONCURRENT}, 最大重试: {MAX_RETRIES}")
# 并发处理
with ThreadPoolExecutor(max_workers=MAX_CONCURRENT) as executor:
futures = {
executor.submit(self.process_video, fp): fp
for fp in video_files
}
for future in tqdm(as_completed(futures), total=len(futures), desc="处理进度"):
fp = futures[future]
try:
result = future.result()
status = "✅" if result else "❌"
tqdm.write(f" {status} {os.path.basename(fp)}")
except Exception as e:
tqdm.write(f" ❌ {os.path.basename(fp)}: {str(e)}")
# 保存日志
with open(self.log_file, 'w') as f:
json.dump(self.results, f, indent=2, ensure_ascii=False)
print(f"\n🎉 批量处理完成!")
print(f"✅ 成功: {len(self.results['success'])} 个")
print(f"❌ 失败: {len(self.results['failed'])} 个")
print(f"📋 详细日志: {self.log_file}")
print(f"📁 输出目录: {self.output_dir}")
# 使用示例
remover = AdvancedBatchRemover(
api_key=API_KEY,
input_dir="./watermarked_videos",
output_dir="./clean_videos"
)
remover.run()
Python 批量去水印常见问答
API Key 如何获取?
在 sora2watermarkremover.net 注册账号,在用户面板中获取 API Key。新注册账号提供 10 条免费额度。
支持哪些视频格式?
支持 MP4、MOV、AVI、MKV 等常见视频格式,单个文件最大 200MB。
如何处理超过 200MB 的大视频?
对于超大视频,可以先用 FFmpeg 压缩后再处理:
ffmpeg -i input.mp4 -vf "scale=1920:-1" -c:v libx264 -crf 23 compressed.mp4
并发数设多少合适?
建议从 3-5 开始,根据 API 速率限制调整。过高并发可能导致请求被限流。
代码需要修改哪些地方?
- 替换
API_KEY = "your-api-key-here"为你的真实 API Key - 修改
input_dir和output_dir为你的实际路径 - 根据需求调整
MAX_CONCURRENT并发数
Python 批量去水印使用场景
AI 视频创作者
- Kling AI、Runway、Pika 批量生成的视频统一去水印
- 每日自动生成 → 自动去水印 → 自动发布的完整流水线
营销与电商团队
- 供应商素材批量清理
- 产品视频素材统一处理
- 品牌素材库管理
教育与媒体
- 教学素材批量处理
- 历史视频资料修复
- 内容存档去水印
开发者与自动化
- 集成到数据处理流水线
- 定时任务自动处理
- API 二次封装服务
总结
通过 Python 脚本调用 sora2watermarkremover.net API,你可以轻松实现批量视频去水印的自动化处理。基础版本适合少量视频处理,高级版本支持并发和错误重试,适合大规模生产环境。
$0.5/视频起的价格配合 10 条免费额度,让批量处理的成本非常可控。
免责声明:本文内容仅供参考。使用去水印工具时,请确保你拥有视频的使用权或授权。去除他人版权水印可能违反 DMCA(数字千年版权法)和相关版权法规。如果你是视频的原创作者,去除自己内容上的水印用于个人项目是合理的。建议在商业使用前咨询法律顾问。
作者
Admin
