Source code for geobox.aio.vector_tool
from typing import Dict, Optional, TYPE_CHECKING, Union
from geobox.exception import NotFoundError
from .feature import AsyncFeature
from .base import AsyncBase
from ..enums import GroupByAggFunction, NetworkTraceDirection, SpatialAggFunction, SpatialPredicate
if TYPE_CHECKING:
from . import AsyncGeoboxClient
from .task import AsyncTask
from .query import AsyncQuery
[docs]
class AsyncVectorTool(AsyncBase):
BASE_ENDPOINT = 'queries/'
[docs]
def __init__(self, api: 'AsyncGeoboxClient'):
    """
    Create an async VectorTool bound to a client.

    Args:
        api (AsyncGeoboxClient): Client instance handed to the base class initializer.
    """
    super().__init__(api)
def __repr__(self) -> str:
return f"AsyncVectorTool()"
[docs]
async def _add_params_to_query(self, query_name: str, inputs: dict) -> 'AsyncQuery':
"""add user input params to the query"""
queries = await self.api.get_system_queries(q=f"name = '{query_name}'")
try:
query = next(query for query in queries if query.name == query_name)
except StopIteration:
raise NotFoundError("Vector Tool not found!")
for param in query.params:
if param['name'] in inputs.keys():
query.params[query.params.index(param)]['value'] = inputs[param['name']]
return query
[docs]
async def _run_query(self, query: 'AsyncQuery', output_layer_name: Optional[str] = None) -> Union['AsyncTask', Dict]:
"""execute or save as layer"""
if not output_layer_name:
response = await query.execute()
return response
else:
task = await query.save_as_layer(layer_name=output_layer_name)
return task
[docs]
async def area(
    self,
    vector_uuid: str,
    output_layer_name: Optional[str] = None,
) -> Union['AsyncTask', Dict]:
    """
    [async] Add a new column holding the area of each polygon in the layer.

    Args:
        vector_uuid (str): UUID of the vector layer.
        output_layer_name (str, optional): Name for the output layer. Must be a valid identifier without spaces.

    Returns:
        Union['AsyncTask', Dict]: A task object when output_layer_name is specified; otherwise the vector tool execution result.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> async with AsyncGeoboxClient() as client:
        ...     vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        ...     # execution
        ...     result = await client.vector_tool.area(vector_uuid=vector.uuid)
        ...     # save as layer
        ...     task = await client.vector_tool.area(vector_uuid=vector.uuid, output_layer_name="output_layer")
        ...     await task.wait()
        ...     output_layer = await task.output_asset
    """
    prepared = await self._add_params_to_query(
        query_name='$area',
        inputs={'layer': vector_uuid},
    )
    return await self._run_query(query=prepared, output_layer_name=output_layer_name)
[docs]
async def as_geojson(
    self,
    vector_uuid: str,
    output_layer_name: Optional[str] = None,
) -> Union['AsyncTask', Dict]:
    """
    [async] Add a column containing the GeoJSON string of each geometry in the layer.

    Args:
        vector_uuid (str): UUID of the vector layer.
        output_layer_name (str, optional): Name for the output layer. Must be a valid identifier without spaces.

    Returns:
        Union['AsyncTask', Dict]: A task object when output_layer_name is specified; otherwise the vector tool execution result.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> async with AsyncGeoboxClient() as client:
        ...     vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        ...     # execution
        ...     result = await client.vector_tool.as_geojson(vector_uuid=vector.uuid)
        ...     # save as layer
        ...     task = await client.vector_tool.as_geojson(vector_uuid=vector.uuid, output_layer_name="output_layer")
        ...     await task.wait()
        ...     output_layer = await task.output_asset
    """
    prepared = await self._add_params_to_query(
        query_name='$as_geojson',
        inputs={'layer': vector_uuid},
    )
    return await self._run_query(query=prepared, output_layer_name=output_layer_name)
[docs]
async def as_wkt(
    self,
    vector_uuid: str,
    output_layer_name: Optional[str] = None,
) -> Union['AsyncTask', Dict]:
    """
    [async] Add a column containing the WKT (Well-Known Text) form of each geometry in the layer.

    Args:
        vector_uuid (str): UUID of the vector layer.
        output_layer_name (str, optional): Name for the output layer. Must be a valid identifier without spaces.

    Returns:
        Union['AsyncTask', Dict]: A task object when output_layer_name is specified; otherwise the vector tool execution result.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> async with AsyncGeoboxClient() as client:
        ...     vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        ...     # execution
        ...     result = await client.vector_tool.as_wkt(vector_uuid=vector.uuid)
        ...     # save as layer
        ...     task = await client.vector_tool.as_wkt(vector_uuid=vector.uuid, output_layer_name="output_layer")
        ...     await task.wait()
        ...     output_layer = await task.output_asset
    """
    prepared = await self._add_params_to_query(
        query_name='$as_wkt',
        inputs={'layer': vector_uuid},
    )
    return await self._run_query(query=prepared, output_layer_name=output_layer_name)
[docs]
async def buffer(
    self,
    vector_uuid: str,
    distance: float,
    output_layer_name: Optional[str] = None,
) -> Union['AsyncTask', Dict]:
    """
    [async] Build a buffer zone around each geometry in the layer, expanding each shape by the given distance.

    Args:
        vector_uuid (str): UUID of the vector layer.
        distance (float): Buffer distance.
        output_layer_name (str, optional): Name for the output layer. Must be a valid identifier without spaces.

    Returns:
        Union['AsyncTask', Dict]: A task object when output_layer_name is specified; otherwise the vector tool execution result.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> async with AsyncGeoboxClient() as client:
        ...     vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        ...     # execution
        ...     result = await client.vector_tool.buffer(vector_uuid=vector.uuid, distance=1000)
        ...     # save as layer
        ...     task = await client.vector_tool.buffer(vector_uuid=vector.uuid, distance=1000, output_layer_name="output_layer")
        ...     await task.wait()
        ...     output_layer = await task.output_asset
    """
    prepared = await self._add_params_to_query(
        query_name='$buffer',
        inputs={'layer': vector_uuid, 'distance': distance},
    )
    return await self._run_query(query=prepared, output_layer_name=output_layer_name)
[docs]
async def centroid(
    self,
    vector_uuid: str,
    output_layer_name: Optional[str] = None,
) -> Union['AsyncTask', Dict]:
    """
    [async] Compute the centroid point (geometric center) of each geometry in the layer.

    Args:
        vector_uuid (str): UUID of the vector layer.
        output_layer_name (str, optional): Name for the output layer. Must be a valid identifier without spaces.

    Returns:
        Union['AsyncTask', Dict]: A task object when output_layer_name is specified; otherwise the vector tool execution result.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> async with AsyncGeoboxClient() as client:
        ...     vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        ...     # execution
        ...     result = await client.vector_tool.centroid(vector_uuid=vector.uuid)
        ...     # save as layer
        ...     task = await client.vector_tool.centroid(vector_uuid=vector.uuid, output_layer_name="output_layer")
        ...     await task.wait()
        ...     output_layer = await task.output_asset
    """
    prepared = await self._add_params_to_query(
        query_name='$centroid',
        inputs={'layer': vector_uuid},
    )
    return await self._run_query(query=prepared, output_layer_name=output_layer_name)
[docs]
async def clip(
    self,
    vector_uuid: str,
    clip_vector_uuid: str,
    output_layer_name: Optional[str] = None,
) -> Union['AsyncTask', Dict]:
    """
    [async] Clip geometries, keeping only the parts that fall within the boundaries of the clip layer.

    Args:
        vector_uuid (str): UUID of the vector layer.
        clip_vector_uuid (str): UUID of the clip vector layer.
        output_layer_name (str, optional): Name for the output layer. Must be a valid identifier without spaces.

    Returns:
        Union['AsyncTask', Dict]: A task object when output_layer_name is specified; otherwise the vector tool execution result.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> async with AsyncGeoboxClient() as client:
        ...     vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        ...     clip_vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        ...     # execution
        ...     result = await client.vector_tool.clip(vector_uuid=vector.uuid, clip_vector_uuid=clip_vector.uuid)
        ...     # save as layer
        ...     task = await client.vector_tool.clip(vector_uuid=vector.uuid, clip_vector_uuid=clip_vector.uuid, output_layer_name="output_layer")
        ...     await task.wait()
        ...     output_layer = await task.output_asset
    """
    prepared = await self._add_params_to_query(
        query_name='$clip',
        inputs={'layer': vector_uuid, 'clip': clip_vector_uuid},
    )
    return await self._run_query(query=prepared, output_layer_name=output_layer_name)
[docs]
async def concave_hull(
    self,
    vector_uuid: str,
    tolerance: float,
    output_layer_name: Optional[str] = None,
) -> Union['AsyncTask', Dict]:
    """
    [async] Create a concave hull (a polygon that closely wraps around all geometries) for the layer using the given tolerance.

    Args:
        vector_uuid (str): UUID of the vector layer.
        tolerance (float): Tolerance parameter for the concave hull.
        output_layer_name (str, optional): Name for the output layer. Must be a valid identifier without spaces.

    Returns:
        Union['AsyncTask', Dict]: A task object when output_layer_name is specified; otherwise the vector tool execution result.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> async with AsyncGeoboxClient() as client:
        ...     vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        ...     # execution
        ...     result = await client.vector_tool.concave_hull(vector_uuid=vector.uuid, tolerance=10)
        ...     # save as layer
        ...     task = await client.vector_tool.concave_hull(vector_uuid=vector.uuid, tolerance=10, output_layer_name="output_layer")
        ...     await task.wait()
        ...     output_layer = await task.output_asset
    """
    prepared = await self._add_params_to_query(
        query_name='$concave_hull',
        inputs={'layer': vector_uuid, 'tolerance': tolerance},
    )
    return await self._run_query(query=prepared, output_layer_name=output_layer_name)
[docs]
async def convex_hull(
    self,
    vector_uuid: str,
    output_layer_name: Optional[str] = None,
) -> Union['AsyncTask', Dict]:
    """
    [async] Compute a convex hull for all geometries in the layer: a polygon that minimally contains all points or shapes.

    Args:
        vector_uuid (str): UUID of the vector layer.
        output_layer_name (str, optional): Name for the output layer. Must be a valid identifier without spaces.

    Returns:
        Union['AsyncTask', Dict]: A task object when output_layer_name is specified; otherwise the vector tool execution result.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> async with AsyncGeoboxClient() as client:
        ...     vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        ...     # execution
        ...     result = await client.vector_tool.convex_hull(vector_uuid=vector.uuid)
        ...     # save as layer
        ...     task = await client.vector_tool.convex_hull(vector_uuid=vector.uuid, output_layer_name="output_layer")
        ...     await task.wait()
        ...     output_layer = await task.output_asset
    """
    prepared = await self._add_params_to_query(
        query_name='$convex_hull',
        inputs={'layer': vector_uuid},
    )
    return await self._run_query(query=prepared, output_layer_name=output_layer_name)
[docs]
async def feature_count(self, vector_uuid: str) -> Dict:
    """
    [async] Count the total number of rows in the given layer, which is useful for estimating data volume.

    Args:
        vector_uuid (str): UUID of the vector layer.

    Returns:
        Dict: The vector tool execution result.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> async with AsyncGeoboxClient() as client:
        ...     vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        ...     # execution
        ...     result = await client.vector_tool.feature_count(vector_uuid=vector.uuid)
    """
    prepared = await self._add_params_to_query(
        query_name='$count',
        inputs={'layer': vector_uuid},
    )
    # No output layer name is passed, so the query is run without one.
    return await self._run_query(query=prepared)
[docs]
async def count_point_in_polygons(
    self,
    polygon_vector_uuid: str,
    point_vector_uuid: str,
    output_layer_name: Optional[str] = None,
) -> Union['AsyncTask', Dict]:
    """
    [async] Count the points that fall inside each polygon, giving a density measure of points per polygon.

    Args:
        polygon_vector_uuid (str): UUID of the polygon layer.
        point_vector_uuid (str): UUID of the point layer.
        output_layer_name (str, optional): Name for the output layer. Must be a valid identifier without spaces.

    Returns:
        Union['AsyncTask', Dict]: A task object when output_layer_name is specified; otherwise the vector tool execution result.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> async with AsyncGeoboxClient() as client:
        ...     polygon_vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        ...     point_vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        ...     # execution
        ...     result = await client.vector_tool.count_point_in_polygons(
        ...         polygon_vector_uuid=polygon_vector.uuid,
        ...         point_vector_uuid=point_vector.uuid)
        ...     # save as layer
        ...     task = await client.vector_tool.count_point_in_polygons(
        ...         polygon_vector_uuid=polygon_vector.uuid,
        ...         point_vector_uuid=point_vector.uuid,
        ...         output_layer_name="output_layer")
        ...     await task.wait()
        ...     output_layer = await task.output_asset
    """
    prepared = await self._add_params_to_query(
        query_name='$count_point_in_polygons',
        inputs={
            'polygon_layer': polygon_vector_uuid,
            'point_layer': point_vector_uuid,
        },
    )
    return await self._run_query(query=prepared, output_layer_name=output_layer_name)
[docs]
async def delaunay_triangulation(
    self,
    vector_uuid: str,
    output_layer_name: Optional[str] = None,
) -> Union['AsyncTask', Dict]:
    """
    [async] Generate Delaunay triangles from points, producing a tessellated network of polygons from the input points.

    Args:
        vector_uuid (str): UUID of the vector layer.
        output_layer_name (str, optional): Name for the output layer. Must be a valid identifier without spaces.

    Returns:
        Union['AsyncTask', Dict]: A task object when output_layer_name is specified; otherwise the vector tool execution result.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> async with AsyncGeoboxClient() as client:
        ...     vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        ...     # execution
        ...     result = await client.vector_tool.delaunay_triangulation(vector_uuid=vector.uuid)
        ...     # save as layer
        ...     task = await client.vector_tool.delaunay_triangulation(vector_uuid=vector.uuid, output_layer_name="output_layer")
        ...     await task.wait()
        ...     output_layer = await task.output_asset
    """
    prepared = await self._add_params_to_query(
        query_name='$delaunay_triangulation',
        inputs={'layer': vector_uuid},
    )
    return await self._run_query(query=prepared, output_layer_name=output_layer_name)
[docs]
async def disjoint(
    self,
    vector_uuid: str,
    filter_vector_uuid: str,
    output_layer_name: Optional[str] = None,
) -> Union['AsyncTask', Dict]:
    """
    [async] Keep only the geometries that do not intersect with another layer, producing a subset of disjoint geometries.

    Args:
        vector_uuid (str): UUID of the vector layer.
        filter_vector_uuid (str): UUID of the filter layer.
        output_layer_name (str, optional): Name for the output layer. Must be a valid identifier without spaces.

    Returns:
        Union['AsyncTask', Dict]: A task object when output_layer_name is specified; otherwise the vector tool execution result.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> async with AsyncGeoboxClient() as client:
        ...     vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        ...     filter_vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        ...     # execution
        ...     result = await client.vector_tool.disjoint(vector_uuid=vector.uuid, filter_vector_uuid=filter_vector.uuid)
        ...     # save as layer
        ...     task = await client.vector_tool.disjoint(vector_uuid=vector.uuid, filter_vector_uuid=filter_vector.uuid, output_layer_name="output_layer")
        ...     await task.wait()
        ...     output_layer = await task.output_asset
    """
    prepared = await self._add_params_to_query(
        query_name='$disjoint',
        inputs={'layer': vector_uuid, 'filter_layer': filter_vector_uuid},
    )
    return await self._run_query(query=prepared, output_layer_name=output_layer_name)
[docs]
async def dissolve(
    self,
    vector_uuid: str,
    dissolve_field_name: str,
    output_layer_name: Optional[str] = None,
) -> Union['AsyncTask', Dict]:
    """
    [async] Merge geometries that share a value of the given attribute into a single shape per unique value.

    Args:
        vector_uuid (str): UUID of the vector layer.
        dissolve_field_name (str): Field to dissolve by.
        output_layer_name (str, optional): Name for the output layer. Must be a valid identifier without spaces.

    Returns:
        Union['AsyncTask', Dict]: A task object when output_layer_name is specified; otherwise the vector tool execution result.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> async with AsyncGeoboxClient() as client:
        ...     vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        ...     # execution
        ...     result = await client.vector_tool.dissolve(vector_uuid=vector.uuid, dissolve_field_name="field_name")
        ...     # save as layer
        ...     task = await client.vector_tool.dissolve(vector_uuid=vector.uuid, dissolve_field_name="field_name", output_layer_name="output_layer")
        ...     await task.wait()
        ...     output_layer = await task.output_asset
    """
    prepared = await self._add_params_to_query(
        query_name='$dissolve',
        inputs={'layer': vector_uuid, 'dissolve_field': dissolve_field_name},
    )
    return await self._run_query(query=prepared, output_layer_name=output_layer_name)
[docs]
async def distance_to_nearest(
    self,
    vector_uuid: str,
    nearest_vector_uuid: str,
    search_radius: float,
    output_layer_name: Optional[str] = None,
) -> Union['AsyncTask', Dict]:
    """
    [async] Compute the minimum distance between geometries in one layer and their nearest neighbors in another layer.

    Args:
        vector_uuid (str): UUID of the vector layer.
        nearest_vector_uuid (str): UUID of the nearest layer.
        search_radius (float): Search radius for nearest features.
        output_layer_name (str, optional): Name for the output layer. Must be a valid identifier without spaces.

    Returns:
        Union['AsyncTask', Dict]: A task object when output_layer_name is specified; otherwise the vector tool execution result.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> async with AsyncGeoboxClient() as client:
        ...     vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        ...     nearest_vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        ...     # execution
        ...     result = await client.vector_tool.distance_to_nearest(
        ...         vector_uuid=vector.uuid,
        ...         nearest_vector_uuid=nearest_vector.uuid,
        ...         search_radius=10)
        ...     # save as layer
        ...     task = await client.vector_tool.distance_to_nearest(
        ...         vector_uuid=vector.uuid,
        ...         nearest_vector_uuid=nearest_vector.uuid,
        ...         search_radius=10,
        ...         output_layer_name="output_layer")
        ...     await task.wait()
        ...     output_layer = await task.output_asset
    """
    prepared = await self._add_params_to_query(
        query_name='$distance_to_nearest',
        inputs={
            'layer': vector_uuid,
            'nearest_layer': nearest_vector_uuid,
            'search_radius': search_radius,
        },
    )
    return await self._run_query(query=prepared, output_layer_name=output_layer_name)
[docs]
async def distinct(
    self,
    vector_uuid: str,
    output_layer_name: Optional[str] = None,
) -> Union['AsyncTask', Dict]:
    """
    [async] Select only the unique rows of the input layer, removing duplicate entries across columns.

    Args:
        vector_uuid (str): UUID of the vector layer.
        output_layer_name (str, optional): Name for the output layer. Must be a valid identifier without spaces.

    Returns:
        Union['AsyncTask', Dict]: A task object when output_layer_name is specified; otherwise the vector tool execution result.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> async with AsyncGeoboxClient() as client:
        ...     vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        ...     # execution
        ...     result = await client.vector_tool.distinct(vector_uuid=vector.uuid)
        ...     # save as layer
        ...     task = await client.vector_tool.distinct(vector_uuid=vector.uuid, output_layer_name="output_layer")
        ...     await task.wait()
        ...     output_layer = await task.output_asset
    """
    prepared = await self._add_params_to_query(
        query_name='$distinct',
        inputs={'layer': vector_uuid},
    )
    return await self._run_query(query=prepared, output_layer_name=output_layer_name)
[docs]
async def dump(
    self,
    vector_uuid: str,
    output_layer_name: Optional[str] = None,
) -> Union['AsyncTask', Dict]:
    """
    [async] Split multi-part geometries into single-part geometries, producing individual shapes from complex collections.

    Args:
        vector_uuid (str): UUID of the vector layer.
        output_layer_name (str, optional): Name for the output layer. Must be a valid identifier without spaces.

    Returns:
        Union['AsyncTask', Dict]: A task object when output_layer_name is specified; otherwise the vector tool execution result.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> async with AsyncGeoboxClient() as client:
        ...     vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        ...     # execution
        ...     result = await client.vector_tool.dump(vector_uuid=vector.uuid)
        ...     # save as layer
        ...     task = await client.vector_tool.dump(vector_uuid=vector.uuid, output_layer_name="output_layer")
        ...     await task.wait()
        ...     output_layer = await task.output_asset
    """
    prepared = await self._add_params_to_query(
        query_name='$dump',
        inputs={'layer': vector_uuid},
    )
    return await self._run_query(query=prepared, output_layer_name=output_layer_name)
[docs]
async def erase(
    self,
    vector_uuid: str,
    erase_vector_uuid: str,
    output_layer_name: Optional[str] = None,
) -> Union['AsyncTask', Dict]:
    """
    [async] Remove the portions of geometries that intersect with the erase layer, leaving only the non-overlapping parts.

    Args:
        vector_uuid (str): UUID of the vector layer.
        erase_vector_uuid (str): UUID of the erase layer.
        output_layer_name (str, optional): Name for the output layer. Must be a valid identifier without spaces.

    Returns:
        Union['AsyncTask', Dict]: A task object when output_layer_name is specified; otherwise the vector tool execution result.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> async with AsyncGeoboxClient() as client:
        ...     vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        ...     erase_vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        ...     # execution
        ...     result = await client.vector_tool.erase(vector_uuid=vector.uuid, erase_vector_uuid=erase_vector.uuid)
        ...     # save as layer
        ...     task = await client.vector_tool.erase(vector_uuid=vector.uuid, erase_vector_uuid=erase_vector.uuid, output_layer_name="output_layer")
        ...     await task.wait()
        ...     output_layer = await task.output_asset
    """
    prepared = await self._add_params_to_query(
        query_name='$erase',
        inputs={'layer': vector_uuid, 'erase': erase_vector_uuid},
    )
    return await self._run_query(query=prepared, output_layer_name=output_layer_name)
[docs]
async def feature_bbox(
    self,
    vector_uuid: str,
    srid: int,
    output_layer_name: Optional[str] = None,
) -> Union['AsyncTask', Dict]:
    """
    [async] Add a bounding box column to each feature, giving the min/max x and y coordinates as text.

    Args:
        vector_uuid (str): UUID of the vector layer.
        srid (int): SRID for the output bounding boxes.
        output_layer_name (str, optional): Name for the output layer. Must be a valid identifier without spaces.

    Returns:
        Union['AsyncTask', Dict]: A task object when output_layer_name is specified; otherwise the vector tool execution result.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> async with AsyncGeoboxClient() as client:
        ...     vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        ...     # execution
        ...     result = await client.vector_tool.feature_bbox(vector_uuid=vector.uuid, srid=4326)
        ...     # save as layer
        ...     task = await client.vector_tool.feature_bbox(vector_uuid=vector.uuid, srid=4326, output_layer_name="output_layer")
        ...     await task.wait()
        ...     output_layer = await task.output_asset
    """
    prepared = await self._add_params_to_query(
        query_name='$feature_bbox',
        inputs={'layer': vector_uuid, 'srid': srid},
    )
    return await self._run_query(query=prepared, output_layer_name=output_layer_name)
[docs]
async def feature_extent(
    self,
    vector_uuid: str,
    output_layer_name: Optional[str] = None,
) -> Union['AsyncTask', Dict]:
    """
    [async] Produce a bounding box polygon for each feature in the layer, representing the spatial extent of its geometry.

    Args:
        vector_uuid (str): UUID of the vector layer.
        output_layer_name (str, optional): Name for the output layer. Must be a valid identifier without spaces.

    Returns:
        Union['AsyncTask', Dict]: A task object when output_layer_name is specified; otherwise the vector tool execution result.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> async with AsyncGeoboxClient() as client:
        ...     vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        ...     # execution
        ...     result = await client.vector_tool.feature_extent(vector_uuid=vector.uuid)
        ...     # save as layer
        ...     task = await client.vector_tool.feature_extent(vector_uuid=vector.uuid, output_layer_name="output_layer")
        ...     await task.wait()
        ...     output_layer = await task.output_asset
    """
    prepared = await self._add_params_to_query(
        query_name='$feature_extent',
        inputs={'layer': vector_uuid},
    )
    return await self._run_query(query=prepared, output_layer_name=output_layer_name)
[docs]
async def find_and_replace(
    self,
    vector_uuid: str,
    find: str,
    replace: str,
    output_layer_name: Optional[str] = None,
) -> Union['AsyncTask', Dict]:
    """
    [async] Find and replace the given text in a column, updating values based on the input patterns.

    Args:
        vector_uuid (str): UUID of the vector layer.
        find (str): Text to find.
        replace (str): Text to replace with.
        output_layer_name (str, optional): Name for the output layer. Must be a valid identifier without spaces.

    Returns:
        Union['AsyncTask', Dict]: A task object when output_layer_name is specified; otherwise the vector tool execution result.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> async with AsyncGeoboxClient() as client:
        ...     vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        ...     # execution
        ...     result = await client.vector_tool.find_and_replace(vector_uuid=vector.uuid, find="find example", replace="replace example")
        ...     # save as layer
        ...     task = await client.vector_tool.find_and_replace(vector_uuid=vector.uuid, find="find example", replace="replace example", output_layer_name="output_layer")
        ...     await task.wait()
        ...     output_layer = await task.output_asset
    """
    prepared = await self._add_params_to_query(
        query_name='$find_replace',
        inputs={'layer': vector_uuid, 'find': find, 'replace': replace},
    )
    return await self._run_query(query=prepared, output_layer_name=output_layer_name)
[docs]
async def from_geojson(
    self,
    vector_uuid: str,
    json_field_name: str,
    output_layer_name: Optional[str] = None,
) -> Union['AsyncTask', Dict]:
    """
    [async] Convert the GeoJSON strings of a column into geometries, adding them as a new column in the layer.

    Args:
        vector_uuid (str): UUID of the vector layer.
        json_field_name (str): Name of the column containing GeoJSON strings.
        output_layer_name (str, optional): Name for the output layer. Must be a valid identifier without spaces.

    Returns:
        Union['AsyncTask', Dict]: A task object when output_layer_name is specified; otherwise the vector tool execution result.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> async with AsyncGeoboxClient() as client:
        ...     vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        ...     # execution
        ...     result = await client.vector_tool.from_geojson(vector_uuid=vector.uuid, json_field_name="json_field")
        ...     # save as layer
        ...     task = await client.vector_tool.from_geojson(vector_uuid=vector.uuid, json_field_name="json_field", output_layer_name="output_layer")
        ...     await task.wait()
        ...     output_layer = await task.output_asset
    """
    prepared = await self._add_params_to_query(
        query_name='$from_geojson',
        inputs={'layer': vector_uuid, 'json_field': json_field_name},
    )
    return await self._run_query(query=prepared, output_layer_name=output_layer_name)
[docs]
async def from_wkt(
    self,
    vector_uuid: str,
    wkt_column_name: str,
    srid: int,
    output_layer_name: Optional[str] = None,
) -> Union['AsyncTask', Dict]:
    """
    [async] Convert the WKT strings of a column into geometries, adding them as a new column in the layer.

    Args:
        vector_uuid (str): UUID of the vector layer.
        wkt_column_name (str): Name of the column containing WKT strings.
        srid (int): SRID for the output geometries.
        output_layer_name (str, optional): Name for the output layer. Must be a valid identifier without spaces.

    Returns:
        Union['AsyncTask', Dict]: A task object when output_layer_name is specified; otherwise the vector tool execution result.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> async with AsyncGeoboxClient() as client:
        ...     vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        ...     # execution
        ...     result = await client.vector_tool.from_wkt(vector_uuid=vector.uuid, wkt_column_name="wkt_field", srid=4326)
        ...     # save as layer
        ...     task = await client.vector_tool.from_wkt(vector_uuid=vector.uuid, wkt_column_name="wkt_field", srid=4326, output_layer_name="output_layer")
        ...     await task.wait()
        ...     output_layer = await task.output_asset
    """
    prepared = await self._add_params_to_query(
        query_name='$from_wkt',
        inputs={
            'layer': vector_uuid,
            'wkt_column': wkt_column_name,
            'srid': srid,
        },
    )
    return await self._run_query(query=prepared, output_layer_name=output_layer_name)
[docs]
async def generate_points(
    self,
    vector_uuid: str,
    number_of_points: int,
    seed: int,
    output_layer_name: Optional[str] = None,
) -> Union['AsyncTask', Dict]:
    """
    [async] Generate random points inside each polygon, creating the requested number of points per geometry.

    Args:
        vector_uuid (str): UUID of the vector layer.
        number_of_points (int): Number of points to generate per polygon.
        seed (int): Random seed for reproducible results.
        output_layer_name (str, optional): Name for the output layer. Must be a valid identifier without spaces.

    Returns:
        Union['AsyncTask', Dict]: A task object when output_layer_name is specified; otherwise the vector tool execution result.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> async with AsyncGeoboxClient() as client:
        ...     vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        ...     # execution
        ...     result = await client.vector_tool.generate_points(vector_uuid=vector.uuid, number_of_points=10, seed=10)
        ...     # save as layer
        ...     task = await client.vector_tool.generate_points(vector_uuid=vector.uuid, number_of_points=10, seed=10, output_layer_name="output_layer")
        ...     await task.wait()
        ...     output_layer = await task.output_asset
    """
    prepared = await self._add_params_to_query(
        query_name='$generate_points',
        inputs={
            'layer': vector_uuid,
            'number_of_points': number_of_points,
            'seed': seed,
        },
    )
    return await self._run_query(query=prepared, output_layer_name=output_layer_name)
[docs]
async def group_by(
    self,
    vector_uuid: str,
    group_column_name: str,
    agg_column_name: str,
    agg_function: 'GroupByAggFunction',
    output_layer_name: Optional[str] = None,
) -> Union['AsyncTask', Dict]:
    """
    [async] Group a layer by one column and aggregate another column per group.

    Fills the ``$group_by`` system query with the given inputs and then
    either executes it or saves its result as a new layer.

    Args:
        vector_uuid (str): UUID of the vector layer.
        group_column_name (str): Column whose values define the groups.
        agg_column_name (str): Column the aggregation is applied to.
        agg_function (GroupByAggFunction): Aggregation function (count, sum, min, max, avg).
            Its ``.value`` is what gets sent to the query.
        output_layer_name (str, optional): Name of the layer to save the result into.
            Must be a valid identifier without spaces.

    Returns:
        Union['AsyncTask', Dict]: A task object when output_layer_name is given;
        otherwise the execution result of the vector tool.

    Raises:
        NotFoundError: If the ``$group_by`` system query cannot be found.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> from geobox.enums import GroupByAggFunction
        >>> async with AsyncGeoboxClient() as client:
        >>>     vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        >>>     # execution
        >>>     result = await client.vector_tool.group_by(
        ...         vector_uuid=vector.uuid,
        ...         group_column_name="field_1",
        ...         agg_column_name="field_2",
        ...         agg_function=GroupByAggFunction.SUM)
        >>>     # save as layer
        >>>     task = await client.vector_tool.group_by(
        ...         vector_uuid=vector.uuid,
        ...         group_column_name="field_1",
        ...         agg_column_name="field_2",
        ...         agg_function=GroupByAggFunction.SUM,
        ...         output_layer_name="output_layer")
        >>>     await task.wait()
        >>>     output_layer = await task.output_asset
    """
    tool_params = dict(
        layer=vector_uuid,
        group_column=group_column_name,
        agg_column=agg_column_name,
        agg_function=agg_function.value,
    )
    prepared = await self._add_params_to_query(query_name='$group_by', inputs=tool_params)
    outcome = await self._run_query(query=prepared, output_layer_name=output_layer_name)
    return outcome
[docs]
async def hexagon_grid(
    self,
    vector_uuid: str,
    cell_size: float,
    output_layer_name: Optional[str] = None,
) -> Union['AsyncTask', Dict]:
    """
    [async] Cover the layer extent with hexagons and count the features intersecting each one.

    Fills the ``$hexagon_grid`` system query with the given inputs and then
    either executes it or saves its result as a new layer.

    Args:
        vector_uuid (str): UUID of the vector layer.
        cell_size (float): Size of the hexagon cells.
        output_layer_name (str, optional): Name of the layer to save the result into.
            Must be a valid identifier without spaces.

    Returns:
        Union['AsyncTask', Dict]: A task object when output_layer_name is given;
        otherwise the execution result of the vector tool.

    Raises:
        NotFoundError: If the ``$hexagon_grid`` system query cannot be found.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> async with AsyncGeoboxClient() as client:
        >>>     vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        >>>     # execution
        >>>     result = await client.vector_tool.hexagon_grid(
        ...         vector_uuid=vector.uuid,
        ...         cell_size=10)
        >>>     # save as layer
        >>>     task = await client.vector_tool.hexagon_grid(
        ...         vector_uuid=vector.uuid,
        ...         cell_size=10,
        ...         output_layer_name="output_layer")
        >>>     await task.wait()
        >>>     output_layer = await task.output_asset
    """
    prepared = await self._add_params_to_query(
        query_name='$hexagon_grid',
        inputs=dict(layer=vector_uuid, cell_size=cell_size),
    )
    outcome = await self._run_query(query=prepared, output_layer_name=output_layer_name)
    return outcome
[docs]
async def intersection(
    self,
    vector_uuid: str,
    intersect_vector_uuid: str,
    output_layer_name: Optional[str] = None,
) -> Union['AsyncTask', Dict]:
    """
    [async] Intersect the geometries of two layers, keeping only the overlapping portions.

    Fills the ``$intersection`` system query with the given inputs and then
    either executes it or saves its result as a new layer.

    Args:
        vector_uuid (str): UUID of the vector layer.
        intersect_vector_uuid (str): UUID of the layer to intersect with.
        output_layer_name (str, optional): Name of the layer to save the result into.
            Must be a valid identifier without spaces.

    Returns:
        Union['AsyncTask', Dict]: A task object when output_layer_name is given;
        otherwise the execution result of the vector tool.

    Raises:
        NotFoundError: If the ``$intersection`` system query cannot be found.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> async with AsyncGeoboxClient() as client:
        >>>     vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        >>>     intersect_vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        >>>     # execution
        >>>     result = await client.vector_tool.intersection(
        ...         vector_uuid=vector.uuid,
        ...         intersect_vector_uuid=intersect_vector.uuid)
        >>>     # save as layer
        >>>     task = await client.vector_tool.intersection(
        ...         vector_uuid=vector.uuid,
        ...         intersect_vector_uuid=intersect_vector.uuid,
        ...         output_layer_name="output_layer")
        >>>     await task.wait()
        >>>     output_layer = await task.output_asset
    """
    tool_params = dict(layer=vector_uuid, intersect=intersect_vector_uuid)
    prepared = await self._add_params_to_query(query_name='$intersection', inputs=tool_params)
    outcome = await self._run_query(query=prepared, output_layer_name=output_layer_name)
    return outcome
[docs]
async def join(
    self,
    vector1_uuid: str,
    vector2_uuid: str,
    vector1_join_column: str,
    vector2_join_column: str,
    output_layer_name: Optional[str] = None,
) -> Union['AsyncTask', Dict]:
    """
    [async] Join two layers on the given columns, combining the attributes of both.

    Fills the ``$join`` system query with the given inputs and then either
    executes it or saves its result as a new layer.

    Args:
        vector1_uuid (str): UUID of the first vector layer.
        vector2_uuid (str): UUID of the second vector layer.
        vector1_join_column (str): Join column of the first layer.
        vector2_join_column (str): Join column of the second layer.
        output_layer_name (str, optional): Name of the layer to save the result into.
            Must be a valid identifier without spaces.

    Returns:
        Union['AsyncTask', Dict]: A task object when output_layer_name is given;
        otherwise the execution result of the vector tool.

    Raises:
        NotFoundError: If the ``$join`` system query cannot be found.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> async with AsyncGeoboxClient() as client:
        >>>     vector1 = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        >>>     vector2 = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        >>>     # execution
        >>>     result = await client.vector_tool.join(
        ...         vector1_uuid=vector1.uuid,
        ...         vector2_uuid=vector2.uuid,
        ...         vector1_join_column="vector1_field_name",
        ...         vector2_join_column="vector2_field_name")
        >>>     # save as layer
        >>>     task = await client.vector_tool.join(
        ...         vector1_uuid=vector1.uuid,
        ...         vector2_uuid=vector2.uuid,
        ...         vector1_join_column="vector1_field_name",
        ...         vector2_join_column="vector2_field_name",
        ...         output_layer_name="output_layer")
        >>>     await task.wait()
        >>>     output_layer = await task.output_asset
    """
    tool_params = dict(
        layer1=vector1_uuid,
        layer2=vector2_uuid,
        layer1_join_column=vector1_join_column,
        layer2_join_column=vector2_join_column,
    )
    prepared = await self._add_params_to_query(query_name='$join', inputs=tool_params)
    outcome = await self._run_query(query=prepared, output_layer_name=output_layer_name)
    return outcome
[docs]
async def lat_lon_to_point(
    self,
    latitude: float,
    longitude: float,
    output_layer_name: Optional[str] = None,
) -> Union['AsyncTask', Dict]:
    """
    [async] Turn a latitude/longitude pair into a point geometry stored as a new feature.

    Fills the ``$lat_lon_to_point`` system query with the given inputs and
    then either executes it or saves its result as a new layer.

    Args:
        latitude (float): Latitude coordinate.
        longitude (float): Longitude coordinate.
        output_layer_name (str, optional): Name of the layer to save the result into.
            Must be a valid identifier without spaces.

    Returns:
        Union['AsyncTask', Dict]: A task object when output_layer_name is given;
        otherwise the execution result of the vector tool.

    Raises:
        NotFoundError: If the ``$lat_lon_to_point`` system query cannot be found.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> async with AsyncGeoboxClient() as client:
        >>>     # execution
        >>>     result = await client.vector_tool.lat_lon_to_point(
        ...         latitude=10,
        ...         longitude=10)
        >>>     # save as layer
        >>>     task = await client.vector_tool.lat_lon_to_point(
        ...         latitude=10,
        ...         longitude=10,
        ...         output_layer_name="output_layer")
        >>>     await task.wait()
        >>>     output_layer = await task.output_asset
    """
    coordinates = dict(latitude=latitude, longitude=longitude)
    prepared = await self._add_params_to_query(query_name='$lat_lon_to_point', inputs=coordinates)
    outcome = await self._run_query(query=prepared, output_layer_name=output_layer_name)
    return outcome
[docs]
async def layer_bbox(
    self,
    vector_uuid: str,
    srid: int,
) -> Dict:
    """
    [async] Compute the bounding box of all geometries in a layer as a single text attribute.

    Fills the ``$layer_bbox`` system query with the given inputs and executes
    it. This tool has no "save as layer" mode: the query is always run
    without an output layer name.

    Args:
        vector_uuid (str): UUID of the vector layer.
        srid (int): SRID of the output bounding box.

    Returns:
        Dict: The execution result of the vector tool.

    Raises:
        NotFoundError: If the ``$layer_bbox`` system query cannot be found.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> async with AsyncGeoboxClient() as client:
        >>>     vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        >>>     # execution
        >>>     result = await client.vector_tool.layer_bbox(vector_uuid=vector.uuid, srid=4326)
    """
    tool_params = dict(layer=vector_uuid, srid=srid)
    prepared = await self._add_params_to_query(query_name='$layer_bbox', inputs=tool_params)
    outcome = await self._run_query(query=prepared)
    return outcome
[docs]
async def layer_extent(
    self,
    vector_uuid: str,
    output_layer_name: Optional[str] = None,
) -> Union['AsyncTask', Dict]:
    """
    [async] Compute the spatial extent of a layer as a bounding box polygon around all geometries.

    Fills the ``$layer_extent`` system query with the given inputs and then
    either executes it or saves its result as a new layer.

    Args:
        vector_uuid (str): UUID of the vector layer.
        output_layer_name (str, optional): Name of the layer to save the result into.
            Must be a valid identifier without spaces.

    Returns:
        Union['AsyncTask', Dict]: A task object when output_layer_name is given;
        otherwise the execution result of the vector tool.

    Raises:
        NotFoundError: If the ``$layer_extent`` system query cannot be found.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> async with AsyncGeoboxClient() as client:
        >>>     vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        >>>     # execution
        >>>     result = await client.vector_tool.layer_extent(vector_uuid=vector.uuid)
        >>>     # save as layer
        >>>     task = await client.vector_tool.layer_extent(vector_uuid=vector.uuid, output_layer_name="output_layer")
        >>>     await task.wait()
        >>>     output_layer = await task.output_asset
    """
    prepared = await self._add_params_to_query(
        query_name='$layer_extent',
        inputs=dict(layer=vector_uuid),
    )
    outcome = await self._run_query(query=prepared, output_layer_name=output_layer_name)
    return outcome
[docs]
async def length(
    self,
    vector_uuid: str,
    output_layer_name: Optional[str] = None,
) -> Union['AsyncTask', Dict]:
    """
    [async] Compute the length of the line geometries in a layer and add it as an attribute of each feature.

    Fills the ``$length`` system query with the given inputs and then either
    executes it or saves its result as a new layer.

    Args:
        vector_uuid (str): UUID of the vector layer.
        output_layer_name (str, optional): Name of the layer to save the result into.
            Must be a valid identifier without spaces.

    Returns:
        Union['AsyncTask', Dict]: A task object when output_layer_name is given;
        otherwise the execution result of the vector tool.

    Raises:
        NotFoundError: If the ``$length`` system query cannot be found.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> async with AsyncGeoboxClient() as client:
        >>>     vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        >>>     # execution
        >>>     result = await client.vector_tool.length(vector_uuid=vector.uuid)
        >>>     # save as layer
        >>>     task = await client.vector_tool.length(vector_uuid=vector.uuid, output_layer_name="output_layer")
        >>>     await task.wait()
        >>>     output_layer = await task.output_asset
    """
    prepared = await self._add_params_to_query(
        query_name='$length',
        inputs=dict(layer=vector_uuid),
    )
    outcome = await self._run_query(query=prepared, output_layer_name=output_layer_name)
    return outcome
[docs]
async def linear_referencing(
    self,
    vector_uuid: str,
    feature_id: int,
    dist: float,
    tolerance: float,
    output_layer_name: Optional[str] = None,
) -> Union['AsyncTask', Dict]:
    """
    [async] Return points spaced at a given distance along a line, using linear referencing.

    Fills the ``$linear_referencing`` system query with the given inputs and
    then either executes it or saves its result as a new layer.

    Args:
        vector_uuid (str): UUID of the vector layer.
        feature_id (int): ID of the feature to reference.
        dist (float): Distance along the line.
        tolerance (float): Tolerance for point placement.
        output_layer_name (str, optional): Name of the layer to save the result into.
            Must be a valid identifier without spaces.

    Returns:
        Union['AsyncTask', Dict]: A task object when output_layer_name is given;
        otherwise the execution result of the vector tool.

    Raises:
        NotFoundError: If the ``$linear_referencing`` system query cannot be found.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> async with AsyncGeoboxClient() as client:
        >>>     vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        >>>     # execution
        >>>     result = await client.vector_tool.linear_referencing(
        ...         vector_uuid=vector.uuid,
        ...         feature_id=1,
        ...         dist=10,
        ...         tolerance=10)
        >>>     # save as layer
        >>>     task = await client.vector_tool.linear_referencing(
        ...         vector_uuid=vector.uuid,
        ...         feature_id=1,
        ...         dist=10,
        ...         tolerance=10,
        ...         output_layer_name="output_layer")
        >>>     await task.wait()
        >>>     output_layer = await task.output_asset
    """
    tool_params = dict(
        layer=vector_uuid,
        feature_id=feature_id,
        dist=dist,
        tolerance=tolerance,
    )
    prepared = await self._add_params_to_query(query_name='$linear_referencing', inputs=tool_params)
    outcome = await self._run_query(query=prepared, output_layer_name=output_layer_name)
    return outcome
[docs]
async def line_to_polygon(
    self,
    vector_uuid: str,
    output_layer_name: Optional[str] = None,
) -> Union['AsyncTask', Dict]:
    """
    [async] Convert line geometries into polygons by closing the line segments into closed shapes.

    Fills the ``$line_to_polygon`` system query with the given inputs and
    then either executes it or saves its result as a new layer.

    Args:
        vector_uuid (str): UUID of the vector layer.
        output_layer_name (str, optional): Name of the layer to save the result into.
            Must be a valid identifier without spaces.

    Returns:
        Union['AsyncTask', Dict]: A task object when output_layer_name is given;
        otherwise the execution result of the vector tool.

    Raises:
        NotFoundError: If the ``$line_to_polygon`` system query cannot be found.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> async with AsyncGeoboxClient() as client:
        >>>     vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        >>>     # execution
        >>>     result = await client.vector_tool.line_to_polygon(vector_uuid=vector.uuid)
        >>>     # save as layer
        >>>     task = await client.vector_tool.line_to_polygon(vector_uuid=vector.uuid, output_layer_name="output_layer")
        >>>     await task.wait()
        >>>     output_layer = await task.output_asset
    """
    prepared = await self._add_params_to_query(
        query_name='$line_to_polygon',
        inputs=dict(layer=vector_uuid),
    )
    outcome = await self._run_query(query=prepared, output_layer_name=output_layer_name)
    return outcome
[docs]
async def make_envelop(
    self,
    xmin: float,
    ymin: float,
    xmax: float,
    ymax: float,
    output_layer_name: Optional[str] = None,
) -> Union['AsyncTask', Dict]:
    """
    [async] Build a rectangular bounding box from minimum and maximum x and y coordinates.

    Fills the ``$make_envelop`` system query with the given inputs and then
    either executes it or saves its result as a new layer.

    Args:
        xmin (float): Minimum x coordinate.
        ymin (float): Minimum y coordinate.
        xmax (float): Maximum x coordinate.
        ymax (float): Maximum y coordinate.
        output_layer_name (str, optional): Name of the layer to save the result into.
            Must be a valid identifier without spaces.

    Returns:
        Union['AsyncTask', Dict]: A task object when output_layer_name is given;
        otherwise the execution result of the vector tool.

    Raises:
        NotFoundError: If the ``$make_envelop`` system query cannot be found.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> async with AsyncGeoboxClient() as client:
        >>>     # execution
        >>>     result = await client.vector_tool.make_envelop(xmin=10, ymin=10, xmax=20, ymax=20)
        >>>     # save as layer
        >>>     task = await client.vector_tool.make_envelop(xmin=10, ymin=10, xmax=20, ymax=20, output_layer_name="output_layer")
        >>>     await task.wait()
        >>>     output_layer = await task.output_asset
    """
    corners = dict(xmin=xmin, ymin=ymin, xmax=xmax, ymax=ymax)
    prepared = await self._add_params_to_query(query_name='$make_envelop', inputs=corners)
    outcome = await self._run_query(query=prepared, output_layer_name=output_layer_name)
    return outcome
[docs]
async def merge(
    self,
    vector1_uuid: str,
    vector2_uuid: str,
    output_layer_name: Optional[str] = None,
) -> Union['AsyncTask', Dict]:
    """
    [async] Combine two layers into one by uniting their attributes and geometries.

    Fills the ``$merge`` system query with the given inputs and then either
    executes it or saves its result as a new layer.

    Args:
        vector1_uuid (str): UUID of the first vector layer.
        vector2_uuid (str): UUID of the second vector layer.
        output_layer_name (str, optional): Name of the layer to save the result into.
            Must be a valid identifier without spaces.

    Returns:
        Union['AsyncTask', Dict]: A task object when output_layer_name is given;
        otherwise the execution result of the vector tool.

    Raises:
        NotFoundError: If the ``$merge`` system query cannot be found.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> async with AsyncGeoboxClient() as client:
        >>>     vector1 = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        >>>     vector2 = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        >>>     # execution
        >>>     result = await client.vector_tool.merge(vector1_uuid=vector1.uuid, vector2_uuid=vector2.uuid)
        >>>     # save as layer
        >>>     task = await client.vector_tool.merge(vector1_uuid=vector1.uuid, vector2_uuid=vector2.uuid, output_layer_name="output_layer")
        >>>     await task.wait()
        >>>     output_layer = await task.output_asset
    """
    layer_pair = dict(layer1=vector1_uuid, layer2=vector2_uuid)
    prepared = await self._add_params_to_query(query_name='$merge', inputs=layer_pair)
    outcome = await self._run_query(query=prepared, output_layer_name=output_layer_name)
    return outcome
[docs]
async def network_trace(
    self,
    vector_uuid: str,
    from_id: int,
    direction: 'NetworkTraceDirection',
    tolerance: float,
    output_layer_name: Optional[str] = None,
) -> Union['AsyncTask', Dict]:
    """
    [async] Run a recursive spatial network trace over connected lines, in a given direction and tolerance.

    Fills the ``$network_trace`` system query with the given inputs and then
    either executes it or saves its result as a new layer.

    Args:
        vector_uuid (str): UUID of the vector layer.
        from_id (int): ID of the feature the trace starts from.
        direction (NetworkTraceDirection): Direction of the trace: UP (upstream) or DOWN (downstream).
            An enum member is reduced to its ``.value`` before it is sent; a
            plain value (e.g. a string) is passed through unchanged.
        tolerance (float): Tolerance for network connectivity.
        output_layer_name (str, optional): Name of the layer to save the result into.
            Must be a valid identifier without spaces.

    Returns:
        Union['AsyncTask', Dict]: A task object when output_layer_name is given;
        otherwise the execution result of the vector tool.

    Raises:
        NotFoundError: If the ``$network_trace`` system query cannot be found.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> from geobox.enums import NetworkTraceDirection
        >>> async with AsyncGeoboxClient() as client:
        >>>     vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        >>>     # execution
        >>>     result = await client.vector_tool.network_trace(
        ...         vector_uuid=vector.uuid,
        ...         from_id=10,
        ...         direction=NetworkTraceDirection.UP,
        ...         tolerance=10)
        >>>     # save as layer
        >>>     task = await client.vector_tool.network_trace(
        ...         vector_uuid=vector.uuid,
        ...         from_id=10,
        ...         direction=NetworkTraceDirection.UP,
        ...         tolerance=10,
        ...         output_layer_name="output_layer")
        >>>     await task.wait()
        >>>     output_layer = await task.output_asset
    """
    # Send the enum's underlying value, as group_by / spatial_group_by do,
    # rather than the enum member itself. getattr keeps raw values working
    # for callers that already pass a plain string.
    direction_value = getattr(direction, 'value', direction)
    inputs = {
        'layer': vector_uuid,
        'from_id': from_id,
        'direction': direction_value,
        'tolerance': tolerance
    }
    query = await self._add_params_to_query(query_name='$network_trace', inputs=inputs)
    return await self._run_query(query=query, output_layer_name=output_layer_name)
[docs]
async def number_of_geoms(
    self,
    vector_uuid: str,
    output_layer_name: Optional[str] = None,
) -> Union['AsyncTask', Dict]:
    """
    [async] Count the geometry parts of each multi-part shape and add the count as a new column.

    Fills the ``$number_of_geoms`` system query with the given inputs and
    then either executes it or saves its result as a new layer.

    Args:
        vector_uuid (str): UUID of the vector layer.
        output_layer_name (str, optional): Name of the layer to save the result into.
            Must be a valid identifier without spaces.

    Returns:
        Union['AsyncTask', Dict]: A task object when output_layer_name is given;
        otherwise the execution result of the vector tool.

    Raises:
        NotFoundError: If the ``$number_of_geoms`` system query cannot be found.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> async with AsyncGeoboxClient() as client:
        >>>     vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        >>>     # execution
        >>>     result = await client.vector_tool.number_of_geoms(vector_uuid=vector.uuid)
        >>>     # save as layer
        >>>     task = await client.vector_tool.number_of_geoms(vector_uuid=vector.uuid, output_layer_name="output_layer")
        >>>     await task.wait()
        >>>     output_layer = await task.output_asset
    """
    prepared = await self._add_params_to_query(
        query_name='$number_of_geoms',
        inputs=dict(layer=vector_uuid),
    )
    outcome = await self._run_query(query=prepared, output_layer_name=output_layer_name)
    return outcome
[docs]
async def number_of_points(
    self,
    vector_uuid: str,
    output_layer_name: Optional[str] = None,
) -> Union['AsyncTask', Dict]:
    """
    [async] Count the vertices of each geometry and expose the count as an attribute.

    Fills the ``$number_of_points`` system query with the given inputs and
    then either executes it or saves its result as a new layer.

    Args:
        vector_uuid (str): UUID of the vector layer.
        output_layer_name (str, optional): Name of the layer to save the result into.
            Must be a valid identifier without spaces.

    Returns:
        Union['AsyncTask', Dict]: A task object when output_layer_name is given;
        otherwise the execution result of the vector tool.

    Raises:
        NotFoundError: If the ``$number_of_points`` system query cannot be found.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> async with AsyncGeoboxClient() as client:
        >>>     vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        >>>     # execution
        >>>     result = await client.vector_tool.number_of_points(vector_uuid=vector.uuid)
        >>>     # save as layer
        >>>     task = await client.vector_tool.number_of_points(vector_uuid=vector.uuid, output_layer_name="output_layer")
        >>>     await task.wait()
        >>>     output_layer = await task.output_asset
    """
    prepared = await self._add_params_to_query(
        query_name='$number_of_points',
        inputs=dict(layer=vector_uuid),
    )
    outcome = await self._run_query(query=prepared, output_layer_name=output_layer_name)
    return outcome
[docs]
async def point_stats_in_polygon(
    self,
    polygon_vector_uuid: str,
    point_vector_uuid: str,
    stats_column: str,
    output_layer_name: Optional[str] = None,
) -> Union['AsyncTask', Dict]:
    """
    [async] Aggregate statistics (sum, average, minimum, maximum) for the points inside each polygon.

    Fills the ``$point_stats_in_polygon`` system query with the given inputs
    and then either executes it or saves its result as a new layer.

    Args:
        polygon_vector_uuid (str): UUID of the polygon layer.
        point_vector_uuid (str): UUID of the point layer.
        stats_column (str): Column the statistics are calculated on.
        output_layer_name (str, optional): Name of the layer to save the result into.
            Must be a valid identifier without spaces.

    Returns:
        Union['AsyncTask', Dict]: A task object when output_layer_name is given;
        otherwise the execution result of the vector tool.

    Raises:
        NotFoundError: If the ``$point_stats_in_polygon`` system query cannot be found.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> async with AsyncGeoboxClient() as client:
        >>>     polygon_vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        >>>     point_vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        >>>     # execution
        >>>     result = await client.vector_tool.point_stats_in_polygon(
        ...         polygon_vector_uuid=polygon_vector.uuid,
        ...         point_vector_uuid=point_vector.uuid,
        ...         stats_column="field_name")
        >>>     # save as layer
        >>>     task = await client.vector_tool.point_stats_in_polygon(
        ...         polygon_vector_uuid=polygon_vector.uuid,
        ...         point_vector_uuid=point_vector.uuid,
        ...         stats_column="field_name",
        ...         output_layer_name="output_layer")
        >>>     await task.wait()
        >>>     output_layer = await task.output_asset
    """
    tool_params = dict(
        polygon_layer=polygon_vector_uuid,
        point_layer=point_vector_uuid,
        stats_column=stats_column,
    )
    prepared = await self._add_params_to_query(query_name='$point_stats_in_polygon', inputs=tool_params)
    outcome = await self._run_query(query=prepared, output_layer_name=output_layer_name)
    return outcome
[docs]
async def remove_holes(
    self,
    vector_uuid: str,
    output_layer_name: Optional[str] = None,
) -> Union['AsyncTask', Dict]:
    """
    [async] Remove interior holes from polygon geometries, keeping only each shape's outer boundary.

    Fills the ``$remove_holes`` system query with the given inputs and then
    either executes it or saves its result as a new layer.

    Args:
        vector_uuid (str): UUID of the vector layer.
        output_layer_name (str, optional): Name of the layer to save the result into.
            Must be a valid identifier without spaces.

    Returns:
        Union['AsyncTask', Dict]: A task object when output_layer_name is given;
        otherwise the execution result of the vector tool.

    Raises:
        NotFoundError: If the ``$remove_holes`` system query cannot be found.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> async with AsyncGeoboxClient() as client:
        >>>     vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        >>>     # execution
        >>>     result = await client.vector_tool.remove_holes(vector_uuid=vector.uuid)
        >>>     # save as layer
        >>>     task = await client.vector_tool.remove_holes(vector_uuid=vector.uuid, output_layer_name="output_layer")
        >>>     await task.wait()
        >>>     output_layer = await task.output_asset
    """
    prepared = await self._add_params_to_query(
        query_name='$remove_holes',
        inputs=dict(layer=vector_uuid),
    )
    outcome = await self._run_query(query=prepared, output_layer_name=output_layer_name)
    return outcome
[docs]
async def reverse_line(
    self,
    vector_uuid: str,
    output_layer_name: Optional[str] = None,
) -> Union['AsyncTask', Dict]:
    """
    [async] Reverse the direction of line geometries, swapping each line's start and end points.

    Fills the ``$reverse_line`` system query with the given inputs and then
    either executes it or saves its result as a new layer.

    Args:
        vector_uuid (str): UUID of the vector layer.
        output_layer_name (str, optional): Name of the layer to save the result into.
            Must be a valid identifier without spaces.

    Returns:
        Union['AsyncTask', Dict]: A task object when output_layer_name is given;
        otherwise the execution result of the vector tool.

    Raises:
        NotFoundError: If the ``$reverse_line`` system query cannot be found.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> async with AsyncGeoboxClient() as client:
        >>>     vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        >>>     # execution
        >>>     result = await client.vector_tool.reverse_line(vector_uuid=vector.uuid)
        >>>     # save as layer
        >>>     task = await client.vector_tool.reverse_line(vector_uuid=vector.uuid, output_layer_name="output_layer")
        >>>     await task.wait()
        >>>     output_layer = await task.output_asset
    """
    prepared = await self._add_params_to_query(
        query_name='$reverse_line',
        inputs=dict(layer=vector_uuid),
    )
    outcome = await self._run_query(query=prepared, output_layer_name=output_layer_name)
    return outcome
[docs]
async def simplify(
    self,
    vector_uuid: str,
    tolerance: float,
    output_layer_name: Optional[str] = None,
) -> Union['AsyncTask', Dict]:
    """
    [async] Simplify geometries by a tolerance value, reducing detail while keeping the general shape.

    Fills the ``$simplify`` system query with the given inputs and then
    either executes it or saves its result as a new layer.

    Args:
        vector_uuid (str): UUID of the vector layer.
        tolerance (float): Simplification tolerance.
        output_layer_name (str, optional): Name of the layer to save the result into.
            Must be a valid identifier without spaces.

    Returns:
        Union['AsyncTask', Dict]: A task object when output_layer_name is given;
        otherwise the execution result of the vector tool.

    Raises:
        NotFoundError: If the ``$simplify`` system query cannot be found.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> async with AsyncGeoboxClient() as client:
        >>>     vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        >>>     # execution
        >>>     result = await client.vector_tool.simplify(
        ...         vector_uuid=vector.uuid,
        ...         tolerance=10)
        >>>     # save as layer
        >>>     task = await client.vector_tool.simplify(
        ...         vector_uuid=vector.uuid,
        ...         tolerance=10,
        ...         output_layer_name="output_layer")
        >>>     await task.wait()
        >>>     output_layer = await task.output_asset
    """
    tool_params = dict(layer=vector_uuid, tolerance=tolerance)
    prepared = await self._add_params_to_query(query_name='$simplify', inputs=tool_params)
    outcome = await self._run_query(query=prepared, output_layer_name=output_layer_name)
    return outcome
[docs]
async def snap_to_grid(
    self,
    vector_uuid: str,
    grid_size: float,
    output_layer_name: Optional[str] = None,
) -> Union['AsyncTask', Dict]:
    """
    [async] Align geometries to a grid of the given size, rounding coordinates onto the grid lines.

    Fills the ``$snap_to_grid`` system query with the given inputs and then
    either executes it or saves its result as a new layer.

    Args:
        vector_uuid (str): UUID of the vector layer.
        grid_size (float): Size of the grid cells.
        output_layer_name (str, optional): Name of the layer to save the result into.
            Must be a valid identifier without spaces.

    Returns:
        Union['AsyncTask', Dict]: A task object when output_layer_name is given;
        otherwise the execution result of the vector tool.

    Raises:
        NotFoundError: If the ``$snap_to_grid`` system query cannot be found.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> async with AsyncGeoboxClient() as client:
        >>>     vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        >>>     # execution
        >>>     result = await client.vector_tool.snap_to_grid(
        ...         vector_uuid=vector.uuid,
        ...         grid_size=10)
        >>>     # save as layer
        >>>     task = await client.vector_tool.snap_to_grid(
        ...         vector_uuid=vector.uuid,
        ...         grid_size=10,
        ...         output_layer_name="output_layer")
        >>>     await task.wait()
        >>>     output_layer = await task.output_asset
    """
    tool_params = dict(layer=vector_uuid, grid_size=grid_size)
    prepared = await self._add_params_to_query(query_name='$snap_to_grid', inputs=tool_params)
    outcome = await self._run_query(query=prepared, output_layer_name=output_layer_name)
    return outcome
[docs]
async def spatial_aggregation(
    self,
    vector_uuid: str,
    agg_function: 'SpatialAggFunction',
    output_layer_name: Optional[str] = None,
) -> Union['AsyncTask', Dict]:
    """
    [async] Aggregate all geometries of a layer with a spatial function such as union or extent.

    Fills the ``$spatial_aggregation`` system query with the given inputs and
    then either executes it or saves its result as a new layer.

    Args:
        vector_uuid (str): UUID of the vector layer.
        agg_function (SpatialAggFunction): Aggregation function: collect, union, extent, makeline.
            An enum member is reduced to its ``.value`` before it is sent; a
            plain value (e.g. a string) is passed through unchanged.
        output_layer_name (str, optional): Name of the layer to save the result into.
            Must be a valid identifier without spaces.

    Returns:
        Union['AsyncTask', Dict]: A task object when output_layer_name is given;
        otherwise the execution result of the vector tool.

    Raises:
        NotFoundError: If the ``$spatial_aggregation`` system query cannot be found.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> from geobox.enums import SpatialAggFunction
        >>> async with AsyncGeoboxClient() as client:
        >>>     vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        >>>     # execution
        >>>     result = await client.vector_tool.spatial_aggregation(
        ...         vector_uuid=vector.uuid,
        ...         agg_function=SpatialAggFunction.COLLECT)
        >>>     # save as layer
        >>>     task = await client.vector_tool.spatial_aggregation(
        ...         vector_uuid=vector.uuid,
        ...         agg_function=SpatialAggFunction.COLLECT,
        ...         output_layer_name="output_layer")
        >>>     await task.wait()
        >>>     output_layer = await task.output_asset
    """
    # Send the enum's underlying value, as spatial_group_by does, rather
    # than the enum member itself. getattr keeps raw values working for
    # callers that already pass a plain string.
    agg_value = getattr(agg_function, 'value', agg_function)
    inputs = {
        'layer': vector_uuid,
        'agg_function': agg_value
    }
    query = await self._add_params_to_query(query_name='$spatial_aggregation', inputs=inputs)
    return await self._run_query(query=query, output_layer_name=output_layer_name)
[docs]
async def spatial_filter(
    self,
    vector_uuid: str,
    spatial_predicate: 'SpatialPredicate',
    filter_vector_uuid: str,
    output_layer_name: Optional[str] = None,
) -> Union['AsyncTask', Dict]:
    """
    [async] Filter the features of a layer by their spatial relationship to a filter layer.

    Fills the ``$spatial_filter`` system query with the given inputs and then
    either executes it or saves its result as a new layer.

    Args:
        vector_uuid (str): UUID of the vector layer.
        spatial_predicate (SpatialPredicate): Spatial predicate: Intersect, Contain, Cross, Equal, Overlap, Touch, Within.
            An enum member is reduced to its ``.value`` before it is sent; a
            plain value (e.g. a string) is passed through unchanged.
        filter_vector_uuid (str): UUID of the filter layer.
        output_layer_name (str, optional): Name of the layer to save the result into.
            Must be a valid identifier without spaces.

    Returns:
        Union['AsyncTask', Dict]: A task object when output_layer_name is given;
        otherwise the execution result of the vector tool.

    Raises:
        NotFoundError: If the ``$spatial_filter`` system query cannot be found.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> from geobox.enums import SpatialPredicate
        >>> async with AsyncGeoboxClient() as client:
        >>>     vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        >>>     filter_vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        >>>     # execution
        >>>     result = await client.vector_tool.spatial_filter(
        ...         vector_uuid=vector.uuid,
        ...         spatial_predicate=SpatialPredicate.INTERSECT,
        ...         filter_vector_uuid=filter_vector.uuid)
        >>>     # save as layer
        >>>     task = await client.vector_tool.spatial_filter(
        ...         vector_uuid=vector.uuid,
        ...         spatial_predicate=SpatialPredicate.INTERSECT,
        ...         filter_vector_uuid=filter_vector.uuid,
        ...         output_layer_name="output_layer")
        >>>     await task.wait()
        >>>     output_layer = await task.output_asset
    """
    # Send the enum's underlying value, as group_by / spatial_group_by do,
    # rather than the enum member itself. getattr keeps raw values working
    # for callers that already pass a plain string.
    predicate_value = getattr(spatial_predicate, 'value', spatial_predicate)
    inputs = {
        'layer': vector_uuid,
        'spatial_predicate': predicate_value,
        'filter_layer': filter_vector_uuid
    }
    query = await self._add_params_to_query(query_name='$spatial_filter', inputs=inputs)
    return await self._run_query(query=query, output_layer_name=output_layer_name)
[docs]
async def spatial_group_by(self,
    vector_uuid: str,
    group_column_name: str,
    agg_function: 'SpatialAggFunction',
    output_layer_name: Optional[str] = None) -> Union['AsyncTask', Dict]:
    """
    [async] Groups geometries by a specified column and aggregates them using spatial functions like union, collect, or extent

    Args:
        vector_uuid (str): UUID of the vector layer
        group_column_name (str): Column to group by
        agg_function (SpatialAggFunction): Spatial aggregation function: collect, union, extent, makeline.
            Its ``value`` attribute is what gets sent to the query.
        output_layer_name (str, optional): Name for the output layer. name must be a valid identifier and without spacing.

    Returns:
        Union['AsyncTask', Dict]: If output_layer_name is specified, the function returns a task object; if not, it returns the vector tool execution result.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> from geobox.enums import SpatialAggFunction
        >>> async with AsyncGeoboxClient() as client:
        >>>     vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        >>>     # execution
        >>>     result = await client.vector_tool.spatial_group_by(
        ...         vector_uuid=vector.uuid,
        ...         group_column_name="field_1",
        ...         agg_function=SpatialAggFunction.COLLECT)
        >>>     # save as layer
        >>>     task = await client.vector_tool.spatial_group_by(
        ...         vector_uuid=vector.uuid,
        ...         group_column_name="field_1",
        ...         agg_function=SpatialAggFunction.COLLECT,
        ...         output_layer_name="output_layer")
        >>>     await task.wait()
        >>>     output_layer = await task.output_asset
    """
    # Map the caller's arguments onto the parameter names of the system query.
    tool_params = {}
    tool_params['layer'] = vector_uuid
    tool_params['group_column'] = group_column_name
    tool_params['agg_function'] = agg_function.value

    prepared = await self._add_params_to_query(
        query_name='$spatial_group_by',
        inputs=tool_params,
    )
    # _run_query decides between direct execution and saving as a layer.
    outcome = await self._run_query(query=prepared, output_layer_name=output_layer_name)
    return outcome
[docs]
async def square_grid(self,
    vector_uuid: str,
    cell_size: float,
    output_layer_name: Optional[str] = None) -> Union['AsyncTask', Dict]:
    """
    [async] Generates a square grid across the layer extent, counting how many geometries intersect each square cell

    Args:
        vector_uuid (str): UUID of the vector layer
        cell_size (float): Size of square grid cells
        output_layer_name (str, optional): Name for the output layer. name must be a valid identifier and without spacing.

    Returns:
        Union['AsyncTask', Dict]: If output_layer_name is specified, the function returns a task object; if not, it returns the vector tool execution result.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> async with AsyncGeoboxClient() as client:
        >>>     vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        >>>     # execution
        >>>     result = await client.vector_tool.square_grid(
        ...         vector_uuid=vector.uuid,
        ...         cell_size=10)
        >>>     # save as layer
        >>>     task = await client.vector_tool.square_grid(
        ...         vector_uuid=vector.uuid,
        ...         cell_size=10,
        ...         output_layer_name="output_layer")
        >>>     await task.wait()
        >>>     output_layer = await task.output_asset
    """
    # Parameter names expected by the '$square_grid' system query.
    grid_params = {'layer': vector_uuid, 'cell_size': cell_size}
    grid_query = await self._add_params_to_query(
        query_name='$square_grid',
        inputs=grid_params,
    )
    return await self._run_query(
        query=grid_query,
        output_layer_name=output_layer_name,
    )
[docs]
async def voronoi_polygons(self,
    vector_uuid: str,
    output_layer_name: Optional[str] = None) -> Union['AsyncTask', Dict]:
    """
    [async] Creates Voronoi polygons from input points, partitioning the space so each polygon surrounds a unique point

    Args:
        vector_uuid (str): UUID of the vector layer
        output_layer_name (str, optional): Name for the output layer. name must be a valid identifier and without spacing.

    Returns:
        Union['AsyncTask', Dict]: If output_layer_name is specified, the function returns a task object; if not, it returns the vector tool execution result.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> async with AsyncGeoboxClient() as client:
        >>>     vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        >>>     # execution
        >>>     result = await client.vector_tool.voronoi_polygons(vector_uuid=vector.uuid)
        >>>     # save as layer
        >>>     task = await client.vector_tool.voronoi_polygons(vector_uuid=vector.uuid, output_layer_name="output_layer")
        >>>     await task.wait()
        >>>     output_layer = await task.output_asset
    """
    # The only user-supplied parameter of this tool is the source layer.
    voronoi_query = await self._add_params_to_query(
        query_name='$voronoi_polygons',
        inputs={'layer': vector_uuid},
    )
    result_or_task = await self._run_query(
        query=voronoi_query,
        output_layer_name=output_layer_name,
    )
    return result_or_task
[docs]
async def within_distance(self,
    vector_uuid: str,
    filter_vector_uuid: str,
    dist: float,
    output_layer_name: Optional[str] = None) -> Union['AsyncTask', Dict]:
    """
    [async] Filters geometries that are within a specified distance of a filter layer, useful for proximity analysis

    Args:
        vector_uuid (str): UUID of the vector layer
        filter_vector_uuid (str): UUID of the filter layer
        dist (float): Search distance
        output_layer_name (str, optional): Name for the output layer. name must be a valid identifier and without spacing.

    Returns:
        Union['AsyncTask', Dict]: If output_layer_name is specified, the function returns a task object; if not, it returns the vector tool execution result.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> async with AsyncGeoboxClient() as client:
        >>>     vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        >>>     filter_vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        >>>     # execution
        >>>     result = await client.vector_tool.within_distance(
        ...         vector_uuid=vector.uuid,
        ...         filter_vector_uuid=filter_vector.uuid,
        ...         dist=10)
        >>>     # save as layer
        >>>     task = await client.vector_tool.within_distance(
        ...         vector_uuid=vector.uuid,
        ...         filter_vector_uuid=filter_vector.uuid,
        ...         dist=10,
        ...         output_layer_name="output_layer")
        >>>     await task.wait()
        >>>     output_layer = await task.output_asset
    """
    # Parameter names expected by the '$within_distance' system query.
    proximity_params = {
        'layer': vector_uuid,
        'filter_layer': filter_vector_uuid,
        'dist': dist,
    }
    proximity_query = await self._add_params_to_query(
        query_name='$within_distance',
        inputs=proximity_params,
    )
    return await self._run_query(
        query=proximity_query,
        output_layer_name=output_layer_name,
    )
[docs]
async def xy_coordinate(self,
    vector_uuid: str,
    srid: Optional[int] = AsyncFeature.BASE_SRID,
    output_layer_name: Optional[str] = None) -> Union['AsyncTask', Dict]:
    """
    [async] Extracts the X and Y coordinates for each geometry in a layer, adding coord_x and coord_y columns

    Args:
        vector_uuid (str): UUID of the vector layer
        srid (int, optional): SRID for coordinate extraction. Defaults to ``AsyncFeature.BASE_SRID``.
        output_layer_name (str, optional): Name for the output layer. name must be a valid identifier and without spacing.

    Returns:
        Union['AsyncTask', Dict]: If output_layer_name is specified, the function returns a task object; if not, it returns the vector tool execution result.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> async with AsyncGeoboxClient() as client:
        >>>     vector = await client.get_vector(uuid="12345678-1234-5678-1234-567812345678")
        >>>     # execution
        >>>     result = await client.vector_tool.xy_coordinate(vector_uuid=vector.uuid)
        >>>     # save as layer
        >>>     task = await client.vector_tool.xy_coordinate(vector_uuid=vector.uuid, output_layer_name="output_layer")
        >>>     await task.wait()
        >>>     output_layer = await task.output_asset
    """
    # Parameter names expected by the '$xy_coordinate' system query.
    coord_params = {'layer': vector_uuid, 'srid': srid}
    coord_query = await self._add_params_to_query(
        query_name='$xy_coordinate',
        inputs=coord_params,
    )
    result_or_task = await self._run_query(
        query=coord_query,
        output_layer_name=output_layer_name,
    )
    return result_or_task
[docs]
async def table_to_vectorlayer(self,
    table_uuid: str,
    lon_field: str,
    lat_field: str,
    output_layer_name: Optional[str] = None,
) -> Union['AsyncTask', Dict]:
    """
    [async] Converts a table to a vector layer by converting the lon and lat columns to a point geometry.

    Args:
        table_uuid (str): UUID of the table
        lon_field (str): field name containing lon value
        lat_field (str): field name containing lat value
        output_layer_name (str, optional): Name for the output layer. name must be a valid identifier and without spacing.

    Returns:
        Union['AsyncTask', Dict]: If output_layer_name is specified, the function returns a task object; if not, it returns the vector tool execution result.

    Example:
        >>> from geobox.aio import AsyncGeoboxClient
        >>> async with AsyncGeoboxClient() as client:
        >>>     table = await client.get_table(uuid="12345678-1234-5678-1234-567812345678")
        >>>     # execution
        >>>     result = await client.vector_tool.table_to_vectorlayer(
        ...         table_uuid=table.uuid,
        ...         lon_field="lon_field_name",
        ...         lat_field="lat_field_name",
        ...     )
        >>>     # save as layer
        >>>     task = await client.vector_tool.table_to_vectorlayer(
        ...         table_uuid=table.uuid,
        ...         lon_field="lon_field_name",
        ...         lat_field="lat_field_name",
        ...         output_layer_name="output_layer",
        ...     )
        >>>     await task.wait()
        >>>     output_layer = await task.output_asset
    """
    # Parameter names expected by the system query.
    table_params = {}
    table_params['table'] = table_uuid
    table_params['lon'] = lon_field
    table_params['lat'] = lat_field

    # NOTE(review): the query name is spelled "tabel" here; it is kept verbatim
    # because it is a runtime lookup key — confirm it matches the system
    # query's actual name before changing it.
    conversion_query = await self._add_params_to_query(
        query_name='$tabel_to_vector_layer',
        inputs=table_params,
    )
    return await self._run_query(
        query=conversion_query,
        output_layer_name=output_layer_name,
    )