我有以下方法来创建一个虚拟视频文件:
def create_dummy_mp4_video() -> None:
    """Render a one-second dummy clip to ``output.mp4`` by invoking ffmpeg.

    The clip is a 100x100 black picture at 10 frames per second combined with
    a silent stereo 44.1 kHz audio source. Because ``-y`` is passed, an
    existing ``output.mp4`` is overwritten.

    Raises:
        RuntimeError: If ffmpeg exits with a non-zero status. The message
            contains the exit code and the text ffmpeg wrote to stderr.
    """
    cmd = (
        "ffmpeg -y "  # rewrite if exists
        "-f lavfi -i color=size=100x100:rate=10:color=black "  # blank video
        "-f lavfi -i anullsrc=channel_layout=stereo:sample_rate=44100 "  # silent audio
        "-t 1 "  # video duration, seconds
        "output.mp4"  # file name
    )
    proc = subprocess.run(
        shlex.split(cmd),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        shell=False,
    )
    if proc.returncode != 0:
        # Surface ffmpeg's own diagnostics instead of an empty exception.
        raise RuntimeError(
            f"ffmpeg exited with code {proc.returncode}: "
            f"{proc.stderr.decode(errors='replace')}"
        )
@dataclass(frozen=True)
class FakeVideo:
    """Immutable record holding a generated video's bytes and its metadata."""

    body: bytes  # raw contents of the video file
    width: int  # frame width
    height: int  # frame height
    fps: int  # frames per second
    size: int  # number of bytes in ``body``
    frames: int  # total number of frames
    length_s: int  # duration in seconds
def video() -> FakeVideo:
    """Generate the dummy clip and return it, with its metadata, as a ``FakeVideo``.

    Calls ``create_dummy_mp4_video()`` and then reads ``output.mp4`` from the
    current working directory. The width, height, frame rate and duration
    recorded here are local constants; they are not read back from the file.

    Returns:
        FakeVideo: The file's bytes together with the metadata above.
    """
    w, h, fps, sec, filename = 100, 100, 10, 1, "output.mp4"
    create_dummy_mp4_video()
    video_path = os.path.join(os.getcwd(), filename)
    with open(video_path, "rb") as file:
        body = file.read()
    return FakeVideo(
        body=body,
        width=w,
        height=h,
        fps=fps,
        size=len(body),
        # Frame count is rate times duration; the previous ``fps // sec`` was
        # only correct because the duration is one second.
        frames=fps * sec,
        length_s=sec,
    )
然后我想在特定时间提取一个帧,我这样做了:async def run_shell_command(frame_millisecond, data: bytes) -> bytes:
async with aiofiles.tempfile.NamedTemporaryFile("wb") as file:
await file.write(data)
proc = await asyncio.create_subprocess_exec(
"ffmpeg",
"-i",
file.name,
"-ss",
f"{frame_millisecond}ms", # seek the position to the specific millisecond
"-vframes", "1", # only handle one video frame
"-c:v", "png", # select the output encoder
"-f", "image2pipe", "-", # force output file to stdout,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await proc.communicate()
level = logging.DEBUG if proc.returncode == 0 else logging.WARN
LOGGER.log(level, f"[cmd exited with {proc.returncode}]")
if stderr:
print(level, f"[stderr]{stderr.decode()}")
LOGGER.log(level, f"[stderr]{stderr.decode()}")
return stdout
async def runner():
    """Build the dummy video, extract its midpoint frame and check it is a PNG.

    Raises:
        AssertionError: If the extracted data is not ``bytes`` or does not
            start with the PNG file signature.
    """
    v = video()
    # Seek target: half of the clip length, converted to milliseconds.
    midpoint_ms = int(v.length_s / 2 * 1000)
    res = await run_shell_command(midpoint_ms, v.body)
    assert isinstance(res, bytes)
    # Compare against the 8-byte PNG signature directly; the imghdr module is
    # deprecated and no longer available from Python 3.13 on.
    assert res.startswith(b"\x89PNG\r\n\x1a\n")
if __name__ == "__main__":
    # asyncio.run creates a fresh event loop, runs the coroutine to completion
    # and closes the loop; calling get_event_loop() outside a running loop is
    # deprecated.
    asyncio.run(runner())
此代码失败并出现以下错误:/tmp/tmpzo786lfg: Invalid data found when processing input
请帮助找出我的代码的问题。在调查过程中,我发现如果我像这样更改视频的大小,它会起作用:
f"-f lavfi -i color=size=1280x720:rate=25:color=black " # blank video
但我希望能够处理任何视频。我使用 ffmpeg 4.3.3-0+deb11u1
最佳答案
看起来您必须确保在执行 FFmpeg 之前将数据写入临时文件。
我对 asyncio 和 aiofiles 没有任何经验,而且我运行的是 Windows 10,所以我不确定 Linux 上的行为……
我尝试在 file.write(data) 之后添加 await file.flush(),但 FFmpeg 的执行结果是“Permission denied”。
我使用以下帖子中的解决方案解决了它:https://stackoverflow.com/questions/23212435/permission-denied-to-write-to-my-temporary-file/23212515
1. 为 tempfile.NamedTemporaryFile 添加参数 delete=False:
   async with aiofiles.tempfile.NamedTemporaryFile("wb", delete=False) as file:
2. 在 await file.write(data) 之后添加 await file.close()。关闭文件用于确保在执行 FFmpeg 之前将所有数据写入文件。
3. 在 return stdout 之前添加 os.unlink(file.name)。
完整代码:
import subprocess
import asyncio
from dataclasses import dataclass
import shlex
import aiofiles
import os
import logging
import imghdr
def create_dummy_mp4_video() -> None:
    """Create ``output.mp4``: one second of black 100x100 video with silent audio.

    ffmpeg is run with its lavfi ``color`` source (10 frames per second) and
    ``anullsrc`` source (stereo, 44100 Hz). ``-y`` makes ffmpeg overwrite an
    existing ``output.mp4``. ffmpeg's console output is discarded.

    Raises:
        RuntimeError: If ffmpeg exits with a non-zero status.
    """
    argv = [
        "ffmpeg",
        "-y",  # rewrite if exists
        "-f", "lavfi", "-i", "color=size=100x100:rate=10:color=black",  # blank video
        "-f", "lavfi", "-i", "anullsrc=channel_layout=stereo:sample_rate=44100",  # silent audio
        "-t", "1",  # video duration, seconds
        "output.mp4",  # file name
    ]
    proc = subprocess.run(
        argv,
        # Nothing reads ffmpeg's output, so discard it instead of buffering it.
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        shell=False,
    )
    if proc.returncode != 0:
        raise RuntimeError(f"ffmpeg exited with code {proc.returncode}")
@dataclass(frozen=True)
class FakeVideo:
    """Frozen value object describing a generated test video.

    Instances cannot be modified after construction.
    """

    body: bytes  # the video file's bytes
    width: int  # picture width
    height: int  # picture height
    fps: int  # frame rate
    size: int  # byte length of ``body``
    frames: int  # number of frames in the clip
    length_s: int  # clip length, seconds
def video() -> FakeVideo:
    """Create the dummy MP4 on disk and load it into a ``FakeVideo``.

    ``create_dummy_mp4_video()`` is invoked first; ``output.mp4`` is then read
    from the current working directory. The dimensions, frame rate and
    duration stored in the result are the constants defined below, not values
    probed from the file.

    Returns:
        FakeVideo: The file contents plus the metadata.
    """
    w, h, fps, sec, filename = 100, 100, 10, 1, "output.mp4"
    create_dummy_mp4_video()
    video_path = os.path.join(os.getcwd(), filename)
    with open(video_path, "rb") as file:
        body = file.read()
    size = len(body)
    # Frames = rate * duration. ``fps // sec`` gave the same number only for a
    # one-second clip.
    frames = fps * sec
    return FakeVideo(
        body=body, width=w, height=h, fps=fps,
        size=size, frames=frames, length_s=sec,
    )
async def run_shell_command(frame_millisecond, data: bytes) -> bytes:
    """Extract one PNG-encoded frame from an in-memory video using ffmpeg.

    *data* is written to a named temporary file which is closed before ffmpeg
    is started on it. ffmpeg seeks to *frame_millisecond*, encodes a single
    frame as PNG and writes it to stdout. The temporary file is removed
    afterwards, including when ffmpeg cannot be started. A non-zero ffmpeg
    exit status is not raised; anything ffmpeg wrote to stderr is printed.

    Args:
        frame_millisecond: Position to seek to, in milliseconds.
        data: Complete contents of the input video.

    Returns:
        The bytes ffmpeg wrote to stdout.
    """
    path = None
    try:
        # delete=False lets the file outlive close(), so ffmpeg can open it by name:
        # https://stackoverflow.com/questions/23212435/permission-denied-to-write-to-my-temporary-file/23212515
        async with aiofiles.tempfile.NamedTemporaryFile("wb", delete=False) as file:
            path = file.name
            await file.write(data)
            await file.close()  # Close the file before executing FFmpeg.
        proc = await asyncio.create_subprocess_exec(
            "ffmpeg",
            "-i",
            path,
            "-ss",
            f"{frame_millisecond}ms",  # seek the position to the specific millisecond
            "-vframes", "1",  # only handle one video frame
            "-c:v", "png",  # select the output encoder
            "-f", "image2pipe", "-",  # force output file to stdout,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await proc.communicate()
    finally:
        # Unlink is required because delete=False was used; doing it here
        # keeps the file from leaking when an exception interrupts the run.
        if path is not None:
            os.unlink(path)
    level = logging.DEBUG if proc.returncode == 0 else logging.WARNING
    if stderr:
        # errors="replace" keeps undecodable ffmpeg output from raising.
        print(level, f"[stderr]{stderr.decode(errors='replace')}")
    return stdout
async def runner():
    """Generate the dummy video, grab the frame at its midpoint and verify it.

    Raises:
        AssertionError: If the result is not ``bytes`` or does not begin with
            the PNG file signature.
    """
    v = video()
    # Half of the clip duration, expressed in milliseconds.
    seek_ms = int(v.length_s / 2 * 1000)
    res = await run_shell_command(seek_ms, v.body)
    assert isinstance(res, bytes)
    # Check the 8-byte PNG signature directly rather than via imghdr, which
    # is deprecated and no longer available from Python 3.13 on.
    assert res.startswith(b"\x89PNG\r\n\x1a\n")
if __name__ == "__main__":
    # asyncio.run sets up and tears down the event loop itself; fetching a
    # loop with get_event_loop() when none is running is deprecated.
    asyncio.run(runner())
笔记:我删除了 LOGGER,因为我找不到 LOGGER 模块。
关于 python - 提取帧失败,出现:Invalid data found when processing input,我们在 Stack Overflow 上找到一个类似的问题: https://stackoverflow.com/questions/69900935/