# Copyright (C) 2024 qBraid
#
# This file is part of the qBraid-SDK
#
# The qBraid-SDK is free software released under the GNU General Public License v3
# or later. You can redistribute and/or modify it under the terms of the GPL v3.
# See the LICENSE file in the project root or <https://www.gnu.org/licenses/gpl-3.0.html>.
#
# THERE IS NO WARRANTY for the qBraid-SDK, as per Section 15 of the GPL v3.
"""
Module defining AzureQuantumJob class
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Any, Optional, Union
from azure.quantum.target.microsoft import MicrosoftEstimatorResult
from azure.quantum.workspace import Workspace
from qbraid._logging import logger
from qbraid.runtime.azure.result_builder import AzureGateModelResultBuilder
from qbraid.runtime.enums import JobStatus
from qbraid.runtime.exceptions import JobStateError
from qbraid.runtime.job import QuantumJob
from qbraid.runtime.result import Result
from qbraid.runtime.result_data import GateModelResultData
from .io_format import OutputDataFormat
if TYPE_CHECKING:
import azure.quantum
[docs]
class AzureQuantumJob(QuantumJob):
"""Azure job class."""
[docs]
def __init__(self, job_id: str, workspace: Optional[Workspace] = None, **kwargs):
super().__init__(job_id=job_id, **kwargs)
self._workspace = workspace or Workspace()
self._job = self.workspace.get_job(self.id)
@property
def workspace(self) -> azure.quantum.Workspace:
"""Return the Azure Quantum Workspace."""
return self._workspace
def _details(self) -> dict[str, Any]:
"""Return the details of the Azure job and
update the metadata cache."""
self._job.refresh()
details = self._job.details.as_dict()
self._cache_metadata["details"] = details
return details
def status(self) -> JobStatus:
"""Return the current status of the Azure job.
Returns:
JobStatus: The current status of the job.
"""
details: dict = self._details()
status: str = details.get("status")
status_map = {
"Succeeded": JobStatus.COMPLETED,
"Waiting": JobStatus.QUEUED,
"Executing": JobStatus.RUNNING,
"Failed": JobStatus.FAILED,
"Cancelled": JobStatus.CANCELLED,
"Finishing": JobStatus.RUNNING,
}
return status_map.get(status, JobStatus.UNKNOWN)
@staticmethod
def _make_estimator_result(data: dict[str, Any]) -> MicrosoftEstimatorResult:
"""Create a MicrosoftEstimatorResult from the given data.
Args:
data (dict): The data to create the result from.
Returns:
MicrosoftEstimatorResult: The result created from the data.
Raises:
RuntimeError: If the job execution failed.
"""
if not data["success"]:
error_data = data["error_data"]
raise RuntimeError(
f"Cannot retrieve results as job execution failed "
f"({error_data['code']}: {error_data['message']})"
)
result_data = data["data"]
return MicrosoftEstimatorResult(result_data)
def result(self) -> Union[Result, MicrosoftEstimatorResult]:
"""Return the result of the Azure job.
Returns:
Union[Result, MicrosoftEstimatorResult]: The result of the job.
"""
if not self.is_terminal_state():
logger.info("Result will be available when job has reached final state.")
job: azure.quantum.Job = self._job
job.wait_until_completed()
success = job.details.status == "Succeeded"
details = job.details.as_dict()
if job.details.output_data_format == OutputDataFormat.RESOURCE_ESTIMATOR.value:
return self._make_estimator_result(
{
"job_id": job.id,
"target": job.details.target,
"job_name": job.details.name,
"success": success,
"data": job.get_results(),
"error_data": (
None if job.details.error_data is None else job.details.error_data.as_dict()
),
}
)
builder = AzureGateModelResultBuilder(job)
data = GateModelResultData(measurement_counts=builder.get_counts())
return Result(
device_id=job.details.target, job_id=job.id, success=success, data=data, **details
)
def cancel(self) -> None:
"""Cancel the Azure job."""
if self.is_terminal_state():
raise JobStateError("Cannot cancel; job in terminal state.")
self._job = self.workspace.cancel_job(self._job)