Source code for qbraid.runtime.native.job

# Copyright (C) 2024 qBraid
#
# This file is part of the qBraid-SDK
#
# The qBraid-SDK is free software released under the GNU General Public License v3
# or later. You can redistribute and/or modify it under the terms of the GPL v3.
# See the LICENSE file in the project root or <https://www.gnu.org/licenses/gpl-3.0.html>.
#
# THERE IS NO WARRANTY for the qBraid-SDK, as per Section 15 of the GPL v3.

"""
Module defining QbraidJob class

"""
from __future__ import annotations

from typing import TYPE_CHECKING, Optional, Type, Union

from qbraid_core.services.quantum import QuantumClient

from qbraid._logging import logger
from qbraid.programs import ExperimentType
from qbraid.runtime.enums import JobStatus
from qbraid.runtime.exceptions import JobStateError, QbraidRuntimeError
from qbraid.runtime.job import QuantumJob
from qbraid.runtime.result import Result, ResultDataType
from qbraid.runtime.result_data import AhsResultData, AnnealingResultData, GateModelResultData
from qbraid.runtime.schemas import RuntimeJobModel

from .result import (
    NECVectorAnnealerResultData,
    QbraidQirSimulatorResultData,
    QuEraQasmSimulatorResultData,
)

if TYPE_CHECKING:
    import qbraid_core.services.quantum

    import qbraid.runtime


class QbraidJob(QuantumJob):
    """Class representing a qBraid job."""

    def __init__(
        self,
        job_id: str,
        device: Optional[qbraid.runtime.QbraidDevice] = None,
        client: Optional[qbraid_core.services.quantum.QuantumClient] = None,
        **kwargs,
    ):
        """Create a new QbraidJob.

        Args:
            job_id: Identifier of the job, forwarded to the base class.
            device: Optional device associated with the job, forwarded to the
                base class.
            client: Optional client used for job requests. When omitted, one is
                resolved lazily by the ``client`` property.
            **kwargs: Additional keyword arguments forwarded to the base class.
        """
        super().__init__(job_id, device, **kwargs)
        self._client = client

    @property
    def client(self) -> qbraid_core.services.quantum.QuantumClient:
        """
        Lazily initializes and returns the client object associated with the job.
        If the job has an associated device with a client, that client is used.
        Otherwise, a new instance of QuantumClient is created and used.

        Returns:
            QuantumClient: The client object associated with the job.
        """
        if self._client is None:
            self._client = self._device.client if self._device else QuantumClient()
        return self._client

    def queue_position(self) -> Optional[int]:
        """Return the position of the job in the queue.

        Looks up ``"queue_position"`` in the job metadata, falling back to the
        camel-case key ``"queuePosition"``; returns ``None`` if neither is present.
        """
        job_data = self.metadata()
        return job_data.get("queue_position", job_data.get("queuePosition"))

    def status(self) -> JobStatus:
        """Return the status of the job / task , among the values of ``JobStatus``.

        The client is queried only while the cached status is not a terminal
        state; once a terminal status is cached it is returned as-is.
        """
        terminal_states = JobStatus.terminal_states()
        if self._cache_metadata.get("status") not in terminal_states:
            client_data = self.client.get_job(self.id)
            job_model = RuntimeJobModel.from_dict(client_data)
            # "metadata" and "cost" are deliberately left out of the cached fields.
            job_data = job_model.model_dump(exclude={"metadata", "cost"})
            status = JobStatus(job_data.pop("status"))
            if job_model.status_text is not None:
                status.set_status_message(job_model.status_text)
            self._cache_metadata.update({**job_data, "status": status})
        return self._cache_metadata["status"]

    def cancel(self) -> None:
        """Attempt to cancel the job.

        Raises:
            JobStateError: If the job is already in a terminal state.
            QbraidRuntimeError: If, after the cancel request, the job status is
                neither ``CANCELLED`` nor ``CANCELLING``.
        """
        if self.is_terminal_state():
            raise JobStateError("Cannot cancel job in a terminal state.")

        self.client.cancel_job(self.id)
        logger.info("Cancel job request validated.")

        try:
            logger.info("Waiting for job to cancel...")
            self.wait_for_final_state(timeout=3, poll_interval=1)
        except JobStateError:
            # Best effort: the status check below decides success or failure.
            pass

        status = self.status()
        if status not in {JobStatus.CANCELLED, JobStatus.CANCELLING}:
            raise QbraidRuntimeError(f"Failed to cancel job. Current status: {status.name}")

        logger.info("Success. Current status: %s", status.name)

    @staticmethod
    def get_result_data_cls(
        device_id: Optional[str] = None, experiment_type: Optional[ExperimentType] = None
    ) -> Union[Type[GateModelResultData], Type[AnnealingResultData], Type[AhsResultData]]:
        """Determine the appropriate ResultData class based on device_id and experiment_type.

        A device-specific class takes precedence; otherwise the class is chosen
        from the experiment type.

        Args:
            device_id: Identifier of the device the job ran on.
            experiment_type: Experiment type of the job.

        Returns:
            The result data class matching the device or experiment type.

        Raises:
            ValueError: If neither the device_id nor the experiment_type maps to
                a result data class (including when both are ``None``).
        """
        device_to_result_data = {
            "qbraid_qir_simulator": QbraidQirSimulatorResultData,
            "quera_qasm_simulator": QuEraQasmSimulatorResultData,
            "nec_vector_annealer": NECVectorAnnealerResultData,
        }

        result_data_cls = device_to_result_data.get(device_id)
        if result_data_cls:
            return result_data_cls

        experiment_type_to_result_data = {
            ExperimentType.GATE_MODEL: GateModelResultData,
            ExperimentType.ANNEALING: AnnealingResultData,
            ExperimentType.AHS: AhsResultData,
        }

        result_data_cls = experiment_type_to_result_data.get(experiment_type)
        if not result_data_cls:
            # experiment_type may be None (its default), so do not assume it has
            # a ``.name`` attribute when building the error message.
            experiment_name = getattr(experiment_type, "name", experiment_type)
            raise ValueError(
                f"Unsupported device_id '{device_id}' or experiment_type '{experiment_name}'"
            )

        return result_data_cls

    def result(self) -> Result[ResultDataType]:
        """Return the results of the job.

        Calls ``wait_for_final_state()`` first, then fetches the job record.
        Results are requested from the client only when the job status is
        ``COMPLETED``; otherwise the result is built from the job record alone.
        """
        self.wait_for_final_state()
        job_data = self.client.get_job(self.id)
        success = job_data.get("status") == JobStatus.COMPLETED.name
        job_result = (
            self.client.get_job_results(self.id, wait_time=1, backoff_factor=1.4) if success else {}
        )
        # Fields from the job record override same-named fields in the results.
        job_result.update(job_data)
        model = RuntimeJobModel.from_dict(job_result)

        result_data_cls = self.get_result_data_cls(model.device_id, model.experiment_type)
        data = result_data_cls.from_object(model.metadata)

        # These fields are excluded from the metadata dump; the parsed ``data``
        # object is built from the same ``model.metadata``.
        exclude = (
            {"solutions", "num_solutions"}
            if model.experiment_type == ExperimentType.ANNEALING
            else {"measurement_counts", "measurements"}
        )
        metadata_dump = model.metadata.model_dump(by_alias=True, exclude=exclude)
        model_dump = model.model_dump(by_alias=True, exclude={"job_id", "device_id", "metadata"})

        experiment_type: ExperimentType = model_dump["experimentType"]
        status_text = (
            model.status_text or model.status.status_message or model.status.default_message
        )
        model_dump.update(
            {
                "status": model.status.name,
                "statusText": status_text,
                "experimentType": experiment_type,
            }
        )
        model_dump["metadata"] = metadata_dump

        return Result[ResultDataType](
            device_id=model.device_id, job_id=model.job_id, success=success, data=data, **model_dump
        )