# Copyright (c) 2024, qBraid Development Team
# All rights reserved.
"""
Module containing Python wrapper for the qir-runner sparse quantum state simulator.
"""
import datetime
import logging
import os
import pathlib
import shutil
import subprocess
import tempfile
import time
import warnings
from collections import defaultdict
from typing import Any, Optional, Union
try:
import numpy as np # type: ignore
except ImportError:
np = None # type: ignore
from qbraid_core.annotations import deprecated
from qbraid_core.system.generic import _datetime_to_str, get_current_utc_datetime
from qbraid_core.system.versions import is_valid_semantic_version
from .exceptions import QuantumServiceRuntimeError
logger = logging.getLogger(__name__)
def measure_resource_usage():
"""Returns the current memory and CPU usage."""
import psutil
process = psutil.Process()
memory_info = process.memory_info()
cpu_percent = psutil.cpu_percent(interval=None)
return memory_info.rss, cpu_percent # rss: Resident Set Size (in bytes)
[docs]
class QirRunner:
"""A sparse simulator that extends the functionality of the qir-runner.
This simulator is a Python wrapper for the qir-runner, a command-line tool
for executing compiled QIR files. It uses sparse matrices to represent quantum
states and can be used to simulate quantum circuits that have been compiled to QIR.
The simulator allows for setting a seed for random number generation and specifying
an entry point for the execution.
The qir-runner can be found at: https://github.com/qir-alliance/qir-runner
Attributes:
seed (optional, int): The value to use when seeding the random number generator used
for quantum simulation.
exec_path (str): Path to the qir-runner executable.
version (str): The version of the qir-runner executable.
"""
[docs]
def __init__(self, seed: Optional[int] = None, exec_path: Optional[str] = None):
"""Create a QIR runner simulator."""
self.seed = seed
self._version: Optional[str] = None
self._qir_runner: Optional[str] = None
try:
self.set_path(exec_path)
except (ValueError, FileNotFoundError) as err:
warnings.warn(str(err), RuntimeWarning)
@property
def qir_runner(self) -> Optional[str]:
"""Path to the qir-runner executable."""
return self._qir_runner
@property
def version(self) -> str:
"""Get the version of the qir-runner executable, caching the result."""
if self._version is None and self._qir_runner:
try:
result = subprocess.run(
[self._qir_runner, "--version"],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
check=False,
)
if result.returncode != 0:
raise QuantumServiceRuntimeError(result.stdout)
version_info = result.stdout.strip()
if version_info.startswith("qir-runner"):
runner_version = version_info.split()[-1]
if is_valid_semantic_version(runner_version):
self._version = runner_version
else:
warnings.warn(
f"Invalid qir-runner version '{runner_version}'. "
"Executable may be corrupt.",
RuntimeWarning,
)
except Exception as err: # pylint: disable=broad-exception-caught
warnings.warn(f"Failed to determine qir-runner version: {err}", RuntimeWarning)
return self._version or "Unknown"
def set_path(self, value: Optional[str]) -> None:
"""Set the qir-runner path with additional validation."""
resolved_path = shutil.which(value or "qir-runner")
if resolved_path is None:
if value is None:
raise ValueError(
"No value was provided for the exec_path, "
"and the qir-runner executable was not found in the system PATH."
)
raise FileNotFoundError(
f"The provided qir-runner executable path '{value}' does not exist."
)
self._qir_runner = resolved_path
self._version = None # Reset version cache since qir_runner changed
def status(self) -> str:
"""Check the status of the qir-runner executable."""
if self.qir_runner is None or self.version is None:
return "UNAVAILABLE"
return "ONLINE"
@staticmethod
def _execute(command: list[str], **kwargs) -> dict[str, Any]:
"""Execute a subprocess command and return its output.
Args:
command (list[str]): The command to execute as a list of arguments.
Returns:
dict[str, Any]: A dictionary containing the status, stdout, stderr,
and timestamps related to the command execution.
"""
status = "COMPLETED"
result = {}
memory_before, cpu_before = measure_resource_usage()
created_at = get_current_utc_datetime()
start = time.perf_counter()
try:
# Execute the command and capture both stdout and stderr
process = subprocess.run(
command,
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=False,
**kwargs,
)
result["stdout"] = process.stdout
result["stderr"] = process.stderr
if process.returncode != 0:
status = "FAILED"
except subprocess.TimeoutExpired as err:
status = "FAILED"
result["stderr"] = f"Execution timed out after {err.timeout} seconds."
logger.error("Timeout expired: %s. %s", command, err)
except subprocess.CalledProcessError as err:
status = "FAILED"
result["stderr"] = f"Execution error. Exit code: {err.returncode}"
logger.error(
"Command failed with return code %s: %s. Stderr: %s",
err.returncode,
err.cmd,
err.stderr,
)
finally:
end = time.perf_counter()
memory_after, cpu_after = measure_resource_usage()
execution_duration = (end - start) * 1000 # Convert to milliseconds
ended_at = created_at + datetime.timedelta(milliseconds=execution_duration)
memory_mb = (memory_after - memory_before) / (1024**2) # Convert bytes to MB
cpu_percent = cpu_after - cpu_before # CPU percentage
result.update(
{
"status": status,
"timeStamps": {
"createdAt": _datetime_to_str(created_at),
"endedAt": _datetime_to_str(ended_at),
"executionDuration": int(execution_duration),
},
"memoryUsageMb": memory_mb,
"cpuUsagePercent": cpu_percent,
}
)
return result
@staticmethod
def _parse_results(stdout: str) -> dict[str, list[int]]:
"""Parse the raw output from the execution to extract measurement results."""
results = defaultdict(list)
current_shot_results = []
for line in stdout.splitlines():
elements = line.split()
if len(elements) == 3 and elements[:2] == ["OUTPUT", "RESULT"]:
_, _, bit = elements
current_shot_results.append(int(bit))
elif line.startswith("END"):
for idx, result in enumerate(current_shot_results):
results[f"q{idx}"].append(result)
current_shot_results = []
return dict(results)
@staticmethod
def _data_to_measurements(parsed_data: dict, numpy=np is not None) -> list:
"""Convert parsed data to a 2D array of measurement results."""
data_lists = [parsed_data[key] for key in sorted(parsed_data.keys())]
if numpy:
data_array = np.array(data_lists, dtype=np.int8).T
transposed_data = data_array.tolist()
else:
# Use the pure Python implementation if numpy is not installed
transposed_data = list(map(list, zip(*data_lists)))
return transposed_data
@staticmethod
def _measurements_to_counts(counts: list) -> dict[str, int]:
"""Convert measurements list to histogram data."""
row_strings = ["".join(map(str, row)) for row in counts]
hist_data = {row: row_strings.count(row) for row in set(row_strings)}
counts_dict = {key.replace(" ", ""): value for key, value in hist_data.items()}
num_bits = max(len(key) for key in counts_dict)
all_keys = [format(i, f"0{num_bits}b") for i in range(2**num_bits)]
final_counts = {key: counts_dict.get(key, 0) for key in sorted(all_keys)}
non_zero_counts = {key: value for key, value in final_counts.items() if value != 0}
return non_zero_counts
def process_job_data(self, job_data: dict) -> dict:
"""
Process the job data based on its status, parse the raw output,
and update the job data with measurements and measurement counts.
Args:
job_data (dict): A dictionary containing details and results of a quantum job.
Returns:
dict: Updated job data with additional keys for measurements and counts, if applicable.
"""
status = job_data.get("status")
if status != "COMPLETED":
return job_data
try:
raw_out = job_data.get("stdout", "")
parsed_data = self._parse_results(raw_out)
measurements = self._data_to_measurements(parsed_data)
counts = self._measurements_to_counts(measurements)
# Update job_data with the processed information
job_data["measurements"] = measurements
job_data["measurementCounts"] = counts
except Exception as err: # pylint: disable=broad-exception-caught
logger.error("Error processing job data: %s", err)
return job_data
# pylint: disable-next=too-many-arguments
def execute(
self,
qir_program: Optional[bytes] = None,
file_path: Optional[Union[str, pathlib.Path]] = None,
entrypoint: Optional[str] = None,
shots: Optional[int] = None,
timeout: Optional[float] = None,
**kwargs,
) -> dict[str, Any]:
"""Runs the qir-runner executable with the given QIR file and shots.
Args:
qir_program (optional, bytes): QIR module to run in the simulator.
file_path (optional, Union[str, pathlib.Path]): Path to QIR file to run on simulator.
entrypoint (optional, str): Name of the entrypoint function to execute in the QIR file.
shots (optional, int): The number of times to repeat the execution of the chosen entry
point in the program. Defaults to 1.
timeout (optional, float): Max number of seconds to wait for the command to complete.
Returns:
dict containing the job_id, measurement results, and execution duration.
"""
if not qir_program and not file_path:
raise ValueError("Either qir_program or file_path must be provided.")
if qir_program and file_path:
raise ValueError("Only one of qir_program or file_path should be provided.")
if qir_program:
tmp_dir = os.getenv("TMPDIR", "/tmp")
local_store = pathlib.Path(tmp_dir)
local_store.mkdir(
parents=True, exist_ok=True
) # Create the directory if it doesn't exist
# Use tempfile to automatically manage creation and deletion of the temp file
with tempfile.NamedTemporaryFile(
delete=False, dir=local_store, suffix=".bc"
) as temp_file:
temp_file.write(qir_program)
file_path = pathlib.Path(temp_file.name) # Store file path to use in the command
else:
if not isinstance(file_path, (str, pathlib.Path)):
raise ValueError("file_path must be a string or pathlib.Path object")
file_path = pathlib.Path(file_path)
if self.qir_runner is None:
raise ValueError(
"The qir-runner executable path has not been set. Use set_path() to set it."
)
try:
# Construct the command
command = [self.qir_runner, "--shots", str(shots or 1), "-f", str(file_path)]
if entrypoint:
command.extend(["-e", entrypoint])
if self.seed is not None:
command.extend(["-r", str(self.seed)])
# Execute the qir-runner with the built command
job_data = self._execute(command, timeout=timeout, **kwargs)
job_data = self.process_job_data(job_data)
job_data["runnerVersion"] = self.version
job_data["runnerSeed"] = self.seed
return job_data
finally:
# Ensure the temporary file is deleted even if an error occurs
if qir_program:
file_path.unlink(missing_ok=True)
@deprecated("Use execute method instead.")
def run(self, *args, **kwargs) -> dict[str, Any]:
"""Alias for execute method."""
return self.execute(*args, **kwargs)
def __eq__(self, other):
"""Check if two Simulator instances are equal based on their attributes."""
if not isinstance(other, QirRunner):
return NotImplemented
return (
(self.seed == other.seed)
and (self.qir_runner == other.qir_runner)
and (self.version == other.version)
)
def __repr__(self):
return f"QirRunner(seed={self.seed}, exec_path={self.qir_runner}, version={self.version})"
class Simulator(QirRunner):
"""Deprecated class name for the QirRunner simulator."""