File Uploads

Upload geofence files (GeoJSON, KML, GPX) with automatic job polling.

upload_geofences

Upload a file and wait for processing to complete:

from spatialflow import upload_geofences

result = await upload_geofences(
    client,
    "boundaries.geojson",
    group_name="my-region",
    timeout=120,  # Job polling timeout (seconds)
    upload_timeout=60,  # S3 upload timeout (seconds)
    on_status=lambda status, _: print(f"Status: {status}"),
)

print(f"Created {result.created_count} geofences")
for geofence in result.created_geofences:
    print(f"  - {geofence['name']} ({geofence['id']})")

Parameters

| Parameter | Type | Default | Description |
| client | SpatialFlow | required | SpatialFlow client instance |
| filepath | str | required | Path to the file to upload |
| group_name | str | None | Optional group name for created geofences |
| timeout | int | 300 | Max time to wait for job completion (seconds) |
| upload_timeout | int | 300 | Max time for S3 upload (seconds) |
| on_status | Callable | None | Callback for status updates (status, response) |

Supported Formats

| Format | Extension | Description |
| GeoJSON | .geojson, .json | GeoJSON FeatureCollection or single Feature |
| KML | .kml | Keyhole Markup Language |
| GPX | .gpx | GPS Exchange Format (tracks converted to polygons) |
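
As a pre-flight check, the extension mapping in the table above can be applied locally before any upload starts. The sketch below is only an illustration: `SUPPORTED_EXTENSIONS` and `detect_format` are hypothetical names, not part of the spatialflow package, and the case-insensitive matching is an assumption of this sketch rather than documented library behavior.

```python
from pathlib import Path

# Extension-to-format mapping copied from the Supported Formats table.
# Hypothetical helper data; not part of the spatialflow package.
SUPPORTED_EXTENSIONS = {
    ".geojson": "GeoJSON",
    ".json": "GeoJSON",
    ".kml": "KML",
    ".gpx": "GPX",
}


def detect_format(filepath: str) -> str:
    """Return the format name for a path, or raise ValueError if unsupported."""
    # Assumption: extensions are compared case-insensitively.
    suffix = Path(filepath).suffix.lower()
    if suffix not in SUPPORTED_EXTENSIONS:
        raise ValueError(f"Unsupported file extension: {suffix or '(none)'}")
    return SUPPORTED_EXTENSIONS[suffix]


print(detect_format("boundaries.geojson"))  # GeoJSON
```

Checking the extension first means an obviously unsupported file fails fast without a network round trip.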

JobResult

The upload_geofences function returns a JobResult:

result = await upload_geofences(client, "file.geojson")

# Access results
print(result.status) # "completed"
print(result.created_count) # Number of geofences created
print(result.created_geofences) # List of created geofence dicts
print(result.errors) # List of error messages
print(result.warnings) # List of warning messages
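
The fields above are enough to build a short summary for logs. The following is a minimal sketch that runs without the library: `StubJobResult` is a hypothetical stand-in that only mirrors the documented field names, and `summarize` is an illustrative helper, not a spatialflow function.

```python
from dataclasses import dataclass, field


@dataclass
class StubJobResult:
    """Hypothetical stand-in that mirrors the documented JobResult field names."""
    status: str
    created_count: int
    created_geofences: list = field(default_factory=list)
    errors: list = field(default_factory=list)
    warnings: list = field(default_factory=list)


def summarize(result) -> str:
    """Build a one-line summary from status, created_count, warnings, and errors."""
    parts = [f"{result.status}: {result.created_count} created"]
    if result.warnings:
        parts.append(f"{len(result.warnings)} warning(s)")
    if result.errors:
        parts.append(f"{len(result.errors)} error(s)")
    return ", ".join(parts)


stub = StubJobResult(
    status="completed",
    created_count=2,
    created_geofences=[{"id": "a1", "name": "North"}, {"id": "b2", "name": "South"}],
    warnings=["1 feature skipped"],
)
print(summarize(stub))  # completed: 2 created, 1 warning(s)
```

Because `summarize` only reads attributes, it should work the same way on any object that exposes these fields.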

Manual Job Polling

For lower-level control, use poll_job directly:

from spatialflow import poll_job, JobTimeoutError, JobFailedError

# Start an upload job
presigned = await client.storage.create_presigned_url(...)
# ... upload file to S3 ...
job = await client.geofences.start_upload(file_id=presigned.file_id)

# Poll for completion
async def get_status():
    return await client.geofences.get_upload_status(job_id=job.job_id)

try:
    result = await poll_job(
        get_status,
        timeout=120,
        poll_interval=2.0,
        on_status=lambda status, _: print(f"Status: {status}"),
    )
    print(f"Job completed: {result.created_count} created")
except JobTimeoutError as e:
    print(f"Job timed out after {e.timeout}s (last status: {e.last_status})")
except JobFailedError as e:
    print(f"Job failed: {e.error_message}")

poll_job Parameters

| Parameter | Type | Default | Description |
| get_status_fn | Callable | required | Async function that returns job status |
| timeout | int | 120 | Max time to wait (seconds) |
| poll_interval | float | 2.0 | Time between status checks (seconds) |
| on_status | Callable | None | Callback (status, response) on each poll |
| terminal_statuses | set | {"completed", "failed"} | Statuses that end polling |
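
To make the interaction between these parameters concrete, here is a rough sketch of a polling loop with the same parameter names. It is not the library's implementation: `poll_until_done` is a hypothetical function, it assumes the status response exposes a `.status` attribute, it raises the built-in `TimeoutError` instead of `JobTimeoutError`, and it returns on any terminal status rather than raising `JobFailedError`.

```python
import asyncio
import time
from types import SimpleNamespace


async def poll_until_done(get_status_fn, timeout=120, poll_interval=2.0,
                          on_status=None,
                          terminal_statuses=frozenset({"completed", "failed"})):
    """Illustrative polling loop; a sketch, not the spatialflow implementation."""
    deadline = time.monotonic() + timeout
    while True:
        response = await get_status_fn()
        status = response.status  # Assumption: the response exposes .status
        if on_status is not None:
            on_status(status, response)  # Called on every poll
        if status in terminal_statuses:
            return response
        # Give up if the next poll would land past the deadline.
        if time.monotonic() + poll_interval > deadline:
            raise TimeoutError(f"no terminal status after {timeout}s (last: {status})")
        await asyncio.sleep(poll_interval)


async def demo():
    # Fake status source that reaches a terminal status on the third poll.
    statuses = iter(["pending", "processing", "completed"])

    async def fake_status():
        return SimpleNamespace(status=next(statuses))

    seen = []
    final = await poll_until_done(fake_status, timeout=5, poll_interval=0.01,
                                  on_status=lambda s, _: seen.append(s))
    return final.status, seen


final_status, seen = asyncio.run(demo())
print(final_status, seen)
```

In this sketch a smaller `poll_interval` detects completion sooner at the cost of more status requests, while `timeout` bounds the total wait regardless of the interval.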

Error Handling

from spatialflow import (
    upload_geofences,
    JobTimeoutError,
    JobFailedError,
)

try:
    result = await upload_geofences(client, "file.geojson", timeout=60)
    print(f"Success: {result.created_count} geofences")
except JobTimeoutError as e:
    print(f"Upload timed out after {e.timeout}s")
    print(f"Last status: {e.last_status}")
except JobFailedError as e:
    print(f"Upload failed: {e.error_message}")
    if e.results:
        print(f"Partial results: {e.results}")
except FileNotFoundError:
    print("File not found")
except ValueError as e:
    print(f"Invalid file: {e}")  # Unsupported format

Status Callback

Track progress with a status callback:

def on_status(status: str, response):
    if status == "processing":
        progress = getattr(response, "progress", None)
        # Compare against None so that a progress value of 0 is still reported
        if progress is not None:
            print(f"Processing: {progress}%")
    elif status == "completed":
        print("Done!")
    elif status == "failed":
        print(f"Failed: {getattr(response, 'error', 'Unknown error')}")

result = await upload_geofences(
    client,
    "large-file.geojson",
    on_status=on_status,
)
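
Since the callback may fire on every poll, a long job can produce many identical status lines. One way to reduce that noise is a callback that reacts only when the status changes. The sketch below is an illustration with a hypothetical helper, `make_transition_logger`; it simulates the polls locally instead of calling the library.

```python
def make_transition_logger(log):
    """Return an on_status callback that appends to `log` only when the status changes."""
    last = None

    def on_status(status, response):
        nonlocal last
        if status != last:
            log.append(status)
            last = status

    return on_status


transitions = []
callback = make_transition_logger(transitions)

# Simulate repeated polls; the response argument is unused in this sketch.
for status in ["pending", "processing", "processing", "processing", "completed"]:
    callback(status, None)

print(transitions)  # ['pending', 'processing', 'completed']
```

The returned function takes the same `(status, response)` arguments as the callbacks shown above, so it could be passed as `on_status`.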

Large File Tips

  • Raise timeout and upload_timeout for large files if the defaults (300 seconds each) are too short
  • Use on_status to monitor progress
  • Consider splitting very large files into smaller batches
  • GeoJSON is generally faster than KML for large datasets
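
The batching tip can be sketched for GeoJSON input with the standard library alone. `split_feature_collection` below is a hypothetical helper, not part of spatialflow; it assumes the input is a FeatureCollection, and it does not carry over other top-level members such as `bbox`. Each output file could then be uploaded separately.

```python
import json
import tempfile
from pathlib import Path


def split_feature_collection(src: str, out_dir: str, batch_size: int) -> list:
    """Write the features of `src` into files of at most `batch_size` features each."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    data = json.loads(Path(src).read_text(encoding="utf-8"))
    features = data["features"]  # Assumption: the input is a FeatureCollection
    out = Path(out_dir)
    out.mkdir(parents=True, exist_ok=True)
    paths = []
    for start in range(0, len(features), batch_size):
        batch = {
            "type": "FeatureCollection",
            "features": features[start:start + batch_size],
        }
        part = start // batch_size + 1
        path = out / f"{Path(src).stem}-part{part:03d}.geojson"
        path.write_text(json.dumps(batch), encoding="utf-8")
        paths.append(str(path))
    return paths


# Demo: five small square polygons split into batches of two.
with tempfile.TemporaryDirectory() as tmp:
    src = Path(tmp) / "large.geojson"
    features = [
        {
            "type": "Feature",
            "properties": {"name": f"zone-{i}"},
            "geometry": {
                "type": "Polygon",
                "coordinates": [[[i, 0], [i + 1, 0], [i + 1, 1], [i, 1], [i, 0]]],
            },
        }
        for i in range(5)
    ]
    src.write_text(
        json.dumps({"type": "FeatureCollection", "features": features}),
        encoding="utf-8",
    )
    parts = split_feature_collection(str(src), str(Path(tmp) / "parts"), batch_size=2)
    print(len(parts))  # 3
```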