Source code for pynq_composable.video

# Copyright (C) 2021 Xilinx, Inc
#
# SPDX-License-Identifier: BSD-3-Clause

import asyncio
import cv2
from enum import Enum, auto
import json
import os
from pynq import Overlay
from pynq.lib.video import DrmDriver, VideoMode, PIXEL_RGB
from pynq.lib.video.clocks import *
from pynq.ps import CPU_ARCH, ZU_ARCH, ZYNQ_ARCH
from time import sleep
import threading
from typing import Union


__author__ = "Mario Ruiz"
__copyright__ = "Copyright 2021, Xilinx"
__email__ = "pynq_support@xilinx.com"


"""Collection of classes to manage different video sources"""


[docs]class VSource(Enum): """Suported input video sources""" OpenCV = auto() HDMI = auto() MIPI = auto()
[docs]class VSink(Enum): """Suported output video sinks""" HDMI = auto() DP = auto()
class _DisplayPort(DrmDriver): """Subclass of DisplayPort that works in a thread""" def __init__(self): """Create a new driver instance bound to card0 which should always be the hardened DisplayPort """ super().__init__('/dev/dri/card0') def writeframe(self, frame): """Write a frame to the display. Raises an exception if the operation fails and blocks until a page-flip if there is already a frame scheduled to be displayed. Parameters ---------- frame : pynq.ContiguousArray Frame to write - must have been created by `newframe` """ ret = self._videolib.pynqvideo_frame_write( self._device, frame.pointer) if ret == -1: self._loop.run_until_complete( asyncio.ensure_future(self.writeframe_async(frame))) elif ret > 0: raise OSError(ret, "Can't write frame") else: # Frame should no longer be disposed frame.pointer = None
[docs]class PLPLVideo: """PLPLVideo class Handles video streams that start in the PL and end in the PL """ def __init__(self, ol: Overlay, source: VSource = VSource.HDMI) -> None: """Return a PLVideo object to handle the video path Parameters ---------- ol : pynq.Overlay Overlay object source : str (optional) Input video source. Valid values [VSource.HDMI, VSource.MIPI] """ VSourceources = [VSource.HDMI, VSource.MIPI] if source not in VSourceources: raise ValueError("{} is not supported".format(source)) elif ol.device.name != 'Pynq-ZU' and source != VSource.HDMI: raise ValueError("Device {} only supports {} as input source " .format(ol.device.name, VSource.HDMI.name)) self._hdmi_out = ol.video.hdmi_out self._source = source self._started = None if ol.device.name == 'Pynq-ZU': # Deassert HDMI clock reset ol.hdmi_tx_control.channel2[0].write(1) # Wait 200 ms for the clock to come out of reset sleep(0.2) ol.video.phy.vid_phy_controller.initialize() if self._source == VSource.HDMI: self._source_in = ol.video.hdmi_in self._source_in.frontend.set_phy( ol.video.phy.vid_phy_controller) else: self._source_in = ol.mipi self._hdmi_out.frontend.set_phy(ol.video.phy.vid_phy_controller) dp159 = DP159(ol.HDMI_CTL_axi_iic, 0x5C) si = SI_5324C(ol.HDMI_CTL_axi_iic, 0x68) self._hdmi_out.frontend.clocks = [dp159, si] if (ol.hdmi_tx_control.read(0)) == 0: ol.hdmi_tx_control.write(0, 1) else: self._source_in = ol.video.hdmi_in
[docs] def start(self): """Configure and start the Video source Configures hdmi_in or mipi and hdmi_out. Then starts the source and sink, and finally tie them together """ if not self._started: if self._source == VSource.HDMI: self._source_in.configure() else: self._source_in.configure(VideoMode(1280, 720, 24, 60)) self._hdmi_out.configure(self._source_in.mode) self._source_in.start() self._hdmi_out.start() self._source_in.tie(self._hdmi_out) self._started = True
[docs] def stop(self): """Closes source and sink""" if self._started: self._hdmi_out.close() self._source_in.close() self._started = False
@property def modein(self): """Return input video source mode""" return self._source_in.mode @property def modeout(self): """Return output video sink mode""" return self._hdmi_out.mode
[docs]class PLDPVideo: """Wrapper for PL Video stream sources that sink on DisplayPort """ def __init__(self, ol: Overlay, source: VSource = VSource.HDMI) -> None: """Return a PLDP object to handle the video path Parameters ---------- ol : pynq.Overlay Overlay object source : str (optional) Input video source. Valid values [VSource.HDMI, VSource.MIPI] """ VSourceources = [VSource.HDMI, VSource.MIPI] if source not in VSourceources: raise ValueError("{} is not supported".format(source)) if CPU_ARCH != ZU_ARCH: raise RuntimeError("Device {} does not support DisplayPort" .format(ol.device.name)) self._source = source self._started = None self._pause = None self._dp = _DisplayPort() self._running = None if self._source == VSource.HDMI: self._source_in = ol.video.hdmi_in self._source_in.frontend.set_phy(ol.video.phy.vid_phy_controller) else: self._source_in = ol.mipi
[docs] def start(self): """Configure and start the HDMI""" if not self._started: if self._source == VSource.HDMI: self._source_in.configure() videomode = self._source_in.mode else: videomode = VideoMode(1280, 720, 24, 60) self._source_in.configure(videomode) self._dp.configure(videomode, PIXEL_RGB) self._source_in.start() self._started = True sleep(0.2) self._tie() elif self._pause: self._tie() self._pause = None
[docs] def stop(self): """Stop the HDMI""" if self._started: self._running = False while self._thread.is_alive(): sleep(0.05) self._source_in.close() self._dp.stop() self._started = False self._pause = False
def _tie(self): """Mirror the video stream input to an output channel""" self._thread = threading.Thread(target=self._tievdma, daemon=True) self._running = True try: self._thread.start() except Exception: import traceback print(traceback.format_exc()) raise ValueError("error starting new thread") def _tievdma(self): """Threaded method to implement tie""" while self._running: try: dpframe = self._dp.newframe() dpframe[:] = self._source_in.readframe() self._dp.writeframe(dpframe) sleep(0.07) except Exception as e: print('An exception occurred: {}'.format(e)) import traceback import logging logging.error(traceback.format_exc()) self._running = False @property def modein(self): """Return input video source mode""" return self._source_in.mode @property def modeout(self): """Return output video sink mode""" return self._dp.mode
[docs]class OpenCVPLVideo: """Wrapper for a OpenCV video stream pipeline that sinks on PL""" def __init__(self, ol: Overlay, filename: Union[int, str], mode=VideoMode(1280, 720, 24, 60)): """ Returns a OpenCVPL object Parameters ---------- filename : [int, str] video filename mode : VideoMode video configuration """ if not isinstance(filename, str) and not isinstance(filename, int): raise ValueError("filename ({}) is not an string or integer" .format(filename)) if isinstance(filename, str) and not os.path.exists(filename): raise RuntimeError("File {} does not exists".format(filename)) self._file = filename self._hdmi_out = ol.video.hdmi_out self._videoIn = None self.mode = mode self._running = None self._started = None if ol.device.name == 'Pynq-ZU': # Deassert HDMI clock reset ol.hdmi_tx_control.channel2[0].write(1) # Wait 200 ms for the clock to come out of reset sleep(0.2) ol.video.phy.vid_phy_controller.initialize() self._hdmi_out.frontend.set_phy(ol.video.phy.vid_phy_controller) dp159 = DP159(ol.HDMI_CTL_axi_iic, 0x5C) si = SI_5324C(ol.HDMI_CTL_axi_iic, 0x68) self._hdmi_out.frontend.clocks = [dp159, si] if (ol.hdmi_tx_control.read(0)) == 0: ol.hdmi_tx_control.write(0, 1) def _configure(self): """Add cv2.CAP_V4L2 to make sure V4L2 libraries are used""" if isinstance(self._file, int): self._file += cv2.CAP_V4L2 self._videoIn = cv2.VideoCapture(self._file) if not self._videoIn: raise RuntimeError("OpenCV can't open {}".format(self._file)) self._videoIn.set(cv2.CAP_PROP_FRAME_WIDTH, self.mode.width) self._videoIn.set(cv2.CAP_PROP_FRAME_HEIGHT, self.mode.height) fourcc = int(self._videoIn.get(cv2.CAP_PROP_FOURCC)) mode = \ fourcc.to_bytes((fourcc.bit_length() + 7) // 8, 'little').decode() if isinstance(self._file, int): if mode != 'MJPG': self._videoIn.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('M', 'J', 'P', 'G')) self._videoIn.set(cv2.CAP_PROP_FPS, self.mode.fps) f_reso = (int(self._videoIn.get(cv2.CAP_PROP_FRAME_WIDTH)), int(self._videoIn.get(cv2.CAP_PROP_FRAME_HEIGHT))) v_reso = (self.mode.width, self.mode.height) if f_reso != v_reso: raise RuntimeError("Source {} and sink {} resolution do not match" .format(f_reso, v_reso))
[docs] def start(self): """Start video stream by configuring it""" if not self._started: self._configure() self._hdmi_out.configure(self.mode) self._hdmi_out.start() self._tie() self._started = True elif not self._running: self._tie()
[docs] def stop(self): """Stop the video stream""" if self._videoIn and self._started: self._running = False while self._thread.is_alive(): sleep(0.05) self._videoIn.release() self._hdmi_out.stop() self._videoIn = None self._started = False
[docs] def pause(self): """Pause tie""" if not self._videoIn: raise SystemError("The stream is not started") if self._running: self._running = False
[docs] def close(self): """Uninitialise the drivers, stopping the pipeline beforehand""" self.stop()
[docs] def readframe(self): """Read an image from the video stream""" for _ in range(5): ret, frame = self._videoIn.read() if not ret: self._configure() else: return frame raise RuntimeError("OpenCV can't rewind {}".format(self._file))
def _tie(self): """Mirror the video stream input to an output channel""" if not self._videoIn: raise SystemError("The stream is not started") self._outframe = self._hdmi_out.newframe() self._thread = threading.Thread(target=self._tievdma, daemon=True) self._running = True try: self._thread.start() except Exception: import traceback print(traceback.format_exc()) def _tievdma(self): """Threaded method to implement tie""" while self._running: self._outframe[:] = self.readframe() self._hdmi_out.writeframe(self._outframe)
[docs]class OpenCVDPVideo(OpenCVPLVideo): """Wrapper for a webcam/file video pipeline streamed to DisplayPort""" def __init__(self, ol: Overlay, filename: Union[int, str], mode=VideoMode(1280, 720, 24, 60)): """ Returns a OpenCVDP object Parameters ---------- filename : [int, str] video filename mode : VideoMode webcam configuration vdma : pynq.lib.video.dma.AxiVDMA Xilinx VideoDMA IP core """ if not isinstance(filename, str) and not isinstance(filename, int): raise ValueError("filename ({}) is not an string or integer" .format(filename)) if isinstance(filename, str) and not os.path.exists(filename): raise RuntimeError("File {} does not exists".format(filename)) self._file = filename self.vdma = ol.video.axi_vdma self.mode = mode self._dp = _DisplayPort() if self.vdma: self.vdma.writechannel.mode = self.mode self.vdma.readchannel.mode = self.mode self._running = None self._started = None
[docs] def start(self): """Configure and start the video stream from/to PS""" self._configure() if not self._started: self._dp.configure(self.mode, PIXEL_RGB) self.vdma.writechannel.start() self.vdma.readchannel.start() self._started = True self._tie()
[docs] def stop(self): """Stop video stream""" if self._started: self._running = False while self._thread.is_alive(): sleep(0.05) self.vdma.writechannel.stop() self.vdma.readchannel.stop() self._dp.stop() self._started = False
def _tie(self): """Mirror the video stream input to an output channel""" if not self._videoIn: raise SystemError("The stream is not started") self._thread = threading.Thread(target=self._tievdma, daemon=True) self._running = True try: self._thread.start() except Exception: import traceback print(traceback.format_exc()) raise ValueError("error starting new thread") def _tievdma(self): """Threaded method to implement tie""" while self._running: try: fpgaframe = self.vdma.writechannel.newframe() fpgaframe[:] = self.readframe() self.vdma.writechannel.writeframe(fpgaframe) dpframe = self._dp.newframe() dpframe[:] = self.vdma.readchannel.readframe() self._dp.writeframe(dpframe) except RuntimeError: raise RuntimeError("Can't start thread")
[docs]class VideoStream: """VideoStream class Handles DisplayPort output paths .start: configures hdmi_in and hdmi_out starts them and tie them together .stop: closes hdmi_in and hdmi_out """ _fres = "/tmp/resolution.json" def __init__(self, ol: Overlay, source: VSource = VSource.HDMI, sink: VSink = VSink.HDMI, file: int = 0, mode: VideoMode = None): """Return a HDMIVideo object to handle the video path Parameters ---------- ol : pynq.Overlay Overlay object source : str (optional) Input video source. Valid values [VSource.HDMI, VSource.MIPI] """ if not mode: if CPU_ARCH == ZYNQ_ARCH or source == VSource.MIPI \ or (source == VSource.OpenCV and isinstance(file, int)): mode = VideoMode(1280, 720, 24, 60) else: mode = VideoMode(1920, 1080, 24, 60) if (source == VSource.HDMI or source == VSource.MIPI) and \ sink == VSink.HDMI: self._video = PLPLVideo(ol=ol, source=source) elif (source == VSource.HDMI or source == VSource.MIPI) and \ sink == VSink.DP: self._video = PLDPVideo(ol, source) elif source == VSource.OpenCV and sink == VSink.HDMI: self._video = OpenCVPLVideo(ol, file, mode) elif source == VSource.OpenCV and sink == VSink.DP: self._video = OpenCVDPVideo(ol=ol, filename=file, mode=mode) reso = {"width": mode.width, "height": mode.height, "fps": mode.fps} with open(self._fres, "w", encoding="utf-8") as f: json.dump(reso, f)
[docs] def start(self): """Start the video stream""" self._video.start()
[docs] def stop(self): """Stop the video stream""" if os.path.exists(self._fres): os.remove(self._fres) self._video.stop()
[docs] def pause(self): """Pause the stream""" if hasattr(self._video, "pause"): self._video.pause()
@property def mode(self): """Return mode""" return self._video.mode