# Source code for qbraid.runtime.quantinuum.job

# Copyright 2026 qBraid
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Module defining Quantinuum job class.

"""
from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Any

from qbraid.runtime.enums import JobStatus
from qbraid.runtime.exceptions import QbraidRuntimeError
from qbraid.runtime.job import QuantumJob
from qbraid.runtime.result import Result
from qbraid.runtime.result_data import GateModelResultData

if TYPE_CHECKING:
    from qnexus.models.references import ExecuteJobRef

    from qbraid.runtime.quantinuum.device import QuantinuumDevice

logger = logging.getLogger(__name__)

# Lookup table from the status strings read from a qnexus job reference's
# ``last_status`` to qBraid ``JobStatus`` members. Strings that are not listed
# here are reported as ``JobStatus.UNKNOWN`` by ``_map_quantinuum_status``.
# NOTE(review): confirm this table covers every status qnexus can report;
# any value missing from it is surfaced to callers as UNKNOWN.
_QUANTINUUM_STATUS_MAP: dict[str, JobStatus] = {
    "COMPLETED": JobStatus.COMPLETED,
    "ERROR": JobStatus.FAILED,  # the "ERROR" string is exposed as qBraid's FAILED
    "CANCELLED": JobStatus.CANCELLED,
    "QUEUED": JobStatus.QUEUED,
    "RUNNING": JobStatus.RUNNING,
    "INITIALIZING": JobStatus.INITIALIZING,
}


class QuantinuumJobError(QbraidRuntimeError):
    """Raised when an operation on a Quantinuum job cannot be completed.

    Used in this module for failures while looking up a job reference,
    reading its status, cancelling it, computing its execution time, or
    fetching its results.
    """


def _map_quantinuum_status(last_status: str | None) -> JobStatus:
    """Translate a qnexus ``last_status`` value into a qBraid :class:`JobStatus`.

    Args:
        last_status: Status string taken from a qnexus job reference, or
            ``None`` when no status is available.

    Returns:
        The matching :class:`JobStatus`, or ``JobStatus.UNKNOWN`` when
        ``last_status`` is ``None`` or is not a key of the status table.
    """
    unknown = JobStatus.UNKNOWN
    return unknown if last_status is None else _QUANTINUUM_STATUS_MAP.get(last_status, unknown)


class QuantinuumJob(QuantumJob):
    """Quantinuum NEXUS job class.

    Wraps a qnexus job reference and exposes status, cancellation, timing and
    result retrieval through the qBraid :class:`QuantumJob` interface.
    """

    def __init__(
        self,
        job_id: str,
        device: QuantinuumDevice | None = None,
        job: ExecuteJobRef | None = None,
        **kwargs: Any,
    ) -> None:
        """Create a new QuantinuumJob.

        Args:
            job_id: Identifier of the job; forwarded to :class:`QuantumJob`.
            device: Device associated with the job, if known; forwarded to
                :class:`QuantumJob`.
            job: Existing qnexus job reference. When omitted, the reference is
                looked up by ID the first time it is needed.
            **kwargs: Additional keyword arguments forwarded to
                :class:`QuantumJob`.
        """
        super().__init__(job_id=job_id, device=device, **kwargs)
        self._job = job

    def _get_ref(self) -> ExecuteJobRef:
        """Return the cached qnexus job reference, or look it up by ID.

        The reference is stored on the instance after the first successful
        lookup, so later calls return the same object.

        Raises:
            QuantinuumJobError: If importing qnexus or fetching the job fails.
        """
        if self._job is not None:
            return self._job

        try:
            # pylint: disable-next=import-outside-toplevel
            import qnexus as qnx

            self._job = qnx.jobs.get(id=self.id)
            return self._job
        except Exception as exc:
            raise QuantinuumJobError(f"Unable to retrieve Quantinuum job {self.id}") from exc

    def status(self) -> JobStatus:
        """Return the current status of the Quantinuum job.

        A status that is already cached and terminal is returned without
        consulting the job reference again.

        Raises:
            QuantinuumJobError: If the job reference or its status cannot be read.
        """
        if self._cache_metadata.get("status") in JobStatus.terminal_states():
            return self._cache_metadata["status"]

        try:
            ref = self._get_ref()
            # NOTE(review): once a reference is cached, ``last_status`` is read
            # from that same object on every call; confirm qnexus keeps the
            # field current, otherwise polling could keep seeing a stale status.
            last_status = ref.last_status
        except Exception as exc:
            raise QuantinuumJobError(f"Unable to retrieve job status for {self.id}") from exc

        status = _map_quantinuum_status(last_status)
        if status == JobStatus.FAILED and ref.last_message:
            logger.error("Quantinuum job %s failed: %s", self.id, ref.last_message)

        self._cache_metadata["status"] = status
        return status

    def _resolve_device_id(self, ref: ExecuteJobRef) -> str:
        """Resolve the target device name for a job.

        Prefers the device this job was constructed with, then tries to read
        the device name from the NEXUS job's ``backend_config`` when it is a
        :class:`~quantinuum_schemas.models.backend_config.QuantinuumConfig`
        (other ``BackendConfig`` subclasses do not expose ``device_name``).
        Falls back to the generic ``"quantinuum"`` label if neither is set.

        Args:
            ref: The qnexus job reference whose ``backend_config`` is inspected.

        Returns:
            The resolved device identifier.
        """
        if self._device is not None:
            return self._device.id

        # Imported only when the job has no device of its own, so the schema
        # package is not loaded when it will not be used.
        # pylint: disable-next=import-outside-toplevel
        from quantinuum_schemas.models.backend_config import QuantinuumConfig

        backend_config = ref.backend_config
        if isinstance(backend_config, QuantinuumConfig):
            return backend_config.device_name
        return "quantinuum"

    def cancel(self) -> None:
        """Cancel the Quantinuum job.

        Raises:
            QuantinuumJobError: If the job reference cannot be retrieved or
                the cancellation request fails.
        """
        # pylint: disable-next=import-outside-toplevel
        import qnexus as qnx

        try:
            qnx.jobs.cancel(self._get_ref())
        except Exception as exc:
            raise QuantinuumJobError(f"Failed to cancel Quantinuum job {self.id}") from exc

    def execution_time_s(self) -> float | None:
        """Return the wall-clock execution time of the job in seconds.

        Computed from the NEXUS ``last_status_detail`` timestamps
        (``completed_time - running_time``). This duration reflects the
        total time the job spent in the "running" phase on the service and
        may include queueing, calibration, and other backend checks in
        addition to on-device execution. qnexus does not currently expose
        a more granular, on-device-only metric.

        Returns:
            The execution time in seconds, or ``None`` if the job has not
            completed.

        Raises:
            QuantinuumJobError: If the job is completed but timing details
                are unavailable.
        """
        if self.status() != JobStatus.COMPLETED:
            return None

        ref = self._get_ref()
        last_status_detail = ref.last_status_detail
        if last_status_detail is None:
            raise QuantinuumJobError(
                f"Execution time not available for {self.id}: last_status_detail is missing"
            )

        completed_time = last_status_detail.completed_time
        running_time = last_status_detail.running_time
        if completed_time is None or running_time is None:
            raise QuantinuumJobError(
                f"Execution time not available for {self.id}: "
                "completed_time or running_time is missing"
            )

        return (completed_time - running_time).total_seconds()

    def result(self) -> Result[GateModelResultData]:
        """Return the result of the Quantinuum job.

        Waits for the job to reach a final state, then collects the
        measurement counts of every result item.

        Returns:
            A :class:`Result` whose data holds a single counts dictionary when
            the job produced one result item, or a list of counts dictionaries
            otherwise.

        Raises:
            QuantinuumJobError: If the job failed, the results cannot be
                fetched, or no results are available.
        """
        # pylint: disable-next=import-outside-toplevel
        import qnexus as qnx

        # pylint: disable-next=import-outside-toplevel
        from pytket.circuit import BasisOrder

        self.wait_for_final_state()

        final_status = self.status()
        # A single lookup serves both the failure message and the result fetch.
        ref = self._get_ref()
        if final_status == JobStatus.FAILED:
            raise QuantinuumJobError(
                f"Quantinuum job {self.id} failed: {ref.last_message or 'no error message'}"
            )

        try:
            results = qnx.jobs.results(ref)
        except Exception as exc:
            raise QuantinuumJobError(
                f"Failed to fetch results for Quantinuum job {self.id}"
            ) from exc

        if not results:
            raise QuantinuumJobError(f"No results available for Quantinuum job {self.id}")

        # Counts are requested in descending lexicographic order (dlo) instead
        # of pytket's default basis order, for consistency with other qBraid
        # providers.
        all_counts: list[dict[str, int]] = []
        for result_item in results:
            counts = result_item.download_result().get_counts(basis=BasisOrder.dlo)
            # Each outcome is a sequence of bit values; join it into a bitstring key.
            all_counts.append({"".join(map(str, k)): v for k, v in counts.items()})

        measurement_counts = all_counts[0] if len(all_counts) == 1 else all_counts

        device_id = self._resolve_device_id(ref)

        return Result[GateModelResultData](
            device_id=device_id,
            job_id=self.id,
            success=True,
            data=GateModelResultData(measurement_counts=measurement_counts),
        )