# Copyright (C) 2024 qBraid
#
# This file is part of the qBraid-SDK
#
# The qBraid-SDK is free software released under the GNU General Public License v3
# or later. You can redistribute and/or modify it under the terms of the GPL v3.
# See the LICENSE file in the project root or <https://www.gnu.org/licenses/gpl-3.0.html>.
#
# THERE IS NO WARRANTY for the qBraid-SDK, as per Section 15 of the GPL v3.
# pylint: disable=arguments-differ
"""
Module defining QbraidDevice class
"""
from __future__ import annotations

import json
import logging
from typing import TYPE_CHECKING, Optional, Union

from qbraid_core.services.quantum import QuantumClient, QuantumServiceRequestError

from qbraid.programs import ProgramSpec, get_program_type_alias, load_program
from qbraid.programs.typer import Qasm2String, Qasm2StringType
from qbraid.runtime.device import QuantumDevice
from qbraid.runtime.enums import DeviceStatus, NoiseModel
from qbraid.runtime.exceptions import QbraidRuntimeError
from qbraid.transpiler import ConversionGraph, transpile

from .job import QbraidJob

if TYPE_CHECKING:
    import pyqir
    import qbraid_core.services.quantum

    import qbraid.programs
    import qbraid.runtime
    import qbraid.transpiler

logger = logging.getLogger(__name__)
class QbraidDevice(QuantumDevice):
    """Class to represent a qBraid device."""

    def __init__(
        self,
        profile: qbraid.runtime.TargetProfile,
        client: Optional[qbraid_core.services.quantum.QuantumClient] = None,
        **kwargs,
    ):
        """Create a new QbraidDevice object.

        Args:
            profile (TargetProfile): Profile forwarded to the ``QuantumDevice`` base class.
            client (optional, QuantumClient): Client used for all qBraid API calls.
                A new ``QuantumClient()`` is created when omitted.
            **kwargs: Additional keyword arguments forwarded to the base class.
        """
        super().__init__(profile=profile, **kwargs)
        self._client = client or QuantumClient()

    @property
    def client(self) -> QuantumClient:
        """Return the QuantumClient object."""
        return self._client

    def status(self) -> qbraid.runtime.DeviceStatus:
        """Return device status.

        Raises:
            QbraidRuntimeError: If the device data has no (or an empty) ``status`` field.
        """
        device_data = self.client.get_device(self.id)
        status = device_data.get("status")
        if not status:
            raise QbraidRuntimeError("Failed to retrieve device status")
        return DeviceStatus(status.lower())

    def queue_depth(self) -> int:
        """Return the number of jobs in the queue for the backend.

        A missing or null ``pendingJobs`` field is reported as 0.
        """
        device_data = self.client.get_device(self.id)
        # ``or 0`` also covers an explicit null value, which a plain
        # ``.get(key, 0)`` would hand to ``int()`` and crash on.
        pending_jobs = device_data.get("pendingJobs") or 0
        return int(pending_jobs)

    def transform(
        self, run_input: Union[pyqir.Module, Qasm2StringType]
    ) -> dict[str, Union[Qasm2StringType, bytes]]:
        """Transform the input to the format expected by the qBraid API.

        Args:
            run_input: Either an OpenQASM 2 string or an object exposing a
                ``bitcode`` attribute (annotated as a ``pyqir.Module``).

        Returns:
            dict: ``{"openQasm": run_input}`` for OpenQASM 2 strings, otherwise
            ``{"bitcode": run_input.bitcode}``.
        """
        if isinstance(run_input, Qasm2String):
            return {"openQasm": run_input}
        return {"bitcode": run_input.bitcode}

    def submit(  # pylint: disable=too-many-arguments
        self,
        run_input: dict[str, Union[bytes, str]],
        shots: Optional[int] = None,
        tags: Optional[dict[str, str]] = None,
        entrypoint: Optional[str] = None,
        noise_model: Optional[str] = None,
        seed: Optional[int] = None,
        timeout: Optional[int] = None,
    ) -> qbraid.runtime.QbraidJob:
        """
        Creates a qBraid Quantum Job.

        Args:
            run_input (dict[str, Union[bytes, str]]): Dictionary containing
                QIR bytecode or OpenQASM string to be run on the device.
            shots (optional, int): The number of times to repeat the execution of the
                program. Default value varies by device.
            tags (optional, dict): A dictionary of tags to associate with the job.
            entrypoint (optional, str): Name of the entrypoint function to execute.
                Only applicable if run_input is a QIR module. Defaults to None.
            noise_model (optional, str): The noise model to apply to the job.
                Only applicable if device supports noisy simulation. Defaults to None.
            seed (optional, int): The seed to use for the random number generator.
                Only applicable for certain devices. Defaults to None.
            timeout (optional, int): The maximum time in seconds to wait for the job
                to complete after execution has started. Defaults to None.

        Returns:
            QbraidJob: The job object representing the submitted job.

        See Also: https://docs.qbraid.com/api-reference/api-reference/post-quantum-jobs
        """
        payload = {
            "qbraidDeviceId": self.id,
            # Tags are sent as a JSON-encoded string rather than a nested object.
            "tags": json.dumps(tags or {}),
            "shots": shots,
            "seed": seed,
            "entrypoint": entrypoint,
            "timeout": timeout,
            "noiseModel": noise_model,
            # Unpacked last, so entries of run_input take precedence on key clashes.
            **run_input,
        }
        job_data = self.client.create_job(data=payload)
        # QbraidJob is constructed with ``job_id``; the API response calls it ``qbraidJobId``.
        job_id: str = job_data.pop("qbraidJobId")
        job_data["job_id"] = job_id
        return QbraidJob(**job_data, device=self, client=self.client)

    def try_extracting_info(self, func, error_message):
        """Call ``func`` and return its result, or None if it raises.

        Best-effort helper for optional job metadata: any exception is logged
        at INFO level together with ``error_message`` and then suppressed.

        Args:
            func (Callable[[], Any]): Zero-argument callable producing the value.
            error_message (str): Prefix for the log record emitted on failure.

        Returns:
            The value returned by ``func``, or None if it raised.
        """
        try:
            return func()
        except Exception as err:  # pylint: disable=broad-exception-caught
            logger.info("%s: %s. Field will be omitted in job metadata.", error_message, str(err))
            return None

    def _extract_qasm_rep(
        self, program: qbraid.programs.QPROGRAM, program_spec: ProgramSpec
    ) -> Optional[str]:
        """Return an OpenQASM representation of ``program`` if one can be obtained.

        Programs whose alias is already ``qasm2`` or ``qasm3`` are returned unchanged.
        Otherwise the program is transpiled to whichever of the two a fresh
        ``ConversionGraph`` reports as the closest target; None is returned when
        the graph reports no such target.
        """
        qasm_aliases = ["qasm2", "qasm3"]
        if program_spec.alias in qasm_aliases:
            return program

        aux_graph = ConversionGraph()
        closest_qasm = aux_graph.closest_target(program_spec.alias, qasm_aliases)
        if closest_qasm is None:
            return None
        return transpile(program, closest_qasm, conversion_graph=aux_graph)

    def _resolve_noise_model(self, noise_model: Union[NoiseModel, str]) -> str:
        """Validate ``noise_model`` against the device profile and return its value.

        Accepts either a ``NoiseModel`` member or the raw value of one, so that
        callers passing a plain string get the same validation instead of an
        ``AttributeError`` on ``.value``.

        Raises:
            ValueError: If the value is not a known noise model, or the device
                profile does not list it under ``noise_models``.
        """
        if not isinstance(noise_model, NoiseModel):
            try:
                noise_model = NoiseModel(noise_model)
            except ValueError as err:
                raise ValueError(
                    f"Noise model '{noise_model}' not supported by device."
                ) from err

        supported = self.profile.get("noise_models") or []
        if noise_model not in supported:
            raise ValueError(f"Noise model '{noise_model}' not supported by device.")
        return noise_model.value

    def run(
        self,
        run_input: Union[qbraid.programs.QPROGRAM, list[qbraid.programs.QPROGRAM]],
        shots: Optional[int] = None,
        tags: Optional[dict[str, str]] = None,
        **kwargs,
    ) -> Union[qbraid.runtime.QbraidJob, list[qbraid.runtime.QbraidJob]]:
        """
        Run a quantum job or a list of quantum jobs on this quantum device.

        Args:
            run_input (Union[QPROGRAM, list[QPROGRAM]]): A single quantum program
                or a list of quantum programs to run on the device.
            shots (optional, int): The number of times to repeat the execution of the
                program. Default value varies by device.
            tags (optional, dict): A dictionary of tags to associate with the job.
            **kwargs: Additional keyword arguments forwarded to :meth:`submit`.
                ``noise_model`` may be a ``NoiseModel`` member or its string value.

        Returns:
            A QuantumJob object or a list of QuantumJob objects corresponding to the input.

        Raises:
            ValueError: If any protected dynamic parameters are specified in the kwargs,
                or if the requested noise model is not supported by the device.
        """
        # A tuple (not a set) keeps the error message order stable between runs.
        forbidden_keys = ("openQasm", "bitcode", "circuitNumQubits", "circuitDepth")
        if any(key in kwargs for key in forbidden_keys):
            raise ValueError(
                f"You cannot specify {', '.join(forbidden_keys)} "
                "as they are dynamically determined."
            )

        is_single_input = not isinstance(run_input, list)
        run_input_list = [run_input] if is_single_input else run_input

        noise_model = kwargs.pop("noise_model", None)
        if noise_model:
            kwargs["noise_model"] = self._resolve_noise_model(noise_model)

        jobs: list[qbraid.runtime.QbraidJob] = []
        for program in run_input_list:
            program_alias = get_program_type_alias(program, safe=True)
            program_spec = ProgramSpec(type(program), alias=program_alias)
            qbraid_program = load_program(program) if program_spec.native else None

            # Optional metadata: each field is best-effort and is None when its
            # extraction fails (see try_extracting_info).
            aux_payload = {}
            if qbraid_program:
                aux_payload["circuitNumQubits"] = self.try_extracting_info(
                    lambda program=qbraid_program: program.num_qubits,
                    "Error calculating circuit number of qubits.",
                )
                aux_payload["circuitDepth"] = self.try_extracting_info(
                    lambda program=qbraid_program: program.depth,
                    "Error calculating circuit depth.",
                )
                aux_payload["openQasm"] = self.try_extracting_info(
                    lambda program=program, program_spec=program_spec: self._extract_qasm_rep(
                        program, program_spec
                    ),
                    "Error extracting OpenQASM string representation.",
                )
                # Validation needs the loaded program wrapper, so it is only
                # performed when one could be created.
                self.validate(qbraid_program)

            transpiled_program = self.transpile(program, program_spec)
            run_input_json = self.transform(transpiled_program)
            # Keys produced by transform() override the best-effort metadata above.
            runtime_payload = {**aux_payload, **run_input_json}
            job = self.submit(run_input=runtime_payload, shots=shots, tags=tags, **kwargs)
            jobs.append(job)

        return jobs[0] if is_single_input else jobs

    def estimate_cost(
        self,
        shots: Optional[int] = None,
        execution_time: Optional[Union[float, int]] = None,
    ) -> float:
        """Estimate the cost of running a quantum job on this device in qBraid credits,
        where 1 credit equals $0.01 USD.

        The estimated cost is based on the device's pricing model, which may include charges per
        task, per shot, and/or per minute. *Note*: The actual price charged may differ from this
        calculation. Visit https://docs.qbraid.com/home/pricing for the latest pricing information
        and details about qBraid credits.

        Args:
            shots (int, optional): The number of quantum circuit executions in the quantum job.
            execution_time (Union[float, int], optional): The estimated time (in minutes) to
                complete the quantum job.

        Returns:
            float: The estimated cost for the quantum job in qBraid credits.

        Raises:
            ValueError: If `shots` and `execution_time` are None or 0, or if either is negative.
            QbraidRuntimeError: If unable to retrieve the cost estimate from the qBraid API.
        """
        # Zero (or any other falsy value) is treated the same as "not provided".
        if not shots:
            shots = None
        if not execution_time:
            execution_time = None

        if shots is None and execution_time is None:
            raise ValueError(
                "At least one of 'shots' or 'execution_time' must be provided to estimate cost."
            )

        if shots is not None:
            if not isinstance(shots, int) or shots < 0:
                raise ValueError("'shots' must be a non-negative integer.")

        if execution_time is not None:
            if not isinstance(execution_time, (int, float)) or execution_time < 0:
                raise ValueError("'execution_time' must be a non-negative number.")

        try:
            return self.client.estimate_cost(self.id, shots, execution_time)
        except QuantumServiceRequestError as err:
            raise QbraidRuntimeError(
                "Failed to estimate cost due to a service request error."
            ) from err