from urllib.parse import urljoin
from typing import Optional, Dict, List, Union, TYPE_CHECKING
import os
import mimetypes
from geobox.exception import ValidationError
import aiohttp
import sys
from .base import AsyncBase
from .task import AsyncTask
from .feature import AsyncFeature
from ..enums import FileFormat, PublishFileType, InputGeomType, FileType
from ..utils import clean_data, get_save_path, get_unique_filename
from ..exception import ApiRequestError
if TYPE_CHECKING:
from . import AsyncGeoboxClient
from .user import AsyncUser
[docs]
class AsyncFile(AsyncBase):
BASE_ENDPOINT: str = 'files/'
[docs]
def __init__(self,
api: 'AsyncGeoboxClient',
uuid: str,
data: Optional[Dict] = {}):
"""
Constructs all the necessary attributes for the File object.
Args:
api (AsyncGeoboxClient): The AsyncGeoboxClient instance.
uuid (str): The UUID of the file.
data (Dict, optional): The data of the file.
"""
super().__init__(api, uuid=uuid, data=data)
[docs]
def __repr__(self) -> str:
"""
Return a string representation of the File object.
Returns:
str: A string representation of the File object.
"""
return f"AsyncFile(uuid={self.uuid}, file_name={self.name}, file_type={self.file_type.value})"
@property
def layers(self) -> List[Dict]:
"""
Get the layers of the file.
Returns:
List[Dict]: The layers of the file.
Example:
>>> from geobox import GeoboxClient
>>> client = GeoboxClient()
>>> file = File.get_file(client, uuid="12345678-1234-5678-1234-567812345678")
>>> file.layers
"""
return self.data.get('layers', {}).get('layers', [])
@property
def file_type(self) -> 'FileType':
"""
Get the file type
Returns:
FileType: the file type enumeration
Example:
>>> from geobox.aio import AsyncGeoboxClient
>>> async with AsyncGeoboxClient() as client:
>>> file = await File.get_file(client, uuid="12345678-1234-5678-1234-567812345678")
>>> file.file_type
"""
return FileType(self.data.get('file_type'))
[docs]
@classmethod
async def upload_file(cls, api: 'AsyncGeoboxClient', path: str, user_id: int = None, scan_archive: bool = True) -> 'AsyncFile':
"""
[async] Upload a file to the GeoBox API.
Args:
api (AsyncGeoboxClient): The AsyncGeoboxClient instance.
path (str): The path to the file to upload.
user_id (int, optional): specific user. privileges required.
scan_archive (bool, optional): Whether to scan the archive for layers. default: True
Returns:
AsyncFile: The uploaded file instance.
Raises:
ValueError: If the file type is invalid.
FileNotFoundError: If the file does not exist.
Example:
>>> from geobox.aio import AsyncGeoboxClient
>>> from geobox.aio.file import AsyncFile
>>> async with AsyncGeoboxClient() as client:
>>> file = await AsyncFile.upload_file(client, path='path/to/file.shp')
or
>>> file = await client.upload_file(path='path/to/file.shp')
"""
if not os.path.exists(path):
raise FileNotFoundError(f"File not found: {path}")
FileFormat(os.path.splitext(path)[1])
endpoint = cls.BASE_ENDPOINT
with open(path, 'rb') as f:
f = open(path, "rb")
files = {'file': f}
file_data = await api.post(endpoint, files=files, is_json=False)
return cls(api, file_data['uuid'], file_data)
[docs]
@classmethod
async def get_files(cls, api:'AsyncGeoboxClient', **kwargs) -> Union[List['AsyncFile'], int]:
"""
[async] Retrieves a list of files.
Args:
api (AsyncGeoboxClient): The AsyncGeoboxClient instance for making requests.
Keyword Args:
q (str): query filter based on OGC CQL standard. e.g. "field1 LIKE '%GIS%' AND created_at > '2021-01-01'"
search (str): search term for keyword-based searching among search_fields or all textual fields if search_fields does not have value. NOTE: if q param is defined this param will be ignored.
search_fields (str): comma separated list of fields for searching
order_by (str): comma separated list of fields for sorting results [field1 A|D, field2 A|D, …]. e.g. name A, type D.NOTE: "A" denotes ascending order and "D" denotes descending order.
return_count (bool): if true, the total number of results will be returned. default is False.
skip (int): number of results to skip. default is 0.
limit (int): number of results to return. default is 10.
user_id (int): filter by user id.
shared (bool): Whether to return shared files. default is False.
Returns:
List[AsyncFile] | int: A list of File objects or the total number of results.
Example:
>>> from geobox.aio import AsyncGeoboxClient
>>> from geobox.aio.file import AsyncFile
>>> async with AsyncGeoboxClient() as client:
>>> files = await AsyncFile.get_files(client, search_fields='name', search='GIS', order_by='name', skip=10, limit=10)
or
>>> files = await client.get_files(search_fields='name', search='GIS', order_by='name', skip=10, limit=10)
"""
params = {
'f': 'json',
'q': kwargs.get('q', None),
'search': kwargs.get('search', None),
'search_fields': kwargs.get('search_fields', None),
'order_by': kwargs.get('order_by', None),
'return_count': kwargs.get('return_count', False),
'skip': kwargs.get('skip', 0),
'limit': kwargs.get('limit', 10),
'user_id': kwargs.get('user_id', None),
'shared': kwargs.get('shared', False)
}
return await super()._get_list(api, cls.BASE_ENDPOINT, params, factory_func=lambda api, item: cls(api, item['uuid'], item))
[docs]
@classmethod
async def get_file(cls, api: 'AsyncGeoboxClient', uuid: str, user_id: int = None) -> 'AsyncFile':
"""
[async] Retrieves a file by its UUID.
Args:
api (AsyncGeoboxClient): The AsyncGeoboxClient instance.
uuid (str): The UUID of the file.
user_id (int, optional): specific user. privileges required.
Returns:
AsyncFile: The retrieved file instance.
Raises:
NotFoundError: If the file with the specified UUID is not found.
Example:
>>> from geobox.aio import AsyncGeoboxClient
>>> from geobox.aio.file import AsyncFile
>>> async with AsyncGeoboxClient() as client:
>>> file = await AsyncFile.get_file(client, uuid="12345678-1234-5678-1234-567812345678")
or
>>> file = await client.get_file(uuid="12345678-1234-5678-1234-567812345678")
"""
params = {
'f': 'json',
'user_id': user_id
}
return await super()._get_detail(api, cls.BASE_ENDPOINT, f'{uuid}/info', params, factory_func=lambda api, item: AsyncFile(api, item['uuid'], item))
[docs]
@classmethod
async def get_files_by_name(cls, api: 'AsyncGeoboxClient', name: str, user_id: int = None) -> List['AsyncFile']:
"""
[async] Get files by name
Args:
api (AsyncGeoboxClient): The AsyncGeoboxClient instance for making requests.
name (str): the name of the file to get
user_id (int, optional): specific user. privileges required.
Returns:
List[AsyncFile]: returns files that matches the given name
Example:
>>> from geobox.aio import AsyncGeoboxClient
>>> from geobox.aio.file import AsyncFile
>>> async with AsyncGeoboxClient() as client:
>>> files = await AsyncFile.get_files_by_name(client, name='test')
or
>>> files = await client.get_files_by_name(name='test')
"""
return await cls.get_files(api, q=f"name = '{name}'", user_id=user_id)
[docs]
def _get_file_name(self, response: aiohttp.ClientResponse) -> str:
"""
Get the file name from the response.
Args:
response (aiohttp.ClientResponse): The response of the request.
Returns:
str: The file name
"""
if 'Content-Disposition' in response.headers and 'filename=' in response.headers['Content-Disposition']:
file_name = response.headers['Content-Disposition'].split('filename=')[-1].strip().strip('"')
else:
content_type = response.headers.get("Content-Type", "")
file_name = f'{self.name}.{mimetypes.guess_extension(content_type.split(";")[0])}'
return file_name
[docs]
def _create_progress_bar(self) -> 'tqdm':
"""Creates a progress bar for the task."""
try:
from tqdm.auto import tqdm
except ImportError:
from .api import logger
logger.warning("[tqdm] extra is required to show the progress bar. install with: pip install geobox[tqdm]")
return None
return tqdm(unit="B",
total=int(self.size),
file=sys.stdout,
dynamic_ncols=True,
desc="Downloading",
unit_scale=True,
unit_divisor=1024,
ascii=True
)
[docs]
async def download(self, save_path: str = None, progress_bar: bool = True, file_name: str = None, overwrite: bool = False) -> str:
"""
[async] Download a file and save it to the specified path.
Args:
save_path (str, optional): Path where the file should be saved.
If not provided, it saves to the current working directory
using the original filename and appropriate extension.
progress_bar (bool, optional): Whether to show a progress bar. default: True
file_name (str, optional): the downloaded file name.
overwrite (bool, optional): whether to overwrite the downloaded file if it exists on the save path. default is False.
Returns:
str: Path where the file was saved
Raises:
ValueError: If uuid is not set
OSError: If there are issues with file operations
Example:
>>> from geobox.aio import AsyncGeoboxClient
>>> from geobox.aio.file import AsyncFile
>>> async with AsyncGeoboxClient() as client:
>>> file = await AsyncFile.get_file(client, uuid="12345678-1234-5678-1234-567812345678")
>>> await file.download(save_path='path/to/save/')
"""
if not self.uuid:
raise ValueError("File UUID is required to download the file")
save_path = get_save_path(save_path)
os.makedirs(save_path, exist_ok=True)
# Use the aiohttp session directly for streaming
endpoint = urljoin(self.api.base_url, f"{self.endpoint}download/")
async with self.api.session.session.get(endpoint) as response:
if response.status != 200:
raise ApiRequestError(f"Failed to download file: {response.status}")
file_name = self._get_file_name(response)
full_path = os.path.join(save_path, file_name)
if os.path.exists(full_path) and not overwrite:
full_path = get_unique_filename(save_path, file_name)
with open(full_path, 'wb') as f:
pbar = self._create_progress_bar() if progress_bar else None
# Use aiohttp's streaming method
async for chunk in response.content.iter_chunked(8192):
f.write(chunk)
if pbar:
pbar.update(len(chunk))
pbar.refresh()
if pbar:
pbar.close()
return os.path.abspath(full_path)
[docs]
async def delete(self) -> None:
"""
[async] Deletes the file.
Returns:
None
Example:
>>> from geobox.aio import AsyncGeoboxClient
>>> from geobox.aio.file import AsyncFile
>>> async with AsyncGeoboxClient() as client:
>>> file = await AsyncFile.get_file(client, uuid="12345678-1234-5678-1234-567812345678")
>>> await file.delete()
"""
return await super()._delete(self.endpoint)
[docs]
async def publish(self,
name: str,
publish_as: 'PublishFileType' = None,
input_geom_type: 'InputGeomType' = None,
input_layer: str = None,
input_dataset: str = None,
user_id: int = None,
input_srid: int = AsyncFeature.BASE_SRID,
file_encoding: str = "UTF-8",
replace_domain_codes_by_values: bool = False,
report_errors: bool = True,
as_terrain: bool = False) -> 'AsyncTask':
"""
[async] Publishes a file as a layer.
Args:
name (str): The name of the layer.
publish_as (PublishFileType, optional): The type of layer to publish as.
input_geom_type (InputGeomType, optional): The geometry type of the layer.
input_layer (str, optional): The name of the input layer.
input_dataset (str, optional): The name of the input dataset.
user_id (int, optional): Specific user. privileges required.
input_srid (int, optional): The SRID of the layer. default is: 3857
file_encoding (str, optional): The encoding of the file. default is "utf-8".
replace_domain_codes_by_values (bool, optional): Whether to replace domain codes by values. default is False.
report_errors (bool, optional): Whether to report errors. default is True.
as_terrain (bool, optional): Whether to publish as terrain. default is False.
Returns:
Task: The task object.
Raises:
ValueError: If the publish_as is not a valid PublishFileType.
ValidationError: if the zipped file doesn't have any layers to publish.
Example:
>>> from geobox.aio import AsyncGeoboxClient
>>> from geobox.aio.file import AsyncFile
>>> async with AsyncGeoboxClient() as client:
>>> file = await AsyncFile.get_file(client, uuid="12345678-1234-5678-1234-567812345678")
>>> await file.publish(publish_as=PublishFileType.VECTOR,
... layer_name='my_layer',
... input_geom_type=InputGeomType.POINT,
... input_layer='layer1',
... input_dataset='dataset1',
... input_srid=4326,
... file_encoding='UTF-8')
"""
if not publish_as:
# checks the file format or file first layer format to dynamically set the publish_as
if self.file_type.value in ['GeoJSON', 'GPKG', 'DXF', 'GPX', 'Shapefile', 'KML', 'CSV', 'FileGDB'] or \
(self.file_type.value in ['Complex'] and self.layers and \
FileType(self.layers[0]['format']).value in ['GeoJSON', 'GPKG', 'DXF', 'GPX', 'Shapefile', 'KML', 'CSV', 'FileGDB']):
publish_as = PublishFileType.VECTOR
elif self.file_type.value in ['GeoTIFF'] or \
(self.file_type.value in ['Complex'] and self.layers and \
FileType(self.layers[0]['format']).value in ['GeoTIFF']):
publish_as = PublishFileType.RASTER
elif self.file_type.value in ['GLB'] or \
(self.file_type.value in ['Complex'] and self.layers and \
FileType(self.layers[0]['format']).value in ['GLB']):
publish_as = PublishFileType.MODEL3D
elif self.file_type.value in ['ThreedTiles']:
publish_as = PublishFileType.Tiles3D
else:
raise ValidationError('Unknown format')
data = clean_data({
"publish_as": publish_as.value if isinstance(publish_as, PublishFileType) else publish_as,
"layer_name": name,
"input_layer": self.layers[0]['layer'] if not input_layer and self.layers else input_layer,
"input_geom_type": input_geom_type.value if isinstance(input_geom_type, InputGeomType) else input_geom_type,
"replace_domain_codes_by_values": replace_domain_codes_by_values,
"input_dataset": self.layers[0]['dataset'] if not input_layer and self.layers else input_dataset,
"user_id": user_id,
"input_srid": input_srid,
"file_encoding": file_encoding,
"report_errors": report_errors,
"as_terrain": as_terrain
})
endpoint = urljoin(self.endpoint, 'publish/')
response = await self.api.post(endpoint, data, is_json=False)
task = await AsyncTask.get_task(self.api, response.get('task_id'))
return task
[docs]
async def share(self, users: List['AsyncUser']) -> None:
"""
[async] Shares the file with specified users.
Args:
users (List[User]): The list of users objects to share the file with.
Returns:
None
Example:
>>> from geobox.aio import AsyncGeoboxClient
>>> from geobox.aio.file import AsyncFile
>>> async with AsyncGeoboxClient() as client:
>>> file = await AsyncFile.get_file(client, uuid="12345678-1234-5678-1234-567812345678")
>>> users = await client.search_users(search='John')
>>> await file.share(users=users)
"""
await super()._share(self.endpoint, users)
[docs]
async def unshare(self, users: List['AsyncUser']) -> None:
"""
[async] Unshares the file with specified users.
Args:
users (List[AsyncUser]): The list of users objects to unshare the file with.
Returns:
None
Example:
>>> from geobox.aio import AsyncGeoboxClient
>>> from geobox.aio.file import AsyncFile
>>> async with AsyncGeoboxClient() as client:
>>> file = await AsyncFile.get_file(client, uuid="12345678-1234-5678-1234-567812345678")
>>> await users = client.search_users(search='John')
>>> await file.unshare(users=users)
"""
await super()._unshare(self.endpoint, users)
[docs]
async def get_shared_users(self, search: str = None, skip: int = 0, limit: int = 10) -> List['AsyncUser']:
"""
[async] Retrieves the list of users the file is shared with.
Args:
search (str, optional): The search query.
skip (int, optional): The number of users to skip.
limit (int, optional): The maximum number of users to retrieve.
Returns:
List[AsyncUser]: The list of shared users.
Example:
>>> from geobox.aio import AsyncGeoboxClient
>>> from geobox.aio.file import AsyncFile
>>> async with AsyncGeoboxClient() as client:
>>> file = await AsyncFile.get_file(client, uuid="12345678-1234-5678-1234-567812345678")
>>> await file.get_shared_users(search='John', skip=0, limit=10)
"""
params = {
'search': search,
'skip': skip,
'limit': limit
}
return await super()._get_shared_users(self.endpoint, params)