Efficiente streaming della telecamera con Python

Streaming telecamera Python

Foto di Rahul Chakraborty su Unsplash

Parliamo dell’utilizzo delle webcam con Python. Avevo un compito semplice: leggere i frame dalla telecamera e far girare una rete neurale su ogni frame. Con una webcam specifica, stavo avendo problemi nell’impostare gli fps desiderati (come capisco ora — perché la telecamera poteva funzionare a 30 fps con il formato mjpeg, ma non raw), così ho deciso di approfondire FFmpeg per vedere se poteva aiutare.

Alla fine sono riuscito a far funzionare sia OpenCV che FFmpeg, ma ho scoperto una cosa molto interessante: le prestazioni di FFmpeg erano superiori a quelle di OpenCV nel mio caso d’uso principale. Infatti, con FFmpeg, ho avuto un incremento di velocità del 15x per la lettura del frame e un incremento di velocità del 32% per l’intero flusso di lavoro. Non riuscivo a credere ai risultati e li ho ricontrollati diverse volte, ma erano coerenti.

Nota: le prestazioni erano esattamente le stesse quando leggevo semplicemente frame dopo frame, ma FFmpeg era più veloce quando eseguivo qualcosa dopo la lettura del frame (cosa che richiede tempo). Mostrerò esattamente cosa intendo di seguito.

Ora, diamo un’occhiata al codice. Iniziamo con la classe per la lettura dei frame dalla webcam con OpenCV:

class VideoStreamCV:    def __init__(self, src: int, fps: int, resolution: Tuple[int, int]):        self.src = src        self.fps = fps        self.resolution = resolution        self.cap = self._open_camera()        self.wait_for_cam()    def _open_camera(self):        cap = cv2.VideoCapture(self.src)        cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.resolution[0])        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.resolution[1])        fourcc = cv2.VideoWriter_fourcc(*"MJPG")        cap.set(cv2.CAP_PROP_FOURCC, fourcc)        cap.set(cv2.CAP_PROP_FPS, self.fps)        return cap    def read(self):        ret, frame = self.cap.read()        if not ret:            return None        return frame    def release(self):        self.cap.release()    def wait_for_cam(self):        for _ in range(30):            frame = self.read()        if frame is not None:            return True        return False

Utilizzo la funzione wait_for_cam poiché le telecamere spesso hanno bisogno di tempo per “scaldarsi”. Lo stesso riscaldamento viene utilizzato con la classe FFmpeg:

class VideoStreamFFmpeg:    def __init__(self, src: int, fps: int, resolution: Tuple[int, int]):        self.src = src        self.fps = fps        self.resolution = resolution        self.pipe = self._open_ffmpeg()        self.frame_shape = (self.resolution[1], self.resolution[0], 3)        self.frame_size = np.prod(self.frame_shape)        self.wait_for_cam()    def _open_ffmpeg(self):        os_name = platform.system()        if os_name == "Darwin":  # macOS            input_format = "avfoundation"            video_device = f"{self.src}:none"        elif os_name == "Linux":            input_format = "v4l2"            video_device = f"{self.src}"        elif os_name == "Windows":            input_format = "dshow"            video_device = f"video={self.src}"        else:            raise ValueError("Sistema operativo non supportato")        command = [            'ffmpeg',            '-f', input_format,            '-r', str(self.fps),            '-video_size', f'{self.resolution[0]}x{self.resolution[1]}',            '-i', video_device,            '-vcodec', 'mjpeg',  # Imposta il codec di input su mjpeg            '-an', '-vcodec', 'rawvideo',  # Decodifica lo stream MJPEG in video raw            '-pix_fmt', 'bgr24',            '-vsync', '2',            '-f', 'image2pipe', '-'        ]        if os_name == "Linux":            command.insert(2, "-input_format")            command.insert(3, "mjpeg")        return subprocess.Popen(            command, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, bufsize=10**8        )    def read(self):        raw_image = self.pipe.stdout.read(self.frame_size)        if len(raw_image) != self.frame_size:            return None        image = np.frombuffer(raw_image, dtype=np.uint8).reshape(self.frame_shape)        return image    def release(self):        self.pipe.terminate()    def wait_for_cam(self):        for _ in range(30):            frame = self.read()        if frame is not None:            return True        return False

Per misurare il tempo di esecuzione della funzione run, ho utilizzato un decoratore:

def timeit(func):    def wrapper(*args, **kwargs):        t0 = time.perf_counter()        result = func(*args, **kwargs)        t1 = time.perf_counter()        print(f"Tempo della funzione principale: {round(t1-t0, 4)}s")        return result    return wrapper

Come compito sintetico pesante, invece di una rete neurale, ho utilizzato questa funzione semplice (potrebbe anche essere solo time.sleep). Questa è una parte molto importante, poiché senza alcun compito, le velocità di lettura sono le stesse sia per OpenCV che per FFmpeg:

def computation_task():    for _ in range(5000000):        9999 * 9999

Ora la funzione con un ciclo in cui leggo il frame, misuro il tempo, eseguo computation_task:

@timeitdef run(cam: VideoStreamCV | VideoStreamFFmpeg, run_task: bool):    timer = []    for _ in range(100):        t0 = time.perf_counter()        cam.read()        timer.append(time.perf_counter() - t0)        if run_task:            computation_task()    cam.release()    return round(np.mean(timer), 4)

E infine la funzione main in cui imposto un paio di parametri, inizializzo 2 video stream con OpenCV e FFmpeg, e li eseguo senza computation_task e con essa.

def main():    fsp = 30    resolution = (1920, 1080)    for run_task in [False, True]:        ff_cam = VideoStreamFFmpeg(src=0, fps=fsp, resolution=resolution)        cv_cam = VideoStreamCV(src=0, fps=fsp, resolution=resolution)        print(f"FFMPEG, compito {run_task}:")        print(f"Tempo medio di lettura del frame: {run(cam=ff_cam, run_task=run_task)}s\n")        print(f"CV2, compito {run_task}:")        print(f"Tempo medio di lettura del frame: {run(cam=cv_cam, run_task=run_task)}s\n")

Ecco cosa ottengo:

FFMPEG, compito False:Tempo della funzione principale: 3.2334sTempo medio di lettura del frame: 0.0323sCV2, compito False:Tempo della funzione principale: 3.3934sTempo medio di lettura del frame: 0.0332sFFMPEG, compito True:Tempo della funzione principale: 4.461sTempo medio di lettura del frame: 0.0014sCV2, compito True:Tempo della funzione principale: 6.6833sTempo medio di lettura del frame: 0.023s

Quindi, senza un compito sintetico, ottengo lo stesso tempo di lettura: 0.0323, 0.0332. Ma con compito sintetico: 0.0014 e 0.023, quindi FFmpeg è significativamente più veloce. La bellezza è che ho ottenuto un vero miglioramento delle prestazioni con la mia applicazione di rete neurale, non solo con test sintetici, quindi ho deciso di condividere i risultati.

Ecco un grafico che mostra quanto tempo impiega 1 iterazione: leggere il frame, elaborarlo con un modello yolov8s (su CPU) e salvare i frame con gli oggetti rilevati:

Ecco uno script completo con test sintetici:

import platformimport subprocessimport timefrom typing import Tupleimport cv2import numpy as npclass VideoStreamFFmpeg:    def __init__(self, src: int, fps: int, resolution: Tuple[int, int]):        self.src = src        self.fps = fps        self.resolution = resolution        self.pipe = self._open_ffmpeg()        self.frame_shape = (self.resolution[1], self.resolution[0], 3)        self.frame_size = np.prod(self.frame_shape)        self.wait_for_cam()    def _open_ffmpeg(self):        os_name = platform.system()        if os_name == "Darwin":  # macOS            input_format = "avfoundation"            video_device = f"{self.src}:none"        elif os_name == "Linux":            input_format = "v4l2"            video_device = f"{self.src}"        elif os_name == "Windows":            input_format = "dshow"            video_device = f"video={self.src}"        else:            raise ValueError("Sistema operativo non supportato")        command = [            'ffmpeg',            '-f', input_format,            '-r', str(self.fps),            '-video_size', f'{self.resolution[0]}x{self.resolution[1]}',            '-i', video_device,            '-vcodec', 'mjpeg',  # Codec di input impostato su mjpeg            '-an', '-vcodec', 'rawvideo',  # Decodifica lo stream MJPEG in video grezzo            '-pix_fmt', 'bgr24',            '-vsync', '2',            '-f', 'image2pipe', '-'        ]        if os_name == "Linux":            command.insert(2, "-input_format")            command.insert(3, "mjpeg")        return subprocess.Popen(            command, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, bufsize=10**8        )    def read(self):        raw_image = self.pipe.stdout.read(self.frame_size)        if len(raw_image) != self.frame_size:            return None        image = np.frombuffer(raw_image, dtype=np.uint8).reshape(self.frame_shape)        return image    def release(self):        self.pipe.terminate()    def wait_for_cam(self):        for _ in range(30):            frame = self.read()        if frame is not None:            return True        return Falseclass VideoStreamCV:    def __init__(self, src: int, fps: int, resolution: Tuple[int, int]):        self.src = src        self.fps = fps        self.resolution = resolution        self.cap = self._open_camera()        self.wait_for_cam()    def _open_camera(self):        cap = cv2.VideoCapture(self.src)        cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.resolution[0])        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.resolution[1])        fourcc = cv2.VideoWriter_fourcc(*"MJPG")        cap.set(cv2.CAP_PROP_FOURCC, fourcc)        cap.set(cv2.CAP_PROP_FPS, self.fps)        return cap    def read(self):        ret, frame = self.cap.read()        if not ret:            return None        return frame    def release(self):        self.cap.release()    def wait_for_cam(self):        for _ in range(30):            frame = self.read()        if frame is not None:            return True        return Falsedef timeit(func):    def wrapper(*args, **kwargs):        t0 = time.perf_counter()        result = func(*args, **kwargs)        t1 = time.perf_counter()        print(f"Tempo della funzione principale: {round(t1-t0, 4)}s")        return result    return wrapperdef computation_task():    for _ in range(5000000):        9999 * 9999@timeitdef run(cam: VideoStreamCV | VideoStreamFFmpeg, run_task: bool):    timer = []    for _ in range(100):        t0 = time.perf_counter()        cam.read()        timer.append(time.perf_counter() - t0)        if run_task:            computation_task()    cam.release()    return round(np.mean(timer), 4)def main():    fsp = 30    resolution = (1920, 1080)    for run_task in [False, True]:        ff_cam = VideoStreamFFmpeg(src=0, fps=fsp, resolution=resolution)        cv_cam = VideoStreamCV(src=0, fps=fsp, resolution=resolution)        print(f"FFMPEG, compito {run_task}:")        print(f"Tempo medio di lettura del frame: {run(cam=ff_cam, run_task=run_task)}s\n")        print(f"CV2, compito {run_task}:")        print(f"Tempo medio di lettura del frame: {run(cam=cv_cam, run_task=run_task)}s\n")if __name__ == "__main__":    main()

Nota: Questo script è stato testato su un chip M1 Pro di Apple. Spero che sia stato utile!