Source code for qbraid.runtime.ionq.job

# Copyright (C) 2024 qBraid
#
# This file is part of the qBraid-SDK
#
# The qBraid-SDK is free software released under the GNU General Public License v3
# or later. You can redistribute and/or modify it under the terms of the GPL v3.
# See the LICENSE file in the project root or <https://www.gnu.org/licenses/gpl-3.0.html>.
#
# THERE IS NO WARRANTY for the qBraid-SDK, as per Section 15 of the GPL v3.

# pylint:disable=invalid-name

"""
Module defining IonQ job class

"""
from __future__ import annotations

from typing import TYPE_CHECKING, Any, Optional, Union

from qbraid.runtime.enums import JobStatus
from qbraid.runtime.exceptions import QbraidRuntimeError
from qbraid.runtime.job import QuantumJob
from qbraid.runtime.postprocess import normalize_counts
from qbraid.runtime.result import GateModelResultData, MeasCount, MeasProb, Result

if TYPE_CHECKING:
    import qbraid.runtime.ionq.provider


class IonQJobError(QbraidRuntimeError):
    """Error raised while processing an IonQ job.

    For example, it is raised when results are requested for a job
    whose reported status is not ``"completed"``.
    """
class IonQJob(QuantumJob):
    """Handle to a job submitted to IonQ.

    Every remote operation (fetching job data, cancelling, downloading
    results) goes through the session object supplied at construction.
    """

    # IonQ status string -> qBraid status. Any value not listed here
    # (including ``None``) is reported as ``JobStatus.UNKNOWN``.
    _STATUS_MAP = {
        "submitted": JobStatus.INITIALIZING,
        "ready": JobStatus.QUEUED,
        "running": JobStatus.RUNNING,
        "failed": JobStatus.FAILED,
        "canceled": JobStatus.CANCELLED,
        "completed": JobStatus.COMPLETED,
    }

    def __init__(self, job_id: str, session: qbraid.runtime.ionq.provider.IonQSession, **kwargs):
        """Create a job handle.

        Args:
            job_id: Identifier of the job.
            session: Session used for all subsequent requests about this job.
            **kwargs: Forwarded unchanged to the ``QuantumJob`` initializer.
        """
        super().__init__(job_id=job_id, **kwargs)
        self._session = session

    @property
    def session(self) -> qbraid.runtime.ionq.provider.IonQSession:
        """Return the IonQ session."""
        return self._session

    @staticmethod
    def _map_status(status: Optional[str]) -> JobStatus:
        """Convert an IonQ job status string to a qBraid ``JobStatus``.

        Args:
            status: Status string as found in the job data; may be ``None``
                when the job data carries no status.

        Returns:
            The matching ``JobStatus``, or ``JobStatus.UNKNOWN`` for any
            unrecognized value.
        """
        return IonQJob._STATUS_MAP.get(status, JobStatus.UNKNOWN)

    def status(self) -> JobStatus:
        """Fetch the job data from the session and return its mapped status."""
        job_data = self.session.get_job(self.id)
        return self._map_status(job_data.get("status"))

    def metadata(self) -> dict[str, Any]:
        """Store and return the metadata of the IonQ job.

        Once the job is in a terminal state and a status has been cached,
        the cached dictionary is returned without contacting the session
        again. Otherwise the job data is fetched, merged into the cache,
        and the ``"status"`` entry is replaced by its ``JobStatus`` value.

        Returns:
            The cache dictionary itself (not a copy).
        """
        if self.is_terminal_state() and "status" in self._cache_metadata:
            return self._cache_metadata

        job_metadata = self.session.get_job(self.id)
        self._cache_metadata.update(job_metadata)

        # Use .get() so a response without "status" maps to UNKNOWN instead of
        # raising KeyError. Skip values that were already converted on an
        # earlier call, since re-mapping a JobStatus would turn it into UNKNOWN.
        raw_status = self._cache_metadata.get("status")
        if not isinstance(raw_status, JobStatus):
            self._cache_metadata["status"] = self._map_status(raw_status)
        return self._cache_metadata

    def cancel(self) -> None:
        """Ask the session to cancel this job."""
        self.session.cancel_job(self.id)

    @staticmethod
    def _get_counts(result: dict[str, Any]) -> Union[MeasCount, list[MeasCount]]:
        """Return the raw counts of the run.

        Args:
            result: Mapping that must provide ``"shots"`` and
                ``"probabilities"``. The probabilities are either one
                mapping of state -> probability, or a mapping whose values
                are all such mappings.

        Returns:
            One counts dictionary, or a list of counts dictionaries (one per
            inner mapping, in iteration order) when every value of
            ``"probabilities"`` is itself a dictionary.

        Raises:
            ValueError: If ``"shots"`` or ``"probabilities"`` is missing or ``None``.
        """
        shots: Optional[int] = result.get("shots")
        probabilities: Optional[Union[MeasProb, dict[str, MeasProb]]] = result.get("probabilities")

        if shots is None or probabilities is None:
            raise ValueError("Missing shots or probabilities in result data.")

        def convert_to_counts(meas_prob: dict[str, float]) -> dict[str, int]:
            """Helper function to normalize probabilities and convert to counts."""
            # State keys arrive as base-10 integer strings.
            probs_dec = {int(key): value for key, value in meas_prob.items()}
            probs_normal = normalize_counts(probs_dec)
            # NOTE(review): int() truncates toward zero, so the counts may sum
            # to fewer than `shots` -- confirm this is the intended rounding.
            return {state: int(prob * shots) for state, prob in probs_normal.items()}

        # NOTE(review): an empty probabilities dict satisfies all() vacuously
        # and therefore yields an empty list.
        if all(isinstance(value, dict) for value in probabilities.values()):
            return [convert_to_counts(probs) for probs in probabilities.values()]

        return convert_to_counts(probabilities)

    def result(self) -> Result:
        """Return the result of the IonQ job.

        Waits for the job to reach a final state, fetches the job data,
        downloads the probabilities from the job's results URL and converts
        them to measurement counts.

        Returns:
            A ``Result`` wrapping ``GateModelResultData``; the job data is
            additionally forwarded to ``Result`` as keyword arguments.

        Raises:
            IonQJobError: If the final job status is not ``"completed"``.
            ValueError: If no shot count is available in the job data or
                the cached metadata.
        """
        self.wait_for_final_state()
        job_data = self.session.get_job(self.id)
        success = job_data.get("status") == "completed"

        if not success:
            # `or {}` also covers a "failure" key that is present but null,
            # which would otherwise raise AttributeError and hide the
            # IonQJobError below.
            failure: dict = job_data.get("failure") or {}
            code = failure.get("code")
            message = failure.get("error")
            raise IonQJobError(f"Job failed with code {code}: {message}")

        results_url: str = job_data["results_url"]
        # Keep only the part after the API version so the session can resolve it.
        # NOTE(review): assumes the session base URL ends at "v0.3" -- confirm.
        results_endpoint = results_url.split("v0.3")[-1]
        job_data["probabilities"] = self.session.get(results_endpoint).json()

        # Fall back to the cached shot count when the job data has no usable
        # value, i.e. the key is absent or explicitly None.
        if job_data.get("shots") is None:
            job_data["shots"] = self._cache_metadata.get("shots")

        measurement_counts = self._get_counts(job_data)
        data = GateModelResultData(measurement_counts=measurement_counts)
        # NOTE(review): job_data must not contain keys named device_id, job_id,
        # success or data, or this call raises TypeError for duplicate keywords.
        return Result(
            device_id=job_data["target"], job_id=self.id, success=success, data=data, **job_data
        )