The following prorgram shows how to feed data into FFMPEG and get data from FFMPEG and display it. The program is based on Windows Python. Here are the features:
|
import numpy as np import cv2 import subprocess as sp import threading import sys import re import time class VidDecCtx(): FFMPEG_CMD = [ \ 'c:\\Program Files (x86)\\WinFF\\ffmpeg.exe', '-i', 'pipe:', \ '-f', 'rawvideo', '-pix_fmt', 'bgr24', '-an', '-sn', 'pipe:' \ ] DEFAULT_STOP_CNT = 100 READ_BUF_SIZE = 1024 READ_DECODE_BUF_SIZE = 100*1024 def __init__(self): self.process = None self.pthread = None self.buf = bytearray() self.fp = None self.toStopRunCnt = 0 self.isRunning = False def openVidFile(self, vFile): try: self.fp = open(vFile, 'rb') except Exception as e: self.fp = None return False return True def writer(self, toBuffer): while self.isRunning: if not toBuffer and len(self.buf)>0: time.sleep(0) byte_s = self.buf[0:VidDecCtx.READ_BUF_SIZE] self.buf = self.buf[VidDecCtx.READ_BUF_SIZE:] else: byte_s = self.fp.read(VidDecCtx.READ_BUF_SIZE) if toBuffer: self.buf += bytearray(byte_s) if not byte_s: break self.process.stdin.write(byte_s) self.toStopRunCnt = (self.toStopRunCnt-1) if self.toStopRunCnt>0 else 0 if self.toStopRunCnt==1: break self.process.stdin.close() self.toStopRunCnt = 0 self.isRunning = False def prepareGetDim(self): self.process = sp.Popen(VidDecCtx.FFMPEG_CMD, stdin=sp.PIPE, stderr=sp.STDOUT, \ stdout=sp.PIPE, bufsize=VidDecCtx.READ_DECODE_BUF_SIZE) self.isRunning = True self.pthread = threading.Thread(target=self.writer, args=[True]) self.pthread.start() def prepareDecode(self): self.process = sp.Popen(VidDecCtx.FFMPEG_CMD, stdin=sp.PIPE, stderr=sp.DEVNULL, \ stdout=sp.PIPE, bufsize=VidDecCtx.READ_BUF_SIZE) self.isRunning = True self.pthread = threading.Thread(target=self.writer, args=[False]) self.pthread.start() def stopThread(self): # need to continue to feed some data so that can quit gracefully self.toStopRunCnt = VidDecCtx.DEFAULT_STOP_CNT def cleanupThread(self): if self.pthread is not None: self.pthread.join() self.pthread=None if self.process is not None: try: self.process.wait(0.1) except (sp.TimeoutExpired): self.process.kill() self.process = None def finish(self): if self.fp is not None: self.fp.close() self.fp = None class LineBuffer(): def __init__(self): self.strBuf = bytearray() self.prevStrBufSearchIdx = 0 def feedBytes(self, byte_s): # Extract printable characters, and process line bye line tmp = filter(lambda x: x==0xa or x==0xd or (x>=0x20 and x<=0x7f), byte_s) self.strBuf += bytearray(tmp) def getLine(self): tmp=self.strBuf.find(0xa, self.prevStrBufSearchIdx) if tmp==-1: self.prevStrBufSearchIdx = len(self.strBuf) return None else: # do something to self.strBuf[:tmp] tmpStr=self.strBuf[:tmp].decode() self.strBuf = self.strBuf[tmp+1:] self.prevStrBufSearchIdx=0 return tmpStr if __name__ == "__main__": if len(sys.argv)!=2: print("Usage: c:\Python\python37\python %s vFile"%sys.argv[0], file=sys.stderr) sys.exit() vDCtx = VidDecCtx() lineBuf = LineBuffer() if not vDCtx.openVidFile(sys.argv[1]): print("Failed to open %s"%sys.argv[1], file=sys.stderr) sys.exit() width = None height = None bufSize = 1024 dimRegEx = re.compile(' *?Stream #.*?Video:.*?, *(\d+)x(\d+),') ######################################### # get dimension vDCtx.prepareGetDim() while vDCtx.isRunning: in_bytes = vDCtx.process.stdout.read(bufSize) if not in_bytes: break; if width is None: lineBuf.feedBytes(in_bytes) while True: tmpStr=lineBuf.getLine() if tmpStr is None: break tmpMatch=dimRegEx.match(tmpStr) if tmpMatch is not None: width=int(tmpMatch.group(1)) height=int(tmpMatch.group(2)) vDCtx.stopThread() break vDCtx.cleanupThread() if width is None: print("Failed to get the dimension of video", file=sys.stderr) sys.exit() print("Video dimension: (%d,%d)"%(width, height), file=sys.stderr) print("Buffered video data: %d"%(len(vDCtx.buf)), file=sys.stderr) ######################################### # decoding bufSize=width*height*3 vDCtx.prepareDecode() while vDCtx.isRunning: in_bytes = vDCtx.process.stdout.read(bufSize) if not in_bytes: break; # Transform the byte read into a NumPy array in_frame = (np.frombuffer(in_bytes, np.uint8).reshape([height, width, 3])) # Display the frame (for testing) cv2.imshow('in_frame', in_frame) if cv2.waitKey(1) & 0xFF == ord('q'): vDCtx.stopThread() vDCtx.cleanupThread() vDCtx.finish() |
本來這個blog是記錄開發輸入法的點滴的,後來越來越雜,現在什麼都記錄了。
2020年8月23日 星期日
How to use ffmpeg to convert and display video in Python (Windows)?
訂閱:
張貼留言 (Atom)
沒有留言:
張貼留言