Source code for pynq_composable.composable

# Copyright (C) 2021 Xilinx, Inc
#
# SPDX-License-Identifier: BSD-3-Clause

from graphviz import Digraph
import json
import numpy as np
import os
from .parser import HWHComposable
from pynq import DefaultIP, DefaultHierarchy
from pynq.utils import ReprDict
from .repr_dict import ReprDictComposable
from typing import Type, Union

__author__ = "Mario Ruiz"
__copyright__ = "Copyright 2021, Xilinx"
__email__ = "pynq_support@xilinx.com"


_mem_items = ['axis_register_slice', 'axis_data_fifo',
              'fifo_generator', 'axis_dwidth_converter',
              'axis_subset_converter']


def _nest_level(pl: list) -> int:
    """Compute nested levels of a list iteratively"""

    if not isinstance(pl, list):
        return 0
    level = 0
    for item in pl:
        level = max(level, _nest_level(item))

    return level + 1


def _count_slots(pl: str) -> int:
    """Returns the number of elements in a list"""

    total = 0
    for l0 in pl:
        if isinstance(l0, list):
            for l1 in l0:
                if isinstance(l1, list):
                    for l2 in l1:
                        total += 1
                else:
                    total += 1
        else:
            total += 1

    return total


def _find_index_in_list(pipeline: list, element: Type[DefaultIP]) \
        -> Union[int, tuple]:
    """Return the index of the element in the pipeline"""

    for i, v in enumerate(pipeline):
        if isinstance(v, list):
            for ii, vv in enumerate(v):
                for iii, vvv in enumerate(vv):
                    if vvv == element:
                        return i, ii, iii
        elif v == element:
            return i

    return None


def _edge_label(ci: int, pi: int, debug: bool) -> str:
    """Generate edge label given"""

    return '<<font color=\"' + ('green' if debug else 'white') + '\">' + \
           'ci=' + str(ci) + ' pi=' + str(pi) + '</font>>'


def _get_ip_name_by_vlnv(description: str, vlnv: str) -> str:
    """Search IP by its VLNV and return its name"""

    for k, v in description['ip'].items():
        ip_vlnv = v.get('type')
        if ip_vlnv == vlnv:
            return k
    return None


[docs]class Composable(DefaultHierarchy): """This class keeps track of a composable overlay The Composable class holds the state of the logic available for run-time composition through and AXI4-Stream switch Our definition of composable overlay is: "post-bitstream configurable dataflow pipeline". Hence, this class must expose configurability through content discovery, dataflow configuration and runtime protection. This class stores two dictionaries: c_dict and dfx_dict Each entry of the Composable dictionary (c_dict) is a mapping: 'name' -> {ci, pi, modtype, dfx, loaded, bitstream}, where name (str) is the key of the entry. ci (list) list of physical port in the switch the IP is connected to pi (list) list of physical port in the switch the IP is connected from modtype(str) IP type dfx (bool) IP is located in a DFX region loaded (bool) IP is loaded bitstream (str) location of the corresponding partial bitstream Each entry of the PR dictionary (pr_dict) dictionary is a mapping: 'name' -> {decoupler, gpio, ip}, where name (str) is the name of the partial region decoupler (str) fullpath of the DFX decoupler that controls the pr region gpio (dict) index of PS GPIO that control the DFX decoupler ip (dict) dictionary of partial bitstream and IP associated to the region Attributes ---------- graph : Digraph Graphviz Digraph representation of the current dataflow pipeline """
[docs] @staticmethod def checkhierarchy(description): if _get_ip_name_by_vlnv(description, 'xilinx.com:ip:axis_switch:1.1'): return True return False
def __init__(self, description): """Return a new Composable object. Performs a hardware discovery where the different IP cores connected to the switch are added to the c_dict and the DFX regions are added to the dfx_dict Parameters ---------- description : dict Description of the hierarchy Note ---- This class requires that partial bitstreams and corresponding HWH files to be next to bitstream file that was used to create the Overlay object The name convention for partial bitstreams and HWH file is <bitstream_name>_<hierarchy>_<pr_region>_<pr_module_name>.{bit|hwh} For instance if the main bitstream is `base.bit` and there are two DFX regions with the names `pr_0` and `pr_1` each of them with the same two reconfigurable module, `function_1` and `function_2` the names should be as follow base_composable_pr_0_function_1.bit base_composable_pr_0_function_1.hwh base_composable_pr_0_function_2.bit base_composable_pr_0_function_2.hwh base_composable_pr_1_function_1.bit base_composable_pr_1_function_1.hwh base_composable_pr_1_function_2.bit base_composable_pr_1_function_2.hwh """ super().__init__(description) self._hier = description['fullpath'] + '/' self._dfx_dict = None self._bitfile = description['device'].bitfile_name self._hwh_name = os.path.splitext(self._bitfile)[0] + '.hwh' self._ol = description['overlay'] switch_name = \ _get_ip_name_by_vlnv(description, 'xilinx.com:ip:axis_switch:1.1') self._switch = getattr(self._ol, self._hier + switch_name) self._max_slots = self._switch.max_slots pipelinecrt = \ _get_ip_name_by_vlnv(description, 'xilinx.com:ip:axi_gpio:2.0') if pipelinecrt: self._pipecrtl = getattr(self._ol, self._hier + pipelinecrt) self._soft_reset = self._pipecrtl.channel1 self._dfx_control = self._pipecrtl.channel2 else: self._soft_reset = None self._dfx_control = None parser = HWHComposable(self._hwh_name, self.axis_switch._fullpath) self._c_dict = parser.c_dict self._dfx_dict = parser.dfx_dict self._paths = dict() self._default_paths() self._switch.pi = self._sw_default self.graph = Digraph() self.graph.graph_attr['rankdir'] = 'LR' self._graph_debug = False self._current_pipeline = None self._current_flat_pipeline = None @property def dfx_dict(self) -> dict: """Returns the dfx_dict dictionary All the DFX regions in the hierarchy. Key is the name of the dfx region value is a dictionary with DFX decoupler, PS GPIO that controls the DFX decoupler and partial bitstreams associated to the region {str: {'decoupler' : str, 'gpio' : {'decouple' : int, 'status' : int}, ip': dict}}. """ return ReprDict(self._dfx_dict, rootname='dfx_dict') @property def c_dict(self) -> dict: """Returns the c_dict dictionary All the IP cores connected to the AXI4-Stream Switch. Key is the name of the IP; value is a dictionary mapping the producer and consumer to the switch port, whether the IP is in a dfx region and loaded {str: {'ci' : list, 'pi' : list, 'modtype': str, 'dfx': bool, 'loaded': bool, 'bitstream: str'}}. """ return ReprDictComposable(self._c_dict, rootname=self._hier.replace('/', '')) @property def current_pipeline(self) -> list: """List of IP objects in the current dataflow pipeline""" return self._current_pipeline def _default_paths(self): """Get default paths from user file Generate default AXI4-Stream Switch default configuration based on the user provided dictionary as well as _paths dictionary """ self._default_ip = dict() self._sw_default = np.ones(self._max_slots, dtype=np.int64) * -1 filename = os.path.splitext(self._hwh_name)[0] + '_paths.json' if not os.path.isfile(filename): return with open(filename, "r") as file: jsondict = json.load(file) sw_default = jsondict[self._hier.replace('/', '')] for k, v in sw_default.items(): self._sw_default[v['pi']['port']] = v['ci']['port'] paths = dict() for k, v in sw_default.items(): for kk, vv in v.items(): key = k + ('_in' if kk == 'ci' else '_out') paths[key] = { kk: vv['port'], 'Description': vv['Description'], 'fullpath': None } c_dict = self._c_dict.copy() for k, v in paths.items(): for kk, vv in self._c_dict.items(): ci = v.get('ci') pi = v.get('pi') cii = vv.get('ci') pii = vv.get('pi') if (ci is not None and cii is not None and ci in cii) or \ (pi is not None and pii is not None and pi in pii): if kk in c_dict.keys(): c_dict.pop(kk) v['fullpath'] = kk c_dict[k] = vv.copy() c_dict[k]['default'] = True c_dict[k]['fullpath'] = kk if kk not in self._default_ip.keys(): self._default_ip[kk] = c_dict[k].copy() if 'cpath' not in self._default_ip[kk].keys(): self._default_ip[kk]['cpath'] = dict() key, delkey = ('ci', 'pi') if pi is None else ('pi', 'ci') if c_dict[k].get(delkey): c_dict[k].pop(delkey) self._default_ip[kk]['cpath'][key] = k break self._c_dict = c_dict self._paths = paths def _pr_download(self, partial_region: str, partial_bit: str) -> None: """The method to download a partial bitstream onto PL. The composable dictionary is updated with the new hardware loaded onto the reconfigurable region In this method, the corresponding parser will only be added once the `download()` method of the hierarchical block is called. Note ---- There is no check on whether the partial region specified by users is really partial-reconfigurable. So users have to make sure the `partial_region` provided is correct. Parameters ---------- partial_region : str The name of the hierarchical block corresponding to the PR region. partial_bit : str The name of the partial bitstream. """ self._ol.pr_download(self._hier + partial_region, partial_bit) self._unload_region_from_ip_dict(partial_region) dfx_dict = \ self._dfx_dict[partial_region]['rm'][os.path.basename(partial_bit)] self._set_loaded(dfx_dict) def _unload_region_from_ip_dict(self, partial_region: str) -> None: """Unset loaded attribute for all of the IP of provided region""" for k in self._c_dict.keys(): if partial_region in k: self._c_dict[k]['loaded'] = False def _set_loaded(self, dfx_dict: dict) -> None: """Set loaded attribute in the c_dict for IP on dfx_dict""" for ip in dfx_dict: self._c_dict[ip]['loaded'] = True def _relative_path(self, fullpath: str, port: str = 'ci') -> str: """Return relative path of an IP within the hierarchy If the IP is in the default paths, return proper name """ fullpath = fullpath.replace(self._hier, '') if fullpath not in self._default_ip.keys(): return fullpath for k, v in self._default_ip.items(): if v['fullpath'] == fullpath: return v['cpath'][port]
[docs] def compose(self, cle_list: list) -> None: """Configure design to implement required dataflow pipeline Parameters ---------- cle_list : list list of the composable IP objects Examples: [a, b, c, d] yields .. code-block:: none -> a -> b -> c -> d -> [a, b, [[c,d],[e]], f, g] yields .. code-block:: none -> c -> d - / \\ -> a -> b f -> g -> \\ / ---> e ---- """ if not isinstance(cle_list, list): raise TypeError("The composable pipeline must be a list") levels = _nest_level(cle_list) if levels > 3: raise SystemError("Data flow pipeline with a nest levels bigger " "than 3 is not supported. {}".format(levels)) elif levels % 2 == 0: raise SystemError("Data flow pipeline with an even nest" " levels is not supported. {}".format(levels)) slots = _count_slots(cle_list) if slots > self._max_slots: raise SystemError("Number of slots in the list is bigger than {} " "which are the max slots that hardware allows" .format(self._max_slots)) switch_conf = np.ones(self._max_slots, dtype=np.int64) * -1 flat_list = list() graph = Digraph( node_attr={'shape': 'box'}, edge_attr={'color': 'green'}, graph_attr={'rankdir': self.graph.graph_attr['rankdir']} ) gdebug = self._graph_debug for i, l0 in enumerate(cle_list): if isinstance(l0, list): key = self._relative_path(cle_list[i+1]._fullpath) next_node = self._c_dict[key] if len(l0) != len(next_node['pi']): raise SystemError("Node {} has {} input(s) and cannot meet" "pipeline requirement of {} input(s)" .format(key, len(next_node['pi']), len(l0))) for ii, l1 in enumerate(l0): if not isinstance(l1, list): raise SystemError("Branches must be represented as " "list of list") for iii, l2 in enumerate(l1): ip = cle_list[i][ii][iii] if ip == 1: continue flat_list.append(ip) ci = self._c_dict[ self._relative_path(ip._fullpath)]['ci'][0] if iii == len(l1) - 1: consumer = key pi = self._c_dict[consumer]['pi'][ii] else: consumer = self._relative_path( cle_list[i][ii][iii+1]._fullpath) pi = self._c_dict[consumer]['pi'][0] if not np.where(switch_conf == ci)[0].size: switch_conf[pi] = ci graph.edge(self._relative_path(ip._fullpath), consumer, label=_edge_label(ci, pi, gdebug)) else: raise SystemError("IP: {} is already being used in" " the provided pipeline. An IP " "instance can only be used once" .format(ip._fullpath)) else: ip = cle_list[i] flat_list.append(ip) if i == len(cle_list) - 1: break ci = self._c_dict[(path := self._relative_path(ip._fullpath, 'ci'))]['ci'] if not isinstance(cle_list[i+1], list): pi = self._c_dict[(key := self._relative_path( cle_list[i+1]._fullpath, 'pi'))]['pi'] if not np.where(switch_conf == ci[0])[0].size: switch_conf[pi[0]] = ci[0] graph.edge(path, key, label=_edge_label(ci[0], pi[0], gdebug)) else: raise SystemError("IP: {} is already being used in the" " provided pipeline. An IP instance " "can only be used once" .format(ip._fullpath)) elif len(cle_list[i+1]) != len(ci): raise SystemError("Node {} has {} output(s) and cannot " "meet pipeline requirement of {} " "output(s)".format(l0._fullpath, len(ci), len(cle_list[i+1]))) else: for j in range(len(ci)): nextip = cle_list[i+1][j][0] if nextip != 1: nextkey = self._relative_path(nextip._fullpath) pi = self._c_dict[nextkey]['pi'][0] else: nextip = cle_list[i+2] nextkey = self._relative_path(nextip._fullpath) pi = self._c_dict[nextkey]['pi'][j] if not np.where(switch_conf == ci[j])[0].size: switch_conf[pi] = ci[j] graph.edge(self._relative_path(ip._fullpath), self._relative_path(nextip._fullpath), label=_edge_label(ci[j], pi, gdebug)) else: raise SystemError("IP: {} is already being used " "in the provided pipeline. An IP" " instance can only be used once" .format(ip._fullpath)) if self._soft_reset: self._soft_reset[0].write(1) self._soft_reset[0].write(0) self._configure_switch(switch_conf) for idx, ip in enumerate(flat_list): port = 'pi' if idx == len(flat_list)-1 else 'ci' key = self._relative_path(ip._fullpath, port) if self._c_dict[key]["dfx"]: graph.node(key, _attributes={"color": "blue", "fillcolor": "cyan", "style": "filled"}) if not isinstance(ip, UnloadedIP): if hasattr(ip, "start"): ip.start() else: raise AttributeError("IP {} is not loaded, load IP before " "composing a pipeline" .format(ip._fullpath)) self._current_pipeline = cle_list self._current_flat_pipeline = flat_list self.graph = graph
[docs] def loadIP(self, dfx_list: list) -> None: """Download dfx IP onto the corresponding partial regions Parameters ---------- dfx_list: list List of IP to be downloaded onto the dfx regions. The list can contain either a string with the fullname or the IP object Examples: [cpipe.pr_0.fast_accel, cpipe.pr_1.dilate_accel] ['pr_0/fast_accel', 'pr_1/dilate_accel'] """ bit_dict = dict() for ip in dfx_list: if isinstance(ip, str): fullpath = ip else: fullpath = self._relative_path(ip._fullpath) bitname = os.path.basename(self._c_dict[fullpath]['bitstream']) for pr in self._dfx_dict: if pr in bitname: if pr not in bit_dict.keys(): bit_dict[pr] = dict() bit_dict[pr]['bitstream'] = bitname bit_dict[pr]['loaded'] = \ self._c_dict[fullpath]['loaded'] elif bit_dict[pr]['bitstream'] != bitname: raise SystemError("\'{}\' and \'{}\' bitstreams cannot" " be loaded into the same DFX " " region \'{}\' at the same time" .format(bit_dict[pr]['bitstream'], bitname, pr)) path = os.path.dirname(self._bitfile) + '/' for pr in bit_dict: if not bit_dict[pr]['loaded']: decoupler = self._dfx_control[self._dfx_dict[pr]['decouple']] if self._dfx_control and decoupler: decoupler.write(1) for i in range(5): try: self._pr_download(pr, path + bit_dict[pr]['bitstream']) break except TimeoutError: if i != 4: continue raise TimeoutError("{} partial bitstream could not be " "downloaded" .format(bit_dict[pr]['bitstream'])) if self._dfx_control and decoupler: decoupler.write(0)
[docs] def remove(self, iplist: list = None) -> None: """Remove IP object from the current pipeline Parameters ---------- iplist: list List of IP to be removed from the current pipeline Examples: [cpipe.pr_0.erode] [cpipe.pr_1.filter2d, cpipe.pr_fork.duplicate] """ if self._current_pipeline is None: raise SystemError("A pipeline has not been composed yet") pipeline = self._current_pipeline if iplist is None: raise ValueError("remove requires a list of IP object") for ip in iplist: if ip not in pipeline: raise ValueError("IP object {} does not exit in current " "pipeline {}".format(ip, iplist)) pipeline.remove(ip) self.compose(pipeline)
[docs] def insert(self, iptuple: tuple) -> None: """Insert a new IP or list of IP into current pipeline Parameters ---------- iptuple: tuple Tuple of two items. First: list of IP to be inserted Second: index Examples: ([cpipe.pr_0.erode], 3) ([cpipe.pr_1.filter2d, cpipe.pr_fork.duplicate], 2) """ if not isinstance(iptuple, tuple): raise ValueError("insert expects a tuple as an argument") elif len(iptuple) != 2: raise ValueError("insert expects a tuple with two elements") elif not isinstance(iptuple[1], int): raise ValueError("insert expects an integer as index") elif iptuple[1] > len(self._current_pipeline): raise ValueError("index cannot be bigger than current pipeline" "length") if not isinstance(iptuple[0], list): newlist = [iptuple[0]] else: newlist = iptuple[0] pipeline = self._current_pipeline[:iptuple[1]] + newlist + \ self._current_pipeline[iptuple[1]:] self.compose(pipeline)
[docs] def replace(self, replaceip: tuple) -> None: """Replace an IP object in the current pipeline Parameters ---------- replaceip: tuple Tuple of two items. First: IP object to be replaced Second: new IP object Examples: (cpipe.pr_0.erode, cpipe.pr_1.dilate) """ if not isinstance(replaceip, tuple): raise ValueError("replace expects a tuple as an argument") elif len(replaceip) != 2: raise ValueError("replace expects a tuple with two IP objects") pipeline = self._current_pipeline idx = _find_index_in_list(pipeline, replaceip[0]) if isinstance(idx, int): pipeline[idx] = replaceip[1] elif isinstance(idx, tuple): pipeline[idx[0]][idx[1]][idx[2]] = replaceip[1] else: raise ValueError("IP {} is not in the current pipeline" .format(replaceip[0]._fullpath)) self.compose(pipeline)
[docs] def tap(self, ip: Union[Type[DefaultIP], int] = None) -> None: """Observe the output of an IP object in the current pipeline Tap into the output of any of the IP cores in the current pipeline Note that tap is not supported in a branch You can tap by passing the IP name or the index of the IP in the list. Note that tap does not modify the attribute current_pipeline Parameters ---------- ip: Either an IP object in the current pipeline to be tapped or index of IP object in the current pipeline to be tapped Examples: tap(cpipe.pr_1.dilate) tap(6) """ if self._current_pipeline is None: raise SystemError("A pipeline has not been composed yet") if isinstance(ip, int): if ip >= len(self._current_pipeline): raise ValueError("list index ({}) is out of range, supported " "indexes are integers from 0 to {}" .format(ip, len(self._current_pipeline)-1)) else: index = ip else: try: index = self._current_pipeline.index(ip) except ValueError: raise ValueError("{} {} does not exist in the list or IP is in" " a branch " .format(ip, ip._fullpath if hasattr(ip, '_fullpath') else '')) ip = self._current_pipeline[index] new_list = self._current_pipeline[0:index+1] if index < len(self._current_pipeline)-1: key = self._relative_path(ip._fullpath) if len(self._c_dict[key]['ci']) != 1: raise SystemError("tap into an IP with multiple outputs is " "not supported") new_list.append(self._current_pipeline[-1]) pipeline = self._current_pipeline.copy() self.compose(new_list) self._current_pipeline = pipeline.copy()
[docs] def untap(self) -> None: """Restores current pipeline after tap happened""" if self._current_pipeline is None: raise SystemError("There is nothing to untap") self.compose(self._current_pipeline)
def __getattr__(self, name): if self._dfx_dict is None: return super().__getattr__(name) elif name in self._dfx_dict: return DFXRegion(self, name) elif name in self._paths: try: attr = getattr(self._ol, self._paths[name]['fullpath']) except AttributeError: attr = super().__getattr__(self._paths[name]['fullpath']) return attr elif (key := self._hier + name) not in self._ol.ip_dict.keys(): return StreamingIP(key) else: try: attr = super().__getattr__(name) except AttributeError: attr = getattr(self._ol, name) return attr def __dir__(self): return sorted(set(super().__dir__() + list(self.__dict__.keys()) + list(self._c_dict.keys()) + list(self._default_ip.keys()))) def _configure_switch(self, new_sw_config: dict) -> None: """Verify that default values are set and configure the switch""" switch_conf = np.copy(new_sw_config) for idx, val in enumerate(switch_conf): if val < 0 and self._sw_default[idx] > 0 and \ self._sw_default[idx] not in switch_conf: switch_conf[idx] = self._sw_default[idx] self._switch.pi = switch_conf
[docs]class DFXRegion: """Class that wraps attributes for IP objects on DFX regions""" def __init__(self, cpipe: Composable, name: str): self._ol = cpipe._ol self._parent = cpipe._hier self._c_dict = cpipe._c_dict self.key = name def __getattr__(self, name: str): key = self.key + '/' + name if key in self._c_dict.keys(): if not self._c_dict[key]['loaded']: return UnloadedIP(key) elif self._c_dict[key]['modtype'] in _mem_items: return BufferIP(key) else: return getattr(self._ol, self._parent + key) else: raise ValueError("IP \'{}\' does not exist in partial region " "\'{}\'".format(name, self.key))
[docs]class StreamingIP: """Handles Streaming only IP""" def __init__(self, name: str): self._fullpath = name
[docs]class UnloadedIP: """Handles IP objects that are not yet loaded into the hardware This can be consider a virtual IP object """ def __init__(self, path: str): self._fullpath = path
[docs]class BufferIP: """Handles IP objects that are of buffering type Expose fullpath attribute for buffering type IP such as: a) FIFOs b) Slice registers. """ def __init__(self, path: str): self._fullpath = path