Source code for qbraid.runtime.native.device

# Copyright (C) 2024 qBraid
#
# This file is part of the qBraid-SDK
#
# The qBraid-SDK is free software released under the GNU General Public License v3
# or later. You can redistribute and/or modify it under the terms of the GPL v3.
# See the LICENSE file in the project root or <https://www.gnu.org/licenses/gpl-3.0.html>.
#
# THERE IS NO WARRANTY for the qBraid-SDK, as per Section 15 of the GPL v3.

"""
Module defining QbraidDevice class

"""
import json
import logging
from typing import TYPE_CHECKING, Any, Optional, Union

from qbraid_core.services.quantum import QuantumClient

from qbraid.programs import ProgramSpec, get_program_type_alias, load_program
from qbraid.runtime.device import QuantumDevice
from qbraid.runtime.enums import DeviceStatus
from qbraid.runtime.exceptions import QbraidRuntimeError
from qbraid.transpiler import transpile

from .job import QbraidJob

if TYPE_CHECKING:
    import pyqir
    import qbraid_core.services.quantum

    import qbraid.programs
    import qbraid.runtime
    import qbraid.transpiler


logger = logging.getLogger(__name__)


[docs] class QbraidDevice(QuantumDevice): """Class to represent a qBraid device."""
[docs] def __init__( self, profile: "qbraid.runtime.TargetProfile", client: "Optional[qbraid_core.services.quantum.QuantumClient]" = None, **kwargs, ): """Create a new QbraidDevice object.""" super().__init__(profile=profile, **kwargs) self._client = client or QuantumClient()
@property def client(self) -> QuantumClient: """Return the QuantumClient object.""" return self._client def status(self) -> "qbraid.runtime.DeviceStatus": """Return device status.""" device_data = self.client.get_device(self.id) status = device_data.get("status") if not status: raise QbraidRuntimeError("Failed to retrieve device status") return DeviceStatus(status.lower()) def queue_depth(self) -> int: """Return the number of jobs in the queue for the backend""" device_data = self.client.get_device(self.id) pending_jobs = device_data.get("pendingJobs", 0) return int(pending_jobs) # pylint: disable-next=too-many-arguments def create_job( self, tags: Optional[dict[str, str]] = None, shots: Optional[int] = None, openqasm: Optional[str] = None, bitcode: Optional[bytes] = None, num_qubits: Optional[int] = None, depth: Optional[int] = None, **kwargs, ) -> dict[str, Any]: """Create new qBraid job. Args: tags (optional, dict): A dictionary of tags to associate with the job. shots (optional, int): The number of shots to run the job for. bitcode (optional, bytes): The QIR byte code to run. openqasm (optional, str): The OpenQASM to run. num_qubits (optional, int): The number of qubits in the circuit. depth (optional, int): The depth of the circuit. Returns: The qbraid job ID associated with this job """ tags = tags or {} init_data = { "bitcode": bitcode, "qbraidDeviceId": self.id, "shots": shots, "openQasm": openqasm, "circuitNumQubits": num_qubits, "circuitDepth": depth, "tags": json.dumps(tags), **kwargs, } return self.client.create_job(data=init_data) def _create_and_return_job( self, module: "pyqir.Module", entrypoint: Optional[str] = None, shots: Optional[int] = None, **kwargs, ): job_data = self.create_job( bitcode=module.bitcode, entrypoint=entrypoint, shots=shots, **kwargs ) job_id = job_data.pop("qbraidJobId") return QbraidJob(job_id, device=self, client=self.client, **job_data) def submit( self, run_input: "Union[pyqir.Module, list[pyqir.Module]]", *args, entrypoint: Optional[str] = None, shots: Optional[int] = None, **kwargs, ) -> "Union[qbraid.runtime.QbraidJob, list[qbraid.runtime.QbraidJob]]": """Runs the qir-runner executable with the given QIR file and shots. Args: run_input (Union[pyqir.Module, list[pyqir.Module,]): QIR module to run in the simulator. entrypoint (optional, str): Name of the entrypoint function to execute in the QIR file. shots (optional, int): The number of times to repeat the execution of the chosen entry point in the program. Defaults to 1. Returns: Union[QbraidJob, list[QbraidJob]: The job object(s) representing the submitted job(s). """ is_single_input = not isinstance(run_input, list) run_input = [run_input] if is_single_input else run_input jobs = [ self._create_and_return_job(module, entrypoint, shots, **kwargs) for module in run_input ] if is_single_input: return jobs[0] return jobs def try_extracting_info(self, func, error_message): """Try to extract information from a function/attribute, logging an error if it fails.""" try: return func() except Exception as err: # pylint: disable=broad-exception-caught logger.info("%s: %s. Field will be omitted in job metadata.", error_message, str(err)) return None def run( self, run_input: "Union[qbraid.programs.QPROGRAM, list[qbraid.programs.QPROGRAM]]", *args, **kwargs, ) -> "Union[qbraid.runtime.job.QbraidJob, list[qbraid.runtime.job.QbraidJob]]": """ Run a quantum job or a list of quantum jobs on this quantum device. Args: run_input: A single quantum program or a list of quantum programs to run on the device. Returns: A QuantumJob object or a list of QuantumJob objects corresponding to the input. """ if not isinstance(run_input, list): run_input_list = [run_input] is_single_input = True else: run_input_list = run_input is_single_input = False jobs = [] for program in run_input_list: program_alias = get_program_type_alias(program, safe=True) program_spec = ProgramSpec(type(program), alias=program_alias) qbraid_program = load_program(program) if program_spec.native else None program_data = { "num_qubits": None, "depth": None, "openqasm": None, } if qbraid_program: program_data["num_qubits"] = self.try_extracting_info( lambda program=qbraid_program: program.num_qubits, "Error calculating circuit num_qubits.", ) program_data["depth"] = self.try_extracting_info( lambda program=qbraid_program: program.depth, "Error calculating circuit depth." ) program_data["openqasm"] = self.try_extracting_info( lambda program=program: transpile(program, "qasm3"), "Error converting circuit to OpenQASM 3.", ) self.validate(qbraid_program) transpiled_program = self.transpile(program, program_spec) transformed_program = self.transform(transpiled_program) job = self.submit(transformed_program, **program_data, **kwargs) jobs.append(job) return jobs[0] if is_single_input else jobs