Source code for qbraid.providers.job

# Copyright (C) 2023 qBraid
#
# This file is part of the qBraid-SDK
#
# The qBraid-SDK is free software released under the GNU General Public License v3
# or later. You can redistribute and/or modify it under the terms of the GPL v3.
# See the LICENSE file in the project root or <https://www.gnu.org/licenses/gpl-3.0.html>.
#
# THERE IS NO WARRANTY for the qBraid-SDK, as per Section 15 of the GPL v3.

"""
Module defining abstract QuantumJob Class

"""
import logging
from abc import ABC, abstractmethod
from enum import Enum
from time import sleep, time
from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, Union

from qbraid.api import QbraidSession

from .enums import JOB_FINAL, JobStatus
from .exceptions import JobError
from .status_maps import STATUS_MAP

if TYPE_CHECKING:
    import qbraid


[docs] class QuantumJob(ABC): """Abstract interface for job-like classes."""
[docs] def __init__( # pylint: disable=too-many-arguments self, job_id: str, vendor_job_id: Optional[str] = None, device: "Optional[qbraid.providers.QuantumDevice]" = None, vendor_job_obj: Optional[Any] = None, status: Optional[Union[str, JobStatus]] = None, ): self._vendor = None self._cache_metadata = None self._cache_status = self._map_status(status) self._job_id = job_id self._vendor_job_id = vendor_job_id self._device = device self._job = vendor_job_obj or self._get_job() self._status_map = STATUS_MAP[self.vendor]
@property def id(self) -> str: # pylint: disable=invalid-name """Return a unique id identifying the job.""" return self._job_id @property def vendor(self) -> str: """Get job vendor.""" if self._vendor is not None: return self._vendor try: vendor = self._cache_metadata["vendor"] except (KeyError, TypeError): vendor = self.device.vendor self._vendor = vendor.upper() return self._vendor @property def vendor_job_id(self) -> str: """Returns the ID assigned by the device vendor""" if self._vendor_job_id is None: self._cache_metadata = self._post_job_data() self._vendor_job_id = self._cache_metadata["vendorJobId"] return self._vendor_job_id @property def device(self) -> "qbraid.providers.QuantumDevice": """Returns the qbraid QuantumDevice object associated with this job.""" if self._device is None: self._fetch_and_set_device() return self._device def _fetch_and_set_device(self) -> None: """Fetches device id from the server and sets the device object.""" session = QbraidSession() job_lst = session.post( "/get-user-jobs", json={"qbraidJobId": self.id, "numResults": 1} ).json() if len(job_lst) == 0: job_lst = session.post("/get-user-jobs", json={"_id": self.id, "numResults": 1}).json() if len(job_lst) == 0: raise JobError(f"Could not find device associated with job {self.id}.") job_data = job_lst[0] try: import qbraid # pylint: disable=import-outside-toplevel vendor_device_id = job_data["vendorDeviceId"] self._device = qbraid.device_wrapper(vendor_device_id) except Exception as err: # pylint: disable=broad-except raise JobError(f"Could not find device associated with job {self.id}.") from err @staticmethod def _map_status(status: Optional[Union[str, JobStatus]] = None) -> JobStatus: """Returns `JobStatus` object mapped from raw status value. If no value provided or conversion from string fails, returns `JobStatus.UNKNOWN`.""" if status is None: return JobStatus.UNKNOWN if isinstance(status, Enum): return status if isinstance(status, str): for e in JobStatus: status_enum = JobStatus(e.value) if status == status_enum.name or status == str(status_enum): return status_enum raise ValueError(f"Status value '{status}' not recognized.") raise ValueError(f"Invalid status value type: {type(status)}") @staticmethod def status_final(status: Union[str, JobStatus]) -> bool: """Returns True if job is in final state. False otherwise.""" if isinstance(status, str): if status in JOB_FINAL: return True for job_status in JOB_FINAL: if job_status.name == status: return True return False raise TypeError( f"Expected status of type 'str' or 'JobStatus' \ but instead got status of type {type(status)}." ) def _status(self) -> Tuple[JobStatus, str]: vendor_status = self._get_status() try: return self._status_map[vendor_status], vendor_status except KeyError: logging.warning( "Expected %s job status matching one of %s but instead got '%s'.", self._device.vendor, str(list(self._status_map.keys())), vendor_status, ) return JobStatus.UNKNOWN, vendor_status def status(self) -> JobStatus: """Return the status of the job / task , among the values of ``JobStatus``.""" qbraid_status, vendor_status = self._status() if qbraid_status != self._cache_status: self._post_job_data( update={"status": vendor_status, "qbraidStatus": qbraid_status.name} ) return qbraid_status def _post_job_data(self, update: Optional[dict] = None) -> dict: """Retreives job metadata and optionally updates document. Args: update: Dictionary containing fields to update in job document. Returns: The metadata associated with this job """ session = QbraidSession() body = {"_id": self.id, "qbraidJobId": self.id} # Two status variables so we can track both qBraid and vendor status. if update is not None and "status" in update and "qbraidStatus" in update: body["status"] = update["status"] body["qbraidStatus"] = update["qbraidStatus"] metadata = session.put("/update-job", data=body).json() if "qbraidJobId" not in metadata: metadata["qbraidJobId"] = metadata.get("_id") return metadata def metadata(self) -> Dict[str, Any]: """Return the metadata regarding the job.""" qbraid_status, vendor_status = self._status() if not self._cache_metadata or qbraid_status != self._cache_status: update = {"status": vendor_status, "qbraidStatus": qbraid_status.name} self._cache_metadata = self._post_job_data(update=update) self._cache_status = self._map_status(self._cache_metadata["qbraidStatus"]) return self._cache_metadata def wait_for_final_state(self, timeout=None, poll_interval=5) -> None: """Poll the job status until it progresses to a final state. Args: timeout: Seconds to wait for the job. If ``None``, wait indefinitely. poll_interval: Seconds between queries. Raises: JobError: If the job does not reach a final state before the specified timeout. """ start_time = time() status = self.status() while not self.status_final(status): elapsed_time = time() - start_time if timeout is not None and elapsed_time >= timeout: raise JobError(f"Timeout while waiting for job {self.id}.") sleep(poll_interval) status = self.status() @abstractmethod def _get_job(self): """Return the job like object that is being wrapped.""" @abstractmethod def _get_status(self) -> str: """Returns job status casted as string.""" @abstractmethod def result(self) -> "qbraid.providers.ResultWrapper": """Return the results of the job.""" @abstractmethod def cancel(self) -> None: """Attempt to cancel the job.""" def __repr__(self) -> str: """String representation of a QuantumJob object.""" return f"<{self.__class__.__name__}(id:'{self.id}')>"