Python 批量去水印脚本教程:自动化处理上百个视频

15 days ago

Python 批量去水印脚本教程:自动化处理上百个视频

对于需要频繁处理大量视频水印的开发者和自动化用户来说,手动逐个上传去水印显然效率太低。通过 sora2watermarkremover.net 的 API 接口,你可以编写 Python 脚本实现批量上传、自动处理、自动下载的完整流水线。

本文提供完整的 Python 代码示例,从零开始搭建批量去水印自动化流程。


为什么用 Python 批量去水印?

场景需求

以下场景特别适合批量去水印:

  • AI 视频批量生成:使用 Kling AI、Runway、Pika 等工具批量生成的视频,每个都带有水印
  • 素材库整理:公司素材库中有大量带水印的历史视频需要统一处理
  • 社交媒体批量发布:从多个平台下载的素材需要统一清理水印后重新发布
  • 自动化工作流:将去水印集成到 CI/CD 或日常数据处理流水线中

API 优势

sora2watermarkremover.net 的 API 支持:

  • 批量上传视频文件(支持 MP4、MOV,最大 200MB/文件)
  • 异步处理,无需等待即可查询状态
  • 自动识别水印位置(Auto 模式)
  • 1080p 高质量输出
  • $0.5/视频起,10 条免费额度

环境准备

系统要求

安装依赖

pip install requests tqdm
  • requests:HTTP 请求库
  • tqdm:进度条显示

完整 Python 批量去水印代码

基础版本:单个视频去水印

import requests
import time

API_KEY = "your-api-key-here"
BASE_URL = "https://api.sora2watermarkremover.net"

def upload_video(api_key, file_path):
    """上传视频文件"""
    url = f"{BASE_URL}/upload"
    headers = {"Authorization": f"Bearer {api_key}"}
    
    with open(file_path, 'rb') as f:
        files = {'file': (file_path, f, 'video/mp4')}
        response = requests.post(url, headers=headers, files=files)
    
    result = response.json()
    if response.status_code == 200:
        print(f"✅ 上传成功,任务ID: {result['task_id']}")
        return result['task_id']
    else:
        print(f"❌ 上传失败: {result.get('error', '未知错误')}")
        return None

def check_status(api_key, task_id):
    """查询任务状态"""
    url = f"{BASE_URL}/status/{task_id}"
    headers = {"Authorization": f"Bearer {api_key}"}
    response = requests.get(url, headers=headers)
    return response.json()

def download_result(api_key, task_id, output_path):
    """下载处理结果"""
    url = f"{BASE_URL}/download/{task_id}"
    headers = {"Authorization": f"Bearer {api_key}"}
    response = requests.get(url, headers=headers, stream=True)
    
    with open(output_path, 'wb') as f:
        for chunk in response.iter_content(chunk_size=8192):
            f.write(chunk)
    
    print(f"✅ 已下载至: {output_path}")

# 使用示例
task_id = upload_video(API_KEY, "video_watermarked.mp4")
if task_id:
    # 等待处理完成
    while True:
        status = check_status(API_KEY, task_id)
        if status['status'] == 'completed':
            break
        elif status['status'] == 'failed':
            print(f"处理失败: {status.get('error')}")
            break
        print(f"处理中... 进度: {status.get('progress', 'unknown')}")
        time.sleep(5)
    
    # 下载结果
    download_result(API_KEY, task_id, "video_clean.mp4")

批量版本:处理文件夹中所有视频

import os
import requests
import time
import glob
from tqdm import tqdm

API_KEY = "your-api-key-here"
BASE_URL = "https://api.sora2watermarkremover.net"

class BatchWatermarkRemover:
    """批量去水印处理器"""
    
    def __init__(self, api_key, input_dir, output_dir):
        self.api_key = api_key
        self.input_dir = input_dir
        self.output_dir = output_dir
        self.headers = {"Authorization": f"Bearer {api_key}"}
        os.makedirs(output_dir, exist_ok=True)
    
    def upload_video(self, file_path):
        """上传单个视频"""
        url = f"{BASE_URL}/upload"
        with open(file_path, 'rb') as f:
            files = {'file': (os.path.basename(file_path), f, 'video/mp4')}
            response = requests.post(url, headers=self.headers, files=files)
        return response.json()
    
    def check_status(self, task_id):
        """查询任务状态"""
        url = f"{BASE_URL}/status/{task_id}"
        response = requests.get(url, headers=self.headers)
        return response.json()
    
    def download_result(self, task_id, output_path):
        """下载处理结果"""
        url = f"{BASE_URL}/download/{task_id}"
        response = requests.get(url, headers=self.headers, stream=True)
        with open(output_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
    
    def process_batch(self, file_pattern="*.mp4"):
        """批量处理文件夹中所有视频"""
        video_files = glob.glob(os.path.join(self.input_dir, file_pattern))
        video_files.extend(glob.glob(os.path.join(self.input_dir, "*.MOV")))
        video_files.extend(glob.glob(os.path.join(self.input_dir, "*.mov")))
        video_files = list(set(video_files))  # 去重
        
        print(f"📂 找到 {len(video_files)} 个视频文件")
        tasks = []
        
        # 第一阶段:批量上传
        print("📤 阶段 1: 批量上传...")
        for file_path in tqdm(video_files, desc="上传中"):
            result = self.upload_video(file_path)
            if result.get('task_id'):
                filename = os.path.basename(file_path)
                name, ext = os.path.splitext(filename)
                output_name = f"{name}_clean{ext}"
                output_path = os.path.join(self.output_dir, output_name)
                tasks.append((result['task_id'], file_path, output_path))
                tqdm.write(f"  ✅ {filename} → 任务ID: {result['task_id']}")
            else:
                tqdm.write(f"  ❌ {file_path} 上传失败: {result.get('error')}")
        
        # 第二阶段:等待处理
        print("⏳ 阶段 2: 等待处理完成...")
        for task_id, file_path, output_path in tasks:
            filename = os.path.basename(file_path)
            while True:
                status = self.check_status(task_id)
                if status['status'] == 'completed':
                    break
                elif status['status'] == 'failed':
                    tqdm.write(f"  ❌ {filename} 处理失败: {status.get('error')}")
                    break
                time.sleep(5)
            tqdm.write(f"  ✅ {filename} 处理完成")
        
        # 第三阶段:批量下载
        print("📥 阶段 3: 批量下载...")
        for task_id, file_path, output_path in tqdm(tasks, desc="下载中"):
            self.download_result(task_id, output_path)
            tqdm.write(f"  ✅ 已下载: {os.path.basename(output_path)}")
        
        print(f"\n🎉 批量处理完成!共处理 {len(tasks)} 个视频")
        print(f"📁 输出目录: {self.output_dir}")

# 使用示例
remover = BatchWatermarkRemover(
    api_key=API_KEY,
    input_dir="./watermarked_videos",
    output_dir="./clean_videos"
)
remover.process_batch()

高级版本:并发处理 + 错误重试

import os
import requests
import time
import glob
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm

API_KEY = "your-api-key-here"
BASE_URL = "https://api.sora2watermarkremover.net"
MAX_CONCURRENT = 5  # 最大并发数
MAX_RETRIES = 3     # 最大重试次数

class AdvancedBatchRemover:
    """高级批量去水印处理器(支持并发和重试)"""
    
    def __init__(self, api_key, input_dir, output_dir):
        self.api_key = api_key
        self.input_dir = input_dir
        self.output_dir = output_dir
        self.headers = {"Authorization": f"Bearer {api_key}"}
        self.session = requests.Session()
        self.session.headers.update(self.headers)
        os.makedirs(output_dir, exist_ok=True)
        self.log_file = os.path.join(output_dir, "batch_log.json")
        self.results = {"success": [], "failed": []}
    
    def upload_with_retry(self, file_path, retries=MAX_RETRIES):
        """带重试的上传"""
        for attempt in range(1, retries + 1):
            try:
                with open(file_path, 'rb') as f:
                    files = {'file': (os.path.basename(file_path), f, 'video/mp4')}
                    response = self.session.post(
                        f"{BASE_URL}/upload",
                        files=files,
                        timeout=120
                    )
                return response.json()
            except Exception as e:
                if attempt == retries:
                    return {"error": f"上传失败 ({attempt}次重试): {str(e)}"}
                time.sleep(2 ** attempt)  # 指数退避
        
    def wait_for_completion(self, task_id, timeout=600):
        """等待任务完成"""
        start_time = time.time()
        while time.time() - start_time < timeout:
            response = self.session.get(f"{BASE_URL}/status/{task_id}")
            status = response.json()
            if status['status'] == 'completed':
                return True, status
            elif status['status'] == 'failed':
                return False, status
            time.sleep(5)
        return False, {"error": "处理超时"}
    
    def download_with_retry(self, task_id, output_path, retries=MAX_RETRIES):
        """带重试的下载"""
        for attempt in range(1, retries + 1):
            try:
                response = self.session.get(
                    f"{BASE_URL}/download/{task_id}",
                    stream=True,
                    timeout=300
                )
                with open(output_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)
                return True
            except Exception as e:
                if attempt == retries:
                    return False
                time.sleep(2 ** attempt)
        return False
    
    def process_video(self, file_path):
        """处理单个视频的完整流程"""
        filename = os.path.basename(file_path)
        name, ext = os.path.splitext(filename)
        output_name = f"{name}_clean{ext}"
        output_path = os.path.join(self.output_dir, output_name)
        
        # 上传
        result = self.upload_with_retry(file_path)
        if 'task_id' not in result:
            self.results['failed'].append({
                'file': filename,
                'error': result.get('error', '上传失败')
            })
            return False
        
        task_id = result['task_id']
        
        # 等待完成
        success, status = self.wait_for_completion(task_id)
        if not success:
            self.results['failed'].append({
                'file': filename,
                'error': status.get('error', '处理失败')
            })
            return False
        
        # 下载
        downloaded = self.download_with_retry(task_id, output_path)
        if not downloaded:
            self.results['failed'].append({
                'file': filename,
                'error': '下载失败'
            })
            return False
        
        self.results['success'].append({
            'file': filename,
            'output': output_name,
            'task_id': task_id
        })
        return True
    
    def run(self, file_pattern="*.mp4"):
        """运行批量处理"""
        # 收集所有视频文件
        patterns = ["*.mp4", "*.MP4", "*.mov", "*.MOV", "*.avi", "*.AVI"]
        video_files = []
        for pattern in patterns:
            video_files.extend(glob.glob(os.path.join(self.input_dir, pattern)))
        video_files = list(set(video_files))
        
        print(f"📂 找到 {len(video_files)} 个视频文件")
        print(f"🔧 最大并发: {MAX_CONCURRENT}, 最大重试: {MAX_RETRIES}")
        
        # 并发处理
        with ThreadPoolExecutor(max_workers=MAX_CONCURRENT) as executor:
            futures = {
                executor.submit(self.process_video, fp): fp 
                for fp in video_files
            }
            
            for future in tqdm(as_completed(futures), total=len(futures), desc="处理进度"):
                fp = futures[future]
                try:
                    result = future.result()
                    status = "✅" if result else "❌"
                    tqdm.write(f"  {status} {os.path.basename(fp)}")
                except Exception as e:
                    tqdm.write(f"  ❌ {os.path.basename(fp)}: {str(e)}")
        
        # 保存日志
        with open(self.log_file, 'w') as f:
            json.dump(self.results, f, indent=2, ensure_ascii=False)
        
        print(f"\n🎉 批量处理完成!")
        print(f"✅ 成功: {len(self.results['success'])} 个")
        print(f"❌ 失败: {len(self.results['failed'])} 个")
        print(f"📋 详细日志: {self.log_file}")
        print(f"📁 输出目录: {self.output_dir}")

# 使用示例
remover = AdvancedBatchRemover(
    api_key=API_KEY,
    input_dir="./watermarked_videos",
    output_dir="./clean_videos"
)
remover.run()

Python 批量去水印常见问答

API Key 如何获取?

sora2watermarkremover.net 注册账号,在用户面板中获取 API Key。新注册账号提供 10 条免费额度。

支持哪些视频格式?

支持 MP4、MOV、AVI、MKV 等常见视频格式,单个文件最大 200MB。

如何处理超过 200MB 的大视频?

对于超大视频,可以先用 FFmpeg 压缩后再处理:

ffmpeg -i input.mp4 -vf "scale=1920:-1" -c:v libx264 -crf 23 compressed.mp4

并发数设多少合适?

建议从 3-5 开始,根据 API 速率限制调整。过高并发可能导致请求被限流。

代码需要修改哪些地方?

  1. 替换 API_KEY = "your-api-key-here" 为你的真实 API Key
  2. 修改 input_diroutput_dir 为你的实际路径
  3. 根据需求调整 MAX_CONCURRENT 并发数

Python 批量去水印使用场景

AI 视频创作者

  • Kling AI、Runway、Pika 批量生成的视频统一去水印
  • 每日自动生成 → 自动去水印 → 自动发布的完整流水线

营销与电商团队

  • 供应商素材批量清理
  • 产品视频素材统一处理
  • 品牌素材库管理

教育与媒体

  • 教学素材批量处理
  • 历史视频资料修复
  • 内容存档去水印

开发者与自动化

  • 集成到数据处理流水线
  • 定时任务自动处理
  • API 二次封装服务

总结

通过 Python 脚本调用 sora2watermarkremover.net API,你可以轻松实现批量视频去水印的自动化处理。基础版本适合少量视频处理,高级版本支持并发和错误重试,适合大规模生产环境。

$0.5/视频起的价格配合 10 条免费额度,让批量处理的成本非常可控。


免责声明:本文内容仅供参考。使用去水印工具时,请确保你拥有视频的使用权或授权。去除他人版权水印可能违反 DMCA(数字千年版权法)和相关版权法规。如果你是视频的原创作者,去除自己内容上的水印用于个人项目是合理的。建议在商业使用前咨询法律顾问。

作者
Admin
分类