# Source code for pynq_composable.apps

# Copyright (C) 2021 Xilinx, Inc
# SPDX-License-Identifier: BSD-3-Clause

from pynq import Overlay
from pynq.lib.video import VideoMode
from .video import VideoStream, VSource, VSink
from .libs import xvF2d, xvLut
from ipywidgets import VBox, HBox, IntRangeSlider, FloatSlider, interact, \
    interactive_output, IntSlider, Dropdown
from IPython.display import display
import numpy as np
from threading import Timer
import glob
import os
import warnings

# Module authorship metadata
__author__ = "Mario Ruiz"
__copyright__ = "Copyright 2021, Xilinx"
__email__ = ""

class PipelineApp:
    """Base class to expose common pipeline methods

    This class wraps the common functionality for the different
    applications, these methods are:

    - start: configures and starts the pipeline
    - stop: stops the pipeline
    - play: exposes runtime configuration to the user
    """

    # List of partial bitstreams handed to ``loadIP`` in ``start``; children
    # override it. A falsy value skips the download step.
    _dfx_ip = None

    def __init__(self, bitfile_name, source: VSource = VSource.HDMI,
                 sink: VSink = VSink.HDMI, file: int = 0,
                 videomode: VideoMode = None):
        """Return a PipelineApp object

        Parameters
        ----------
        bitfile_name : str
            Bitstream filename
        source : VSource (optional)
            Input video source. Default VSource.HDMI
        sink : VSink (optional)
            Output video sink. Default VSink.HDMI
        file: int, str (optional)
            OpenCV input stream. Default 0
            If this argument is 'int' the source is a webcam
            If this argument is 'str' the source is a file
        videomode : VideoMode (optional)
            Passed through unchanged to VideoStream. Default None

        Raises
        ------
        ValueError
            If ``source`` or ``sink`` is not supported, or is not available
            on the detected device
        """
        # Look for device-tree overlays next to the bitstream. os.path.join
        # keeps the search in the current directory when bitfile_name has no
        # directory part, instead of globbing the filesystem root.
        dtbo = glob.glob(
            os.path.join(os.path.dirname(bitfile_name), '*.dtbo'))
        if len(dtbo) > 1:
            warnings.warn("There are more than one dtbo, dtbo {} is being "
                          "used".format(dtbo[0]))
        elif not dtbo:
            dtbo = [None]

        self._ol = Overlay(bitfile_name, dtbo=dtbo[0])
        self._cpipe = self._ol.composable
        # NOTE(review): the right-hand side of this assignment was missing in
        # the reviewed source; restored as the overlay's device name, which
        # is what the 'Pynq-Z2' / 'KV260' comparisons below need - confirm.
        device = self._ol.device.name

        # NOTE(review): the second format() argument of both "does not
        # support" messages was missing in the reviewed source; restored as
        # the enum member name - confirm.
        if source not in VSource:
            raise ValueError("source {} is not supported".format(source))
        elif (device == 'Pynq-Z2' and source == VSource.MIPI) or\
                (device == 'KV260' and source == VSource.HDMI):
            raise ValueError("Device {} does not support {} as input source "
                             .format(device, source.name))

        if sink not in VSink:
            raise ValueError("sink {} is not supported".format(sink))
        elif device == 'Pynq-Z2' and sink == VSink.DP:
            raise ValueError("Device {} does not support {} as output sink "
                             .format(device, sink.name))

        self._video = VideoStream(self._ol, source, sink, file, videomode)

        # Pick the pipeline entry/exit nodes that match the video source
        if source == VSource.HDMI:
            self._vii = self._cpipe.hdmi_source_in
            self._vio = self._cpipe.hdmi_source_out
        elif source == VSource.OpenCV and device != "KV260":
            self._vii = self._cpipe.hdmi_sink_in
            self._vio = self._cpipe.hdmi_sink_out
        elif source == VSource.OpenCV:
            self._vii = self._cpipe.ps_video_in
            self._vio = self._cpipe.ps_video_out
        elif source == VSource.MIPI:
            self._vii = self._cpipe.mipi_in
            self._vio = self._cpipe.mipi_out

        # Handles to the accelerators that the child applications reuse
        self._fi2d0 = self._cpipe.filter2d_accel
        self._r2g = self._cpipe.rgb2gray_accel
        self._g2r = self._cpipe.gray2rgb_accel
        self._r2h = self._cpipe.rgb2hsv_accel
        self._ct = self._cpipe.colorthresholding_accel
        self._lut = self._cpipe.lut_accel

        # Default pipeline: input connected straight to the output
        self._app_pipeline = [self._vii, self._vio]

    def stop(self):
        """Stops the pipeline"""
        self._video.stop()

    def _pipeline(self):
        """Logic to configure pipeline

        Each child should extend this method to download partial bitstreams
        and compose the pipeline
        """
        pass

    def start(self):
        """Starts the pipeline

        Loads the partial bitstreams listed in ``_dfx_ip`` (if any), lets the
        child configure the pipeline, composes it, starts the video stream
        and returns the graph
        """
        if self._dfx_ip:
            self._cpipe.loadIP(self._dfx_ip)

        self._pipeline()
        self._cpipe.compose(self._app_pipeline)
        self._video.start()
        return self.graph

    def play(self):
        """Exposes runtime configurations to the user

        Each child should implement its own version of play to expose
        runtime configurations to the users
        """
        pass

    @property
    def graph(self):
        """Return DAG"""
        return self._cpipe.graph
class DifferenceGaussians(PipelineApp):
    """This class wraps the functionality to implement the Difference of
    Gaussian filter video pipeline application
    """

    _dfx_ip = [
        'pr_fork/duplicate_accel',
        'pr_join/subtract_accel',
        'pr_0/filter2d_accel'
    ]

    def _pipeline(self):
        """Logic to configure pipeline

        Grabs the dynamic IP handles, programs both filters as Gaussian blur
        with their initial sigma and builds the pipeline description
        """
        self._dup = self._cpipe.pr_fork.duplicate_accel
        self._sub = self._cpipe.pr_join.subtract_accel
        self._fi2d1 = self._cpipe.pr_0.filter2d_accel

        # The initial sigmas were previously stored only on the application
        # object and never reached the filters. They are kept as attributes
        # for backward compatibility and also written to each filter, the
        # same way _play does, before the Gaussian kernel is selected.
        self.sigma0 = 0.5
        self._fi2d0.sigma = self.sigma0
        self._fi2d0.kernel_type = xvF2d.gaussian_blur
        self.sigma1 = 7
        self._fi2d1.sigma = self.sigma1
        self._fi2d1.kernel_type = xvF2d.gaussian_blur

        self._app_pipeline = [self._vii, self._fi2d0, self._dup,
                              [[self._fi2d1], [1]], self._sub, self._vio]

    def _play(self, sigma0, sigma1):
        """Apply the slider values as the sigma of each filter"""
        self._fi2d0.sigma = sigma0
        self._fi2d1.sigma = sigma1

    def play(self):
        """Exposes runtime configurations to the user

        Displays two sliders to change the sigma value of each Gaussian
        Filter
        """
        sigma0 = FloatSlider(min=0.1, max=10, value=0.5,
                             description='\u03C3\u2080')
        sigma1 = FloatSlider(min=0.1, max=10, value=7,
                             description='\u03C3\u2081')
        interact(self._play, sigma0=sigma0, sigma1=sigma1)
class CornerDetect(PipelineApp):
    """Corner detection video pipeline application

    Wraps the functionality needed to run either the Fast or the Harris
    corner detector and to switch between them at runtime
    """

    _dfx_ip = [
        'pr_0/fast_accel',
        'pr_1/cornerHarris_accel',
        'pr_fork/duplicate_accel',
        'pr_join/add_accel'
    ]
    # Name of the detector currently placed in the pipeline
    _algorithm = 'Fast'

    def _pipeline(self):
        """Logic to configure pipeline

        Grabs the dynamic IP handles and describes the initial pipeline,
        which uses the Fast detector
        """
        cpipe = self._cpipe
        self._fast = cpipe.pr_0.fast_accel
        self._harr = cpipe.pr_1.cornerHarris_accel
        self._add = cpipe.pr_join.add_accel
        self._dup = cpipe.pr_fork.duplicate_accel

        detect_branch = [self._r2g, self._fast, self._g2r]
        self._app_pipeline = [self._vii, self._dup, [detect_branch, [1]],
                              self._add, self._vio]

    def _swap(self):
        """Replace the active detector with the other one

        The threshold slider maximum and value are reset for the detector
        that becomes active
        """
        if self._algorithm == 'Fast':
            outgoing, incoming = self._fast, self._harr
            new_name, thr_max, thr_value = 'Harris', 1024, 422
        else:
            outgoing, incoming = self._harr, self._fast
            new_name, thr_max, thr_value = 'Fast', 255, 20

        self._cpipe.replace((outgoing, incoming))
        self._algorithm = new_name
        # Order matters: widen/narrow the range first, then set the value
        self._thr.max = thr_max
        self._thr.value = thr_value

    def _play(self, algorithm, threshold, k_harris):
        """Logic for runtime updates"""
        if algorithm != self._algorithm:
            self._swap()
            return

        if algorithm == 'Fast':
            self._fast.threshold = threshold
            self._k_harris.disabled = True
            return

        self._harr.threshold = threshold
        self._k_harris.disabled = False
        self._harr.k = k_harris

    def play(self):
        """Exposes runtime configurations to the user

        Displays one drop down menu to select the corner detect algorithm
        and two sliders to change the threshold and the K value
        """
        self._thr = IntSlider(min=0, max=255, step=1, value=20)
        self._k_harris = FloatSlider(min=0, max=0.2, step=0.002, value=0.04,
                                     description='\u03BA')
        interact(self._play, algorithm=['Fast', 'Harris'],
                 threshold=self._thr, k_harris=self._k_harris)
class ColorDetect(PipelineApp):
    """Color detect video pipeline application

    Wraps the functionality needed to threshold three colors, optionally
    in HSV space and optionally with erode/dilate noise reduction
    """

    _dfx_ip = [
        'pr_0/dilate_accel',
        'pr_1/dilate_accel',
        'pr_fork/duplicate_accel',
        'pr_join/bitwise_and_accel'
    ]
    _c_space = 'HSV'
    _output = 'Color Detect'
    _noise_reduction = 'Yes'

    # Widgets are created once, at class level
    _noise_r = Dropdown(options=['Yes', 'No'], value='Yes',
                        description='Noise Reduction')
    _h0 = IntRangeSlider(value=[22, 38], min=0, max=255,
                         description='h\u2080')
    _h1 = IntRangeSlider(value=[38, 75], min=0, max=255,
                         description='h\u2081')
    _h2 = IntRangeSlider(value=[160, 179], min=0, max=255,
                         description='h\u2082')
    _s0 = IntRangeSlider(value=[150, 255], min=0, max=255,
                         description='s\u2080')
    _s1 = IntRangeSlider(value=[150, 255], min=0, max=255,
                         description='s\u2081')
    _s2 = IntRangeSlider(value=[150, 255], min=0, max=255,
                         description='s\u2082')
    _v0 = IntRangeSlider(value=[60, 255], min=0, max=255,
                         description='v\u2080')
    _v1 = IntRangeSlider(value=[60, 255], min=0, max=255,
                         description='v\u2081')
    _v2 = IntRangeSlider(value=[60, 255], min=0, max=255,
                         description='v\u2082')
    _sliders = [_h0, _h1, _h2, _s0, _s1, _s2, _v0, _v1, _v2]

    def _pipeline(self):
        """Logic to configure pipeline

        Grabs the dynamic IP handles and prepares the four pipeline
        variants: with/without color space conversion and with/without
        noise reduction
        """
        cpipe = self._cpipe
        self._er0 = cpipe.pr_0.erode_accel
        self._di0 = cpipe.pr_0.dilate_accel
        self._er1 = cpipe.pr_1.erode_accel
        self._di1 = cpipe.pr_1.dilate_accel
        self._band = cpipe.pr_join.bitwise_and_accel
        self._dup = cpipe.pr_fork.duplicate_accel

        # rgb2hsv + threshold + noise reduction
        self._app_pipeline = [
            self._vii, self._dup,
            [[self._r2h, self._ct, self._er0, self._di0, self._di1,
              self._er1, self._g2r], [1]],
            self._band, self._vio]
        # threshold + noise reduction, no color space conversion
        self._app_pipeline1 = [
            self._vii, self._dup,
            [[self._ct, self._er0, self._di0, self._di1, self._er1,
              self._g2r], [1]],
            self._band, self._vio]
        # rgb2hsv + threshold, no noise reduction
        self._app_pipelinennr = [
            self._vii, self._dup,
            [[self._r2h, self._ct, self._g2r], [1]],
            self._band, self._vio]
        # threshold only
        self._app_pipeline1nnr = [
            self._vii, self._dup,
            [[self._ct, self._g2r], [1]],
            self._band, self._vio]

    def _play(self, h0, h1, h2, s0, s1, s2, v0, v1, v2, s, nr):
        """Logic for runtime updates

        Writes the thresholds selected with the sliders and recomposes the
        pipeline when the noise reduction or the color space option changes
        """
        lower_thr = np.empty((3, 3), dtype=np.uint8)
        upper_thr = np.empty((3, 3), dtype=np.uint8)
        # One row per channel, one column per color
        channels = ((h0, h1, h2), (s0, s1, s2), (v0, v1, v2))
        for row, ranges in enumerate(channels):
            lower_thr[row] = [rng[0] for rng in ranges]
            upper_thr[row] = [rng[1] for rng in ranges]

        self._ct.lower_thr = lower_thr
        self._ct.upper_thr = upper_thr

        if nr != self._noise_reduction:
            in_rgb = self._c_space == 'RGB'
            if nr == 'Yes':
                chosen = self._app_pipeline1 if in_rgb \
                    else self._app_pipeline
            else:
                chosen = self._app_pipeline1nnr if in_rgb \
                    else self._app_pipelinennr
            self._cpipe.compose(chosen)
            self._noise_reduction = nr

        if s != self._c_space:
            chosen = self._app_pipeline1 if s == 'RGB' \
                else self._app_pipeline
            self._cpipe.compose(chosen)
            self._c_space = s
            # The pipelines composed here include noise reduction, so the
            # drop down is set back to 'Yes'
            self._noise_r.value = 'Yes'

    def _control_sliders(self, disabled=True):
        """Set the ``disabled`` flag of every threshold slider"""
        for slider in self._sliders:
            slider.disabled = disabled

    def play(self):
        """Exposes runtime configurations to the user

        Displays nine sliders to change the thresholding range for three
        colors on the three channels, plus the color space and noise
        reduction drop down menus
        """
        c_space = Dropdown(options=['HSV', 'RGB'], value='HSV',
                           description='Color Space')

        left_box = VBox([self._h0, self._h1, self._h2, c_space])
        middle_box = VBox([self._s0, self._s1, self._s2])
        right_box = VBox([self._v0, self._v1, self._v2, self._noise_r])

        controls = {
            'h0': self._h0, 'h1': self._h1, 'h2': self._h2,
            's0': self._s0, 's1': self._s1, 's2': self._s2,
            'v0': self._v0, 'v1': self._v1, 'v2': self._v2,
            's': c_space, 'nr': self._noise_r,
        }
        out = interactive_output(self._play, controls)

        ui = HBox([left_box, middle_box, right_box])
        display(ui, out)
class InterruptTimer(object):
    """Threaded interrupt

    Encapsulates a threading.Timer that calls a user function when it times
    out. The timer is armed again before every call, so the function keeps
    being called once per interval until stop() is called
    """

    def __init__(self, interval, function, *args, **kwargs):
        """Store the callback and its arguments; the timer is not started

        Parameters
        ----------
        interval : number
            Interval handed to threading.Timer
        function : callable
            Function to execute on every time out
        *args, **kwargs
            Arguments forwarded to ``function``
        """
        self._timer = None
        self.interval = interval
        self.function = function
        self.args = args
        self.kwargs = kwargs
        self.is_running = False

    def _run(self):
        """Timer callback: arm the next period, then call the function"""
        # Clear the flag so that start() creates a fresh Timer
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        """Start background task"""
        if self.is_running:
            return
        pending = Timer(self.interval, self._run)
        self._timer = pending
        pending.start()
        self.is_running = True

    def stop(self):
        """Stop background task"""
        if not self.is_running:
            return
        self._timer.cancel()
        self.is_running = False
class Filter2DApp(PipelineApp):
    """This class wraps the functionality to implement the Filter2D IP and
    expose kernel configurability through the buttons on the board
    """

    def __init__(self, bitfile_name, source: VSource = VSource.HDMI):
        """Return a Filter2DApp object

        Parameters
        ----------
        bitfile_name : str
            Bitstream filename
        source : VSource (optional)
            Input video source. Default VSource.HDMI
        """
        super().__init__(bitfile_name=bitfile_name, source=source)
        self._fi2d0.kernel_type = xvF2d.identity
        self._buttons = self._ol.btns_gpio.channel1
        self._leds = self._ol.leds_gpio.channel1
        # Background poll of the buttons every 0.3 s once play() is called
        self._timer = InterruptTimer(0.3, self._play)
        self._app_pipeline = [self._vii, self._fi2d0, self._vio]
        # Last kernel index written; None forces a write on the first poll
        self._index = None

    def stop(self):
        """Stops the pipeline"""
        super().stop()
        self._timer.stop()

    def _play(self):
        """Poll the buttons and update the kernel when the selection changes

        The reading is mapped to an xvF2d member with a modulo; the kernel
        and the LEDs are only written when the index differs from the last
        one applied
        """
        # NOTE(review): the argument of int() was missing in the reviewed
        # source; restored as a read of the buttons GPIO channel - confirm.
        buttons = int(self._buttons.read())
        index = buttons % len(xvF2d)
        if self._index != index:
            self._fi2d0.kernel_type = xvF2d(index)
            self._leds[0:4].write(index)
            self._index = index

    def play(self):
        """Exposes runtime configurations to the user

        Enables user interaction through the on board buttons by starting
        the background poll
        """
        self._timer.start()
        return "Use the buttons on the board to change the Filter2D "\
            "kernel type"
class LutApp(PipelineApp):
    """This class wraps the functionality to implement the LUT IP and
    expose kernel configurability through the switches on the board
    """

    def __init__(self, bitfile_name, source: VSource = VSource.HDMI):
        """Return a LutApp object

        Parameters
        ----------
        bitfile_name : str
            Bitstream filename
        source : VSource (optional)
            Input video source. Default VSource.HDMI
        """
        super().__init__(bitfile_name=bitfile_name, source=source)
        self._lut.kernel_type = xvLut.negative
        self._switches = self._ol.switches_gpio.channel1
        self._leds = self._ol.leds_gpio.channel1
        # Background poll of the switches every 0.3 s once play() is called
        self._timer = InterruptTimer(0.3, self._play)
        self._app_pipeline = [self._vii, self._lut, self._vio]
        # Last kernel index written; None forces a write on the first poll
        self._index = None

    def stop(self):
        """Stops the pipeline"""
        super().stop()
        self._timer.stop()

    def _play(self):
        """Poll the switches and update the kernel when the selection changes

        The reading is mapped to an xvLut member with a modulo; the kernel
        and the LEDs are only written when the index differs from the last
        one applied
        """
        # NOTE(review): the argument of int() was missing in the reviewed
        # source; restored as a read of the switches GPIO channel - confirm.
        switches = int(self._switches.read())
        index = switches % len(xvLut)
        if self._index != index:
            self._lut.kernel_type = xvLut(index)
            self._leds[0:4].write(index)
            self._index = index

    def play(self):
        """Exposes runtime configurations to the user

        Enables user interaction through the on board switches by starting
        the background poll
        """
        self._timer.start()
        return "Use the switches on the board to change the LUT kernel type"