Source code for geobox.task

from urllib.parse import urljoin
from typing import Optional, Dict, List, Optional, Union, TYPE_CHECKING
import time
import logging

from .base import Base
from .enums import TaskStatus

if TYPE_CHECKING:
    from . import GeoboxClient
    from .vectorlayer import VectorLayer
    from .raster import Raster
    from .model3d import Model
    from .file import File
    from .tile3d import Tile3d
    from .table import Table


logger = logging.getLogger(__name__)


class Task(Base):
    """
    Client-side handle for a task exposed under the ``tasks/`` endpoint.

    The object caches the task payload in ``self._data`` and offers helpers to
    refresh it, poll the task until it reaches a final state, read its
    progress / errors and resolve the asset it produced.
    """

    BASE_ENDPOINT: str = 'tasks/'

    # Ordered mapping of "result key -> GeoboxClient getter name" used by
    # ``output_asset``. Order matters: the first key holding a truthy value wins.
    _OUTPUT_ASSET_GETTERS = (
        ('layer_uuid', 'get_vector'),
        ('raster_uuid', 'get_raster'),
        ('model_uuid', 'get_model'),
        ('file_uuid', 'get_file'),
        ('3dtiles_uuid', 'get_3dtile'),
        ('table_uuid', 'get_table'),
    )

    def __init__(self, api: 'GeoboxClient', uuid: str, data: Optional[Dict] = None):
        """
        Constructs all the necessary attributes for the Task object.

        Args:
            api (GeoboxClient): The API instance.
            uuid (str): The UUID of the task.
            data (Dict, optional): The task data. Defaults to an empty dict.
        """
        # Build a fresh dict per instance; a literal ``{}`` default would be
        # created once and shared (and mutated) across every Task.
        if data is None:
            data = {}
        super().__init__(api, uuid=uuid, data=data)
        self._data = data if isinstance(data, dict) else {}

    def refresh_data(self) -> None:
        """
        Updates the task data.

        Fetches the task again through ``get_task`` and replaces the cached payload.
        """
        self._data = self.get_task(self.api, self.uuid).data

    def _result_dict(self) -> Dict:
        """Returns the ``result`` entry of the task data, or ``{}`` when it is missing or not a dict."""
        # ``dict.get('result', {})`` is not enough: the key may be present with
        # a ``None`` (or other non-dict) value, which has no ``.get``.
        result = self.data.get('result')
        return result if isinstance(result, dict) else {}

    @property
    def output_asset(self) -> Union['VectorLayer', 'Raster', 'Model', 'File', 'Tile3d', 'Table', None]:
        """
        output asset property

        Returns:
            VectorLayer | Raster | Model | File | Tile3d | Table | None: if task type is publish,
                it returns the published layer. None when the task result references no asset.

        Example:
            >>> from geobox import GeoboxClient
            >>> from geobox.task import Task
            >>> client = GeoboxClient()
            >>> task = Task.get_task(client, uuid="12345678-1234-5678-1234-567812345678")
            >>> task.output_asset
        """
        result = self._result_dict()
        for key, getter_name in self._OUTPUT_ASSET_GETTERS:
            asset_uuid = result.get(key)
            if asset_uuid:
                return getattr(self.api, getter_name)(uuid=asset_uuid)
        return None

    @property
    def data(self) -> Dict:
        """
        Returns the task data.

        Returns:
            Dict: the task data as a dictionary

        Example:
            >>> from geobox import GeoboxClient
            >>> from geobox.task import Task
            >>> client = GeoboxClient()
            >>> task = Task.get_task(client, uuid="12345678-1234-5678-1234-567812345678")
            >>> task.data
        """
        return self._data

    @data.setter
    def data(self, value: Dict) -> None:
        """
        Sets the task data. Any non-dict value is replaced by an empty dict.

        Example:
            >>> from geobox import GeoboxClient
            >>> from geobox.task import Task
            >>> client = GeoboxClient()
            >>> task = Task.get_task(client, uuid="12345678-1234-5678-1234-567812345678")
            >>> task.data = {'name': 'test'}
        """
        self._data = value if isinstance(value, dict) else {}

    @property
    def status(self) -> 'TaskStatus':
        """
        Returns the status of the task. (auto refresh)

        Every access calls ``refresh_data`` first, so reading this property
        performs a request.

        Returns:
            TaskStatus: the status of the task(SUCCESS, FAILURE, ABORTED, PENDING, PROGRESS)

        Example:
            >>> from geobox import GeoboxClient
            >>> from geobox.task import Task
            >>> client = GeoboxClient()
            >>> task = Task.get_task(client, uuid="12345678-1234-5678-1234-567812345678")
            >>> task.status
        """
        self.refresh_data()
        return TaskStatus(self._data.get('state'))

    @property
    def errors(self) -> Union[Dict, None]:
        """
        Get the task errors.

        Returns:
            Dict | None: the whole ``result`` dict if it carries an ``errors`` entry
                or a ``detail.msg`` entry, otherwise None.

        Example:
            >>> from geobox import GeoboxClient
            >>> from geobox.task import Task
            >>> client = GeoboxClient()
            >>> task = Task.get_task(client, uuid="12345678-1234-5678-1234-567812345678")
            >>> task.errors
        """
        result = self._result_dict()
        detail = result.get('detail')
        # Only a dict-shaped ``detail`` can carry a ``msg``; other shapes are
        # ignored instead of raising AttributeError.
        has_detail_msg = isinstance(detail, dict) and bool(detail.get('msg'))
        if result.get('errors') or has_detail_msg:
            return result
        return None

    @property
    def progress(self) -> Union[int, None]:
        """
        Returns the progress of the task.

        Returns:
            int | None: the progress of the task in percentage or None if the task is not running

        Example:
            >>> from geobox import GeoboxClient
            >>> from geobox.task import Task
            >>> client = GeoboxClient()
            >>> task = Task.get_task(client, uuid="12345678-1234-5678-1234-567812345678")
            >>> task.progress
        """
        endpoint = urljoin(self.endpoint, 'status/')
        response = self.api.get(endpoint)
        if not isinstance(response, dict):
            return None

        current = response.get('current')
        total = response.get('total')
        # A missing/zero ``total`` would divide by zero; a missing/zero
        # ``current`` is reported as "no progress information".
        if not total or not current:
            return None
        return int((current / total) * 100)

    def _wait(self, timeout: Union[int, None] = None, interval: int = 1, progress_bar: bool = True) -> 'TaskStatus':
        """
        Polls the task until it reaches a final state.

        Args:
            timeout (int, optional): Maximum time to wait in seconds.
            interval (int, optional): Time between status checks in seconds.
            progress_bar (bool, optional): Whether to show a progress bar.

        Returns:
            TaskStatus: the final status of the task.

        Raises:
            TimeoutError: If the timeout elapses before a final state is seen.
        """
        start_time = time.time()
        last_progress = 0
        # May be None even when requested: tqdm is an optional dependency.
        pbar = self._create_progress_bar() if progress_bar else None

        try:
            while True:
                self._check_timeout(start_time, timeout)
                status = self.status

                if self._is_final_state(status):
                    self._update_progress_bar(pbar, last_progress, status)
                    return status

                if pbar is not None:
                    last_progress = self._update_progress_bar(pbar, last_progress)

                time.sleep(interval)
        finally:
            # Always release the bar, including on timeout or request errors.
            if pbar is not None:
                pbar.close()

    def wait(self, timeout: Union[int, None] = None, interval: int = 1, progress_bar: bool = True, retry: int = 3) -> 'TaskStatus':
        """
        Wait for the task to finish.

        Args:
            timeout (int, optional): Maximum time to wait in seconds.
            interval (int, optional): Time between status checks in seconds.
            progress_bar (bool, optional): Whether to show a progress bar. default: True
            retry (int, optional): Number of times to retry if waiting for the task fails. default is 3.
                Values below 1 are treated as a single attempt.

        Returns:
            TaskStatus: the status of the task(SUCCESS, FAILURE, ABORTED, PENDING, PROGRESS)

        Raises:
            TimeoutError: If the task doesn't complete within timeout seconds.
            Exception: The last error raised while polling, once all attempts are used.

        Example:
            >>> from geobox import GeoboxClient
            >>> from geobox.task import Task
            >>> client = GeoboxClient()
            >>> task = Task.get_task(client, uuid="12345678-1234-5678-1234-567812345678")
            >>> task.wait() # return the status of the task
        """
        # Guarantee at least one attempt; with zero iterations there would be
        # no result to return and no exception to raise.
        attempts = max(1, retry)

        for attempt in range(1, attempts + 1):
            try:
                return self._wait(timeout, interval, progress_bar)
            except TimeoutError:
                # The caller's time budget is exhausted. Retrying would restart
                # the clock and silently multiply ``timeout`` by ``retry``.
                raise
            except Exception as e:
                logger.warning(f"[Retry {attempt}/{attempts}] Task wait failed: {e}")
                if attempt == attempts:
                    raise
                # Back off only when another attempt will actually follow.
                time.sleep(interval)

    def _create_progress_bar(self) -> Optional['tqdm']:
        """
        Creates a progress bar for the task.

        Returns:
            tqdm | None: the progress bar, or None if tqdm is not installed.
        """
        try:
            from tqdm.auto import tqdm
        except ImportError:
            logger.warning("[tqdm] extra is required to show the progress bar. install with: pip install geobox[tqdm]")
            return None

        return tqdm(total=100, colour='green', desc=f"Task: {self.name}", unit="%", leave=True)

    def _check_timeout(self, start_time: float, timeout: Union[int, None]) -> None:
        """
        Checks if the task has exceeded the timeout period.

        Args:
            start_time (float): ``time.time()`` value taken when waiting started.
            timeout (int | None): Allowed duration in seconds; a falsy value disables the check.

        Raises:
            TimeoutError: If more than ``timeout`` seconds have elapsed since ``start_time``.
        """
        if timeout and time.time() - start_time > float(timeout):
            raise TimeoutError(f"Task {self.name} timed out after {timeout} seconds")

    def _is_final_state(self, status: 'TaskStatus') -> bool:
        """Checks if the task has reached a final state (FAILURE, SUCCESS or ABORTED)."""
        return status in (TaskStatus.FAILURE, TaskStatus.SUCCESS, TaskStatus.ABORTED)

    def _update_progress_bar(self, pbar: Union['tqdm', None], last_progress: int, status: Optional['TaskStatus'] = None) -> int:
        """
        Updates the progress bar with current progress and returns the new last_progress.

        Args:
            pbar (tqdm | None): The progress bar to update
            last_progress (int): The last progress value
            status (TaskStatus, optional): The task status. If provided and SUCCESS, updates to 100%

        Returns:
            int: The new last_progress value
        """
        if pbar is None:
            return last_progress

        if status == TaskStatus.SUCCESS:
            # Fill whatever is left so a finished task always shows 100%.
            pbar.update(100 - last_progress)
            return 100

        current_progress = self.progress
        if current_progress is not None:
            progress_diff = current_progress - last_progress
            # The bar only moves forward; stale or lower readings are ignored.
            if progress_diff > 0:
                pbar.update(progress_diff)
                return current_progress

        return last_progress

    def abort(self) -> None:
        """
        Aborts the task.

        Returns:
            None

        Example:
            >>> from geobox import GeoboxClient
            >>> from geobox.task import Task
            >>> client = GeoboxClient()
            >>> task = Task.get_task(client, uuid="12345678-1234-5678-1234-567812345678")
            >>> task.abort()
        """
        endpoint = urljoin(self.endpoint, 'abort/')
        self.api.post(endpoint)

    @classmethod
    def get_tasks(cls, api: 'GeoboxClient', **kwargs) -> Union[List['Task'], int]:
        """
        Get a list of tasks

        Args:
            api (GeoboxClient): The GeoboxClient instance for making requests.

        Keyword Args:
            state (TaskStatus): Available values : TaskStatus.PENDING, TaskStatus.PROGRESS, TaskStatus.SUCCESS, TaskStatus.FAILURE, TaskStatus.ABORTED
            q (str): query filter based on OGC CQL standard. e.g. "field1 LIKE '%GIS%' AND created_at > '2021-01-01'"
            search (str): search term for keyword-based searching among search_fields or all textual fields if search_fields does not have value. NOTE: if q param is defined this param will be ignored.
            search_fields (str): comma separated list of fields for searching.
            order_by (str): comma separated list of fields for sorting results [field1 A|D, field2 A|D, …]. e.g. name A, type D. NOTE: "A" denotes ascending order and "D" denotes descending order.
            return_count (bool): The count of the tasks. default is False.
            skip (int): The skip of the task. default is 0.
            limit (int): The limit of the task. default is 10.
            user_id (int): Specific user. privileges required.
            shared (bool): Whether to return shared tasks. default is False.

        Returns:
            List[Task] | int: The list of task objects or the count of the tasks if return_count is True.

        Example:
            >>> from geobox import GeoboxClient
            >>> from geobox.task import Task
            >>> client = GeoboxClient()
            >>> tasks = Task.get_tasks(client)
            or
            >>> tasks = client.get_tasks()
        """
        state = kwargs.get('state')
        params = {
            'f': 'json',
            # The API receives the enum's value, not the enum member itself.
            'state': state.value if state else None,
            'q': kwargs.get('q'),
            'search': kwargs.get('search'),
            'search_fields': kwargs.get('search_fields'),
            'order_by': kwargs.get('order_by'),
            'return_count': kwargs.get('return_count', False),
            'skip': kwargs.get('skip'),
            'limit': kwargs.get('limit'),
            'user_id': kwargs.get('user_id'),
            'shared': kwargs.get('shared', False),
        }
        return super()._get_list(api, cls.BASE_ENDPOINT, params, factory_func=lambda api, item: Task(api, item['uuid'], item))

    @classmethod
    def get_task(cls, api: 'GeoboxClient', uuid: str) -> 'Task':
        """
        Gets a task.

        Args:
            api (GeoboxClient): The GeoboxClient instance for making requests.
            uuid (str): The UUID of the task.

        Returns:
            Task: The task object.

        Example:
            >>> from geobox import GeoboxClient
            >>> from geobox.task import Task
            >>> client = GeoboxClient()
            >>> task = Task.get_task(client, uuid="12345678-1234-5678-1234-567812345678")
            or
            >>> task = client.get_task(uuid="12345678-1234-5678-1234-567812345678")
        """
        return super()._get_detail(api, cls.BASE_ENDPOINT, uuid, factory_func=lambda api, item: Task(api, item['uuid'], item))