Source code for qbraid.transpiler.graph

# Copyright (C) 2024 qBraid
#
# This file is part of the qBraid-SDK
#
# The qBraid-SDK is free software released under the GNU General Public License v3
# or later. You can redistribute and/or modify it under the terms of the GPL v3.
# See the LICENSE file in the project root or <https://www.gnu.org/licenses/gpl-3.0.html>.
#
# THERE IS NO WARRANTY for the qBraid-SDK, as per Section 15 of the GPL v3.

"""
Module providing tools to map, analyze, and visualize conversion paths between different
quantum programs available through the qbraid.transpiler using directed graphs.

"""
from collections import deque
from importlib import import_module
from typing import Any, Callable, Optional, Union

import rustworkx as rx

from qbraid.programs.experiment import ExperimentType
from qbraid.programs.registry import (
    QPROGRAM_ALIASES,
    get_native_experiment_type,
    is_registered_alias_native,
)

from .edge import Conversion
from .exceptions import ConversionPathNotFoundError


def _get_path_from_bound_methods(bound_methods: list[Callable[..., Any]]) -> str:
    """
    Constructs a path string from a list of bound methods of Conversion instances.

    This function takes a list of bound methods (specifically 'convert' methods bound to
    Conversion instances) and constructs a path string representing the sequence of
    conversions. Each conversion is defined by the 'source' and 'target' properties of the
    Conversion instance to which each method is bound.

    Args:
        bound_methods: A list of bound methods from Conversion instances.

    Returns:
        A string representing the path of conversions, formatted as
        'source1 -> source2 -> ... -> targetN'.

    Raises:
        AttributeError: If the bound methods do not have the expected 'source'
                        and 'target' attributes.
        IndexError: If the list of bound methods is empty.
        TypeError: If an item in the bound_methods list is not a bound method.
    """
    if not bound_methods:
        raise IndexError("The list of bound methods is empty.")

    total_methods = len(bound_methods)

    path = []
    for index, method in enumerate(bound_methods):
        instance: Conversion = method.__self__  # Get the instance to which the method is bound
        if not hasattr(instance, "source") or not hasattr(instance, "target"):
            raise AttributeError("Bound method instance lacks 'source' or 'target' attributes.")
        path.append(instance.source)  # Add the source node of the instance

        # Add the target of the last method's instance to complete the path
        if index == total_methods - 1:
            path.append(instance.target)

    return " -> ".join(path)


# pylint: disable-next=too-many-public-methods
[docs] class ConversionGraph(rx.PyDiGraph): """ Class for coordinating conversions between different quantum software programs """ def __new__(cls, *args, **kwargs): # pylint: disable=unused-argument return super().__new__(cls) # pylint: disable=no-value-for-parameter # pylint: disable-next=too-many-arguments
[docs] def __init__( self, conversions: Optional[list[Conversion]] = None, require_native: bool = False, include_isolated: bool = False, edge_bias: Optional[float] = None, nodes: Optional[Union[list[str], set[str]]] = None, ): """ Initialize a ConversionGraph instance. Args: conversions (list[Conversion], optional): List of conversion edges. If None, default conversion edges are used. require_native (bool): If True, only include "native" conversion functions. Defaults to False. include_isolated (bool): If True, includes all registered program type aliases, even those that are not connected to any other nodes in the graph. Defaults to False. edge_bias (float, optional): Factor used to fine-tune the edge weight calculations and modify the decision thresholds for pathfinding. Defaults to 0.25 to prioritize shorter paths. For example, a bias of 0.25 slightly favors a single conversion at weight 0.78 over two conversions at weight 1.0. Only used if conversions is None. nodes (list[str], optional): List of nodes to include in the graph. If nodes is None and conversions is None, all nodes connected by registered conversions are included. If nodes is None and conversions is specified, only nodes connected by the specified conversions are included. Isolated nodes included iff include_isolated is True. """ super().__init__() self.require_native = require_native self.edge_bias = edge_bias if edge_bias is not None else 0.25 self._conversions = conversions or self.load_default_conversions(bias=self.edge_bias) self._node_alias_id_map: dict[str, int] = {} self._include_isolated = include_isolated self._init_nodes = set(nodes) if nodes is not None else set() self.create_conversion_graph()
@staticmethod def load_default_conversions(bias: Optional[float] = None) -> list[Conversion]: """ Create a list of default conversion nodes using predefined conversion functions. Returns: list[Conversion]: List of default conversion edges. """ transpiler = import_module("qbraid.transpiler.conversions") conversion_functions: list[str] = getattr(transpiler, "conversion_functions", []) def construct_conversion(name: str) -> Conversion: source, target = name.split("_to_") conversion_func = getattr(transpiler, name) return Conversion(source, target, conversion_func, bias=bias) return [construct_conversion(conversion) for conversion in conversion_functions] def create_conversion_graph(self) -> None: """Create a directed graph from a list of conversion functions.""" nodes = self._init_nodes or set() for edge in ( e for e in self._conversions if e.supported and (not self.require_native or e.native) ): if nodes and (edge.source not in nodes or edge.target not in nodes): continue if edge.source not in self._node_alias_id_map: self._node_alias_id_map[edge.source] = self.add_node(edge.source) if edge.target not in self._node_alias_id_map: self._node_alias_id_map[edge.target] = self.add_node(edge.target) self.add_edge( self._node_alias_id_map[edge.source], self._node_alias_id_map[edge.target], {"native": edge.native, "func": edge.convert, "weight": edge.weight}, ) if self._include_isolated: nodes = nodes or QPROGRAM_ALIASES for alias in nodes: if alias not in self._node_alias_id_map and ( not self.require_native or is_registered_alias_native(alias) ): self._node_alias_id_map[alias] = self.add_node(alias) def has_node(self, node: str) -> bool: """ Check if a node exists in the graph. Args: node (str): The node to check. Returns: bool: True if the node exists, False otherwise. """ return node in set(self.nodes()) def has_edge(self, node_a: str, node_b: str) -> bool: """ Check if an edge exists between two nodes in the graph. Args: node_a (str): The source node. node_b (str): The target node. Returns: bool: True if the edge exists, False otherwise. """ if not self.has_node(node_a) or not self.has_node(node_b): return False return super().has_edge(self._node_alias_id_map[node_a], self._node_alias_id_map[node_b]) def conversions(self) -> list[Conversion]: """ Get the list of conversion edges. Returns: list[Conversion]: The conversion edges of the graph. """ return self._conversions def add_conversion(self, edge: Conversion, overwrite: bool = False) -> None: """ Add a new conversion function as an edge in the graph. Args: edge (qbraid.interface.Conversion): The conversion edge to add to the graph. overwrite (optional, bool): If True, overwrites an existing conversion. Defaults to False. Raises: ValueError: If the conversion already exists and overwrite_existing is False. """ source, target = edge.source, edge.target if self.has_edge(source, target) and not overwrite: raise ValueError( f"Conversion from {source} to {target} already exists. " "Set overwrite=True to overwrite." ) for old_edge in self._conversions: if old_edge.source == source and old_edge.target == target: self._conversions.remove(old_edge) self.remove_edge( self._node_alias_id_map[source], self._node_alias_id_map[target], ) break self._conversions.append(edge) if source not in self._node_alias_id_map: self._node_alias_id_map[source] = self.add_node(source) if target not in self._node_alias_id_map: self._node_alias_id_map[target] = self.add_node(target) self.add_edge( self._node_alias_id_map[source], self._node_alias_id_map[target], {"native": edge.native, "func": edge.convert, "weight": edge.weight}, ) def remove_conversion(self, source: str, target: str) -> None: """Safely remove a conversion from the graph.""" if self.has_edge(source, target): self.remove_edge(self._node_alias_id_map[source], self._node_alias_id_map[target]) else: raise ValueError(f"Conversion from {source} to {target} does not exist.") self._conversions = [ conv for conv in self._conversions.copy() if not (conv.source == source and conv.target == target) ] def find_shortest_conversion_path(self, source: str, target: str) -> list[Callable]: """ Find the shortest conversion path between two nodes in a graph. Args: source (str): The starting node for the path. target (str): The target node for the path. Returns: list of Callable: The shortest conversion path as a list of bound methods of Conversion instances. Raises: ValueError: If no path is found between source and target. """ path = rx.dijkstra_shortest_paths( self, self._node_alias_id_map[source], target=self._node_alias_id_map[target], weight_fn=lambda edge: edge["weight"], ) if len(path) == 0: raise ConversionPathNotFoundError(source, target) return [ self.get_edge_data( path[self._node_alias_id_map[target]][i], path[self._node_alias_id_map[target]][i + 1], )["func"] for i in range(len(path[self._node_alias_id_map[target]]) - 1) ] def find_top_shortest_conversion_paths( self, source: str, target: str, top_n: int = 3 ) -> list[list[Callable]]: """ Find the top shortest conversion paths between two nodes in a graph. Args: source (str): The starting node for the path. target (str): The target node for the path. top_n (int): Number of top shortest paths to find. Returns: list of list of Callable: The top shortest conversion paths. Raises: ValueError: If no path is found between source and target. """ all_paths = rx.all_simple_paths( self, self._node_alias_id_map[source], self._node_alias_id_map[target] ) # rx.all_simple_paths returns an empty list if no path is found if len(all_paths) == 0: raise ConversionPathNotFoundError(source, target) sorted_paths = sorted(all_paths, key=len)[:top_n] return [ [self.get_edge_data(path[i], path[i + 1])["func"] for i in range(len(path) - 1)] for path in sorted_paths ] def has_path(self, source: str, target: str) -> bool: """ Check if a conversion between two languages is supported. Args: source (str): The source language. target (str): The target language. Returns: bool: True if the conversion is supported, False otherwise. """ if source == target: return True source_node = self._node_alias_id_map.get(source) target_node = self._node_alias_id_map.get(target) if source_node is None or target_node is None: return False return rx.has_path(self, source_node, target_node) def shortest_path(self, source: str, target: str) -> str: """ Return string representation of the shortest conversion path between two nodes. Args: source (str): The starting node for the path. target (str): The target node for the path. Returns: str: String representation of shortest conversion path. Raises: ConversionPathNotFoundError: If no path is found between source and target. """ path = self.find_shortest_conversion_path(source, target) return _get_path_from_bound_methods(path) def all_paths(self, source: str, target: str) -> list[str]: """ Return string representations of all conversion paths between two nodes. Args: source (str): The starting node for the path. target (str): The target node for the path. Returns: list[str]: String representations of all conversion paths. Raises: ConversionPathNotFoundError: If no path is found between source and target. """ num_conversions = len(self.conversions()) paths = self.find_top_shortest_conversion_paths(source, target, top_n=num_conversions) return [_get_path_from_bound_methods(path) for path in paths] @staticmethod def _calculate_depth_and_weight( path: str, conv_weights: dict[tuple[str, str], int] ) -> tuple[int, int]: """Calculate the depth and weight of a given path.""" conv_nodes = path.split(" -> ") conv_pairs = list(zip(conv_nodes, conv_nodes[1:])) depth = len(conv_pairs) weight = sum(conv_weights.get(pair, 0) for pair in conv_pairs) return depth, weight def _find_closest( self, pivot: str, candidates: list[str], has_path_func: Callable[[str, str], Optional[str]], shortest_path_func: Callable[[str, str], str], ) -> Optional[str]: """Find the closest item in candidates to/from pivot based on conversion paths.""" closest = None min_depth = float("inf") min_weight = float("inf") conv_weights = {(conv.source, conv.target): conv.weight for conv in self.conversions()} for candidate in candidates: if has_path_func(pivot, candidate): path = shortest_path_func(pivot, candidate) depth, weight = self._calculate_depth_and_weight(path, conv_weights) if (depth < min_depth) or (depth == min_depth and weight < min_weight): min_depth = depth min_weight = weight closest = candidate return closest def closest_target(self, source: str, targets: list[str]) -> Optional[str]: """ Determine the closest target from a list of possible targets based on the shortest conversion path and weights. In case of equal path depths, the target with the higher total weight of its path edges is chosen. Args: source (str): The alias from which conversion paths are evaluated. targets (list[str]): A list of target aliases to which the conversion paths are evaluated. Returns: Optional[str]: The name of the target that requires the fewest conversions from the source or has higher weights in tie cases. Returns `None` if no conversion paths are available. """ if source in targets: return source return self._find_closest(source, targets, self.has_path, self.shortest_path) def closest_source(self, target: str, sources: list[str]) -> Optional[str]: """ Determine the closest source from a list of possible sources based on the shortest conversion path and weights. In case of equal path depths, the source with the higher total weight of its path edges is chosen. Args: target (str): The alias to which conversion paths are evaluated. sources (list[str]): A list of source aliases from which the conversion paths are evaluated. Returns: Optional[str]: The name of the source that requires the fewest conversions to the target or has higher weights in tie cases. Returns `None` if no conversion paths are available. """ if target in sources: return target return self._find_closest( target, sources, lambda tgt, src: self.has_path(src, tgt), lambda tgt, src: self.shortest_path(src, tgt), ) @staticmethod def _get_sorted_closest( pivot: str, items: list[str], closest_func: Callable[[str, list[str]], Optional[str]] ) -> list[str]: """Sort a list of items from closest to least close based on conversion paths.""" sorted_items = [] remaining = items.copy() while remaining: closest = closest_func(pivot, remaining) if closest is None: sorted_items.extend(remaining) break sorted_items.append(closest) remaining.remove(closest) return sorted_items def get_sorted_closest_targets(self, source: str, targets: list[str]) -> list[str]: """ Sorts a list of targets from closest to least close based on conversion paths from the source. Targets without valid conversion paths are appended at the end. Args: source (str): The alias from which conversion paths are evaluated. targets (list[str]): A list of target aliases to be ordered by proximity. Returns: list[str]: A list of target aliases ordered by proximity to the source, with unreachable targets appended at the end. """ return self._get_sorted_closest(source, targets, self.closest_target) def get_sorted_closest_sources(self, target: str, sources: list[str]) -> list[str]: """ Sorts a list of sources from closest to least close based on conversion paths from the target. Sources without valid conversion paths are appended at the end. Args: target (str): The alias to which conversion paths are evaluated. sources (list[str]): A list of source aliases to be ordered by proximity. Returns: list[str]: A list of source aliases ordered by proximity to the target, with unreachable sources appended at the end. """ return self._get_sorted_closest(target, sources, self.closest_source) def get_node_experiment_types(self) -> dict[str, ExperimentType]: """ Get the experiment type of each node in the graph. Returns: dict[str, ExperimentType]: A dictionary mapping each node to its experiment type. """ node_to_experiment_type = {} unassigned_nodes = deque() for node in self.nodes(): if is_registered_alias_native(node): node_to_experiment_type[node] = get_native_experiment_type(node) else: node_to_experiment_type[node] = None unassigned_nodes.append(node) while unassigned_nodes: node = unassigned_nodes.popleft() assigned = False for recorded_node, recorded_exp_type in node_to_experiment_type.items(): if recorded_exp_type is None or recorded_node is None: continue if self.has_path(recorded_node, node) or self.has_path(node, recorded_node): if node_to_experiment_type[node] is None: node_to_experiment_type[node] = recorded_exp_type assigned = True elif node_to_experiment_type[node] != recorded_exp_type: raise ValueError( f"ExperimentType conflict detected: Node '{node}' " f"(type '{node_to_experiment_type[node].name}') " f"is reachable from node '{recorded_node}' " f"(type '{recorded_exp_type.name}'), which has a different experiment " "type. Ensure that connected nodes share compatible experiment types." ) if not assigned: node_to_experiment_type[node] = ExperimentType.OTHER return node_to_experiment_type def reset(self, conversions: Optional[list[Conversion]] = None) -> None: """ Reset the graph to its default state. Returns: None """ self.clear() self._conversions = conversions or self.load_default_conversions() self._node_alias_id_map = {} self.create_conversion_graph() def copy(self): """ Create a copy of this graph, returning a new instance of ConversionGraph. """ copied_conversions = self._conversions.copy() if self._conversions is not None else None return ConversionGraph( conversions=copied_conversions, require_native=self.require_native, include_isolated=self._include_isolated, edge_bias=self.edge_bias, nodes=self._init_nodes, ) def subgraph( self, experiment_type: Union[ExperimentType, list[ExperimentType]] ) -> "ConversionGraph": """Filter the graph to include only nodes with the specified experiment type(s). Args: experiment_type (Union[ExperimentType, list[ExperimentType]]): The experiment type(s) Returns: ConversionGraph: A new ConversionGraph instance with only nodes of the specified experiment type(s). Raises: ValueError: If there are no nodes with the specified experiment type(s) in the graph. """ node_experiment_types = self.get_node_experiment_types() exp_types = experiment_type if isinstance(experiment_type, list) else [experiment_type] nodes = [n for n in self.nodes() if node_experiment_types[n] in exp_types] if not nodes: exp_type_names = ", ".join([exp_type.name for exp_type in exp_types]) raise ValueError( f"No program type nodes found with experiment type(s) '{exp_type_names}'. " "Use ConversionGraph.get_node_experiment_types() to inspect all experiment " "type mappings in this graph." ) self._init_nodes = set(nodes) return self.copy() def __eq__(self, value: object) -> bool: return ( super().__eq__(value) and isinstance(value, ConversionGraph) and self.conversions() == value.conversions() and self.require_native == value.require_native and self._node_alias_id_map == value._node_alias_id_map and self._include_isolated == value._include_isolated and self.edge_bias == value.edge_bias and self._init_nodes == value._init_nodes ) def plot(self, **kwargs): """ Plot the conversion graph. .. note:: Matplotlib is an optional dependency and will not be installed with qbraid by default. If you intend to use this method make sure that you install matplotlib with either ``pip install matplotlib`` or ``pip install 'qbraid[visualization]'``. Args: **kwargs: Keyword arguments for the plot function. Returns: None """ # pylint: disable-next=import-outside-toplevel from qbraid.visualization.plot_conversions import plot_conversion_graph plot_conversion_graph(self, **kwargs)