Source code for qbraid.runtime.ibm.job

# Copyright (C) 2024 qBraid
#
# This file is part of the qBraid-SDK
#
# The qBraid-SDK is free software released under the GNU General Public License v3
# or later. You can redistribute and/or modify it under the terms of the GPL v3.
# See the LICENSE file in the project root or <https://www.gnu.org/licenses/gpl-3.0.html>.
#
# THERE IS NO WARRANTY for the qBraid-SDK, as per Section 15 of the GPL v3.

"""
Module defining QiskitJob Class

"""
from __future__ import annotations

import os
from typing import TYPE_CHECKING, Optional

from qiskit.primitives.containers.primitive_result import PrimitiveResult
from qiskit_ibm_runtime import QiskitRuntimeService, RuntimeJobV2
from qiskit_ibm_runtime.exceptions import RuntimeInvalidStateError

from qbraid._logging import logger
from qbraid.runtime.enums import JobStatus
from qbraid.runtime.exceptions import JobStateError, QbraidRuntimeError
from qbraid.runtime.job import QuantumJob
from qbraid.runtime.result import Result
from qbraid.runtime.result_data import GateModelResultData

from .result_builder import QiskitGateModelResultBuilder

if TYPE_CHECKING:
    import qiskit.primitives
    import qiskit.result
    import qiskit_ibm_runtime


IBM_JOB_STATUS_MAP = {
    "INITIALIZING": JobStatus.INITIALIZING,
    "QUEUED": JobStatus.QUEUED,
    "VALIDATING": JobStatus.VALIDATING,
    "RUNNING": JobStatus.RUNNING,
    "CANCELLED": JobStatus.CANCELLED,
    "DONE": JobStatus.COMPLETED,
    "ERROR": JobStatus.FAILED,
}


[docs] class QiskitJob(QuantumJob): """Class for interfacing with a Qiskit IBM ``RuntimeJob``."""
[docs] def __init__( self, job_id: str, job: Optional[ qiskit_ibm_runtime.RuntimeJob | qiskit_ibm_runtime.RuntimeJobV2 | qiskit.primitives.PrimitiveJob ] = None, service: Optional[qiskit_ibm_runtime.QiskitRuntimeService] = None, **kwargs, ): """Create a ``QiskitJob`` instance.""" super().__init__(job_id, **kwargs) self._job = job or self._get_job(service=service)
def _get_job( self, service: Optional[qiskit_ibm_runtime.QiskitRuntimeService] = None ) -> qiskit_ibm_runtime.RuntimeJob | qiskit_ibm_runtime.RuntimeJobV2: """Return the qiskit_ibm_runtime.RuntimeJob associated with instance id attribute. Attempts to retrieve a job using a specified or default service. Handles service initialization with error management for authentication issues. Raises: QbraidRuntimeError: If there is an error initializing the service or retrieving the job. """ if service is None: if self._device and getattr(self._device, "_service", None): service = self._device._service else: try: token = os.getenv("QISKIT_IBM_TOKEN") channel = os.getenv("QISKIT_IBM_CHANNEL", "ibm_quantum") service = QiskitRuntimeService(channel=channel, token=token) except Exception as err: raise QbraidRuntimeError("Failed to initialize the quantum service.") from err try: return service.job(self.id) except Exception as err: raise QbraidRuntimeError(f"Error retrieving job {self.id}") from err def status(self): """Returns status from Qiskit Job object.""" job_status = self._job.status() job_status = job_status if isinstance(job_status, str) else job_status.name status = IBM_JOB_STATUS_MAP.get(job_status, JobStatus.UNKNOWN) self._cache_metadata["status"] = status return status def queue_position(self) -> Optional[int]: """Returns the position of the job in the server queue.""" if isinstance(self._job, RuntimeJobV2): raise NotImplementedError("Queue position is not supported for RuntimeJobV2.") return self._job.queue_position(refresh=True) def result(self): """Return the results of the job.""" if not self.is_terminal_state(): logger.info("Result will be available when job has reached final state.") runner_result: qiskit.result.Result | PrimitiveResult = self._job.result() if isinstance(runner_result, PrimitiveResult): backend = self._job.backend() if hasattr(self._job, "backend") else None device_id = ( backend.name if backend else (self._device.id if self._device else "sampler") ) job_id = self._job.job_id() success = self.status() == JobStatus.COMPLETED runner_data = runner_result.metadata else: runner_data = runner_result.to_dict() job_id = runner_data.pop("job_id", self._job.job_id()) success = runner_data.pop("success", runner_result.success) device_id = runner_result.backend_name builder = QiskitGateModelResultBuilder(runner_result) measurement_counts = builder.get_counts() measurements = builder.measurements() data = GateModelResultData(measurement_counts=measurement_counts, measurements=measurements) return Result( device_id=device_id, job_id=job_id, success=success, data=data, **runner_data, ) def cancel(self): """Attempt to cancel the job.""" if self.is_terminal_state(): raise JobStateError("Cannot cancel quantum job in non-terminal state.") try: return self._job.cancel() except RuntimeInvalidStateError as err: raise JobStateError from err