Source code for qbraid.runtime.oqc.job

# Copyright (C) 2024 qBraid
#
# This file is part of the qBraid-SDK
#
# The qBraid-SDK is free software released under the GNU General Public License v3
# or later. You can redistribute and/or modify it under the terms of the GPL v3.
# See the LICENSE file in the project root or <https://www.gnu.org/licenses/gpl-3.0.html>.
#
# THERE IS NO WARRANTY for the qBraid-SDK, as per Section 15 of the GPL v3.

"""
Module for OQC job class.

"""
from __future__ import annotations

import json
from typing import TYPE_CHECKING, Any, Optional

from qbraid.runtime.enums import JobStatus
from qbraid.runtime.exceptions import ResourceNotFoundError
from qbraid.runtime.job import QuantumJob

from .result import OQCJobResult

if TYPE_CHECKING:
    from qcaas_client.client import OQCClient, QPUTaskErrors

RESULTS_FORMAT = {
    2: "raw",
    3: "binary",
}

METRICS = {
    1: "empty",
    2: "optimized_circuit",
    4: "optimized_instruction_count",
    6: "default",
}

OPTIMIZATIONS = {
    1: "empty",
    2: "default_mapping_pass",
    4: "full_peephole_optimise",
    8: "context_simplify",
    18: "one",
    30: "two",
    32: "clifford_simplify",
    64: "decompose_controlled_gates",
    128: "globalise_phased_x",
    256: "kak_decomposition",
    512: "peephole_optimise_2q",
    1024: "remove_discarded",
    2048: "remove_barriers",
    4096: "remove_redundancies",
    8192: "three_qubit_squash",
    16384: "simplify_measured",
}


[docs] class OQCJob(QuantumJob): """Oxford Quantum Circuit job class."""
[docs] def __init__(self, job_id: str, qpu_id: str, client: OQCClient, **kwargs): super().__init__(job_id=job_id, **kwargs) self._qpu_id = qpu_id self._client = client
def cancel(self) -> None: """Cancel the task.""" self._client.cancel_task(task_id=self.id, qpu_id=self._qpu_id) def result(self, **kwargs) -> OQCJobResult: """Get the result of the task.""" self.wait_for_final_state() status = self.status(**kwargs) timings = self.get_timings() success = status == JobStatus.COMPLETED result_data = { "timings": timings, "success": success, "error_details": None, "counts": None, } if success: qpu_task_result = self._client.get_task_results( task_id=self.id, qpu_id=self._qpu_id, **kwargs ) if not qpu_task_result: raise ResourceNotFoundError("No result found for the task") result_data["counts"] = qpu_task_result.result.get("c") else: result_data["error_details"] = self.get_errors(**kwargs) return OQCJobResult(result_data) def status(self, **kwargs) -> JobStatus: """Get the status of the task.""" task_status = self._client.get_task_status(task_id=self.id, qpu_id=self._qpu_id, **kwargs) status_map = { "CREATED": JobStatus.INITIALIZING, "SUBMITTED": JobStatus.INITIALIZING, "RUNNING": JobStatus.RUNNING, "FAILED": JobStatus.FAILED, "CANCELLED": JobStatus.CANCELLED, "COMPLETED": JobStatus.COMPLETED, "UNKNOWN": JobStatus.UNKNOWN, "EXPIRED": JobStatus.FAILED, } return status_map.get(task_status, JobStatus.UNKNOWN) def metadata(self, use_cache: bool = False) -> dict[str, Any]: """Get the metadata for the task.""" if not use_cache: status = self.status() self._cache_metadata["status"] = status provider_metadata = self._client.get_task_metadata(task_id=self.id, qpu_id=self._qpu_id) del provider_metadata["id"] config = json.loads(provider_metadata["config"]) del config["$type"] provider_metadata["shots"] = config["$data"]["repeats"] provider_metadata["repetition_period"] = config["$data"]["repetition_period"] provider_metadata["results_format"] = RESULTS_FORMAT[ config["$data"]["results_format"]["$data"]["transforms"]["$value"] ] provider_metadata["metrics"] = METRICS[config["$data"]["metrics"]["$value"]] provider_metadata["active_calibrations"] = config["$data"]["active_calibrations"] try: provider_metadata["optimizations"] = OPTIMIZATIONS[ config["$data"]["optimizations"]["$data"]["tket_optimizations"]["$value"] ] except TypeError: provider_metadata["optimizations"] = None provider_metadata["error_mitigation"] = config["$data"]["error_mitigation"] del provider_metadata["config"] self._cache_metadata.update(provider_metadata) return self._cache_metadata def metrics(self, **kwargs) -> dict[str, Any]: """Get the metrics for the task.""" return self._client.get_task_metrics(task_id=self.id, qpu_id=self._qpu_id, **kwargs) def get_timings(self, **kwargs) -> dict[str, Any]: """Get the timings for the task.""" return self._client.get_task_timings(task_id=self.id, qpu_id=self._qpu_id, **kwargs) def get_errors(self, **kwargs) -> Optional[QPUTaskErrors]: """Get the error message for the task.""" if self.status(**kwargs) != JobStatus.FAILED: return None try: return self._client.get_task_errors( task_id=self.id, qpu_id=self._qpu_id, **kwargs ).error_message except AttributeError: return None