Resources
Each resource provides clean CRUD methods with automatic timeout and exception handling.
Geofences
# List geofences
response = await client.geofences.list(limit=100, offset=0)
for geofence in response.geofences:
print(geofence.name)
# Create a geofence
geofence = await client.geofences.create(
models.CreateGeofenceRequest(
name="My Region",
geometry={"type": "Polygon", "coordinates": [...]},
group_name="my-group", # Optional grouping
)
)
# Get a geofence
geofence = await client.geofences.get(geofence_id="...")
# Update a geofence
geofence = await client.geofences.update(
geofence_id="...",
request=models.UpdateGeofenceRequest(name="New Name")
)
# Delete a geofence
await client.geofences.delete(geofence_id="...")
# Create with group (convenience method)
geofence = await client.geofences.create_with_group(
name="Zone 1",
geometry={...},
group_name="delivery-zones",
)
# List by group
response = await client.geofences.list_by_group(group_id="...")
Workflows
# List workflows
response = await client.workflows.list()
# Create a workflow
workflow = await client.workflows.create(
models.WorkflowIn(
name="Entry Alert",
nodes=[...],
edges=[...],
)
)
# Get a workflow
workflow = await client.workflows.get(workflow_id="...")
# Update a workflow
workflow = await client.workflows.update(
workflow_id="...",
request=models.WorkflowIn(name="Updated", nodes=[...], edges=[...])
)
# Toggle active/paused
workflow = await client.workflows.toggle(workflow_id="...")
# Delete a workflow
await client.workflows.delete(workflow_id="...")
# Activate a draft workflow (v0.2.0+)
workflow = await client.workflows.activate(workflow_id="...")
Workflow Execution
# Execute a workflow
result = await client.workflows.execute(
workflow_id="...",
request=models.WorkflowExecutionRequest(...) # Optional
)
# Test a workflow with sample data
result = await client.workflows.test(
workflow_id="...",
request=models.WorkflowTestRequest(...) # Optional
)
# List executions for a workflow
executions = await client.workflows.list_executions(
workflow_id="...",
limit=50,
offset=0,
)
# Get execution details
execution = await client.workflows.get_execution(
workflow_id="...",
execution_id="...",
)
Workflow Monitoring
# Get performance metrics
performance = await client.workflows.get_performance(workflow_id="...")
# Get statistics
stats = await client.workflows.get_statistics(workflow_id="...")
# Get retry policy
policy = await client.workflows.get_retry_policy(workflow_id="...")
# Update retry policy
policy = await client.workflows.update_retry_policy(
workflow_id="...",
request=models.WorkflowRetryPolicyRequest(max_retries=5, backoff_seconds=120)
)
# Get bottleneck analysis
bottlenecks = await client.workflows.get_bottlenecks(workflow_id="...")
# Get step-level performance
step_perf = await client.workflows.get_step_performance(workflow_id="...")
# List all executions across workspace
all_executions = await client.workflows.list_all_executions(limit=100)
Workflow Versioning
# List versions
versions = await client.workflows.list_versions(workflow_id="...")
# Restore a previous version
workflow = await client.workflows.restore_version(
workflow_id="...",
version_number=3,
)
# Duplicate a workflow
duplicate = await client.workflows.duplicate(
workflow_id="...",
request=models.WorkflowDuplicateRequest(name="Copy of Workflow"),
)
# Export workflow as JSON
export_data = await client.workflows.export_workflow(workflow_id="...")
# Import workflow from JSON
workflow = await client.workflows.import_workflow(
request=models.WorkflowImportRequest(data=export_data)
)
Use the workflow builders to create workflow configurations programmatically.
Webhooks
# List webhooks
response = await client.webhooks.list()
# Create a webhook
webhook = await client.webhooks.create(
models.CreateWebhookRequest(
name="My Webhook",
url="https://example.com/webhook",
events=["geofence.enter", "geofence.exit"],
)
)
# Get a webhook
webhook = await client.webhooks.get(webhook_id="...")
# Update a webhook
webhook = await client.webhooks.update(
webhook_id="...",
request=models.UpdateWebhookRequest(url="https://new-url.com/webhook")
)
# Rotate secret
result = await client.webhooks.rotate_secret(webhook_id="...")
print(f"New secret: {result.secret}")
# Delete a webhook
await client.webhooks.delete(webhook_id="...")
# Test a webhook
result = await client.webhooks.test(
webhook_id="...",
request=models.WebhookTestRequest(...) # Optional
)
Webhook Delivery Tracking
# List deliveries for a webhook
deliveries = await client.webhooks.list_deliveries(
webhook_id="...",
limit=50,
offset=0,
)
# Get delivery details
delivery = await client.webhooks.get_delivery(
webhook_id="...",
delivery_id="...",
)
# Retry a failed delivery
result = await client.webhooks.retry_delivery(
webhook_id="...",
delivery_id="...",
)
Webhook Monitoring
# Get webhook metrics
metrics = await client.webhooks.get_metrics(webhook_id="...")
# Get success/failure timeline
timeline = await client.webhooks.get_success_timeline(webhook_id="...")
Dead Letter Queue (DLQ)
# List DLQ entries
dlq_entries = await client.webhooks.list_dlq_entries(limit=50)
# Retry from DLQ
result = await client.webhooks.retry_dlq(dlq_entry_id="...")
Devices
# List devices
response = await client.devices.list()
# Create a device
device = await client.devices.create(
models.DeviceIn(
name="Truck 001",
device_id="truck-001",
)
)
device_uuid = device.id # Use device UUID for route params
# Get a device
device = await client.devices.get(device_id=device_uuid)
# Update a device
device = await client.devices.update(
device_id=device_uuid,
request=models.DeviceIn(name="Updated Name")
)
# Update device location
result = await client.devices.update_location(
device_id=device_uuid,
request=models.LocationUpdateRequest(
latitude=37.7749,
longitude=-122.4194,
)
)
# Delete a device
await client.devices.delete(device_id=device_uuid)
Locations (Public Ingest API)
The locations API accepts location updates for any device without pre-registration.
# Ingest single location
result = await client.locations.ingest(
device_id="device-123",
lat=37.7749,
lon=-122.4194,
accuracy=10.0, # Optional: GPS accuracy in meters
speed=5.5, # Optional: Speed in m/s
heading=90.0, # Optional: Heading in degrees
altitude=100.0, # Optional: Altitude in meters
metadata={"key": "value"}, # Optional
)
# Ingest batch (up to 5000 points)
result = await client.locations.ingest_batch(
locations=[
{"device_id": "dev-1", "lat": 37.77, "lon": -122.41, "ts": "2024-01-15T10:00:00Z"},
{"device_id": "dev-1", "lat": 37.78, "lon": -122.42, "ts": "2024-01-15T10:01:00Z"},
],
idempotency_key="batch-001", # Optional: For deduplication
)
# Get ingestion stats
stats = await client.locations.get_stats()
Integrations
Integrations are reusable service connections (webhooks, Slack, SMS, etc.) for workflow actions.
# List integrations
response = await client.integrations.list()
# Create a generic integration
integration = await client.integrations.create(
models.CreateIntegrationSchema(
name="My Slack",
type="slack",
config={"webhook_url": "https://hooks.slack.com/..."},
)
)
# Create webhook integration (convenience method)
integration = await client.integrations.create_webhook(
name="My Webhook",
url="https://example.com/hook",
method="POST",
headers={"Authorization": "Bearer xxx"},
)
# Get an integration
integration = await client.integrations.get(integration_id="...")
# Update an integration
integration = await client.integrations.update(
integration_id="...",
request=models.UpdateIntegrationSchema(name="Updated Name")
)
# Test an integration
result = await client.integrations.test(integration_id="...")
# Delete an integration
await client.integrations.delete(integration_id="...")
Workspaces
Workspaces are the top-level organizational unit containing all your resources.
# Get current workspace
workspace = await client.workspaces.get()
print(f"Workspace: {workspace.name} ({workspace.slug})")
# Update workspace settings
workspace = await client.workspaces.update(
request=models.WorkspaceIn(name="Updated Workspace Name")
)
# Get usage metrics for current billing period
usage = await client.workspaces.get_usage()
print(f"Location events: {usage.location_events}")
print(f"Action deliveries: {usage.action_deliveries}")
print(f"Event units: {usage.event_units}")
print(f"Tier: {usage.tier} (limit: {usage.tier_limit})")
Storage
# Create presigned URL for upload
result = await client.storage.create_presigned_url(
models.PresignedUrlRequest(
filename="boundaries.geojson",
content_type="application/json",
)
)
# List files
response = await client.storage.list_files()
# Get download URL
result = await client.storage.get_download_url(file_id="...")
# Delete file
await client.storage.delete_file(file_id="...")
Account
The account resource provides access to user profile, API key management, dashboard metrics, and notifications.
User Profile
# Get current user's profile
profile = await client.account.get_profile()
print(f"User: {profile.email}")
# Update profile
profile = await client.account.update_profile(
models.UpdateProfileRequest(name="New Name")
)
API Key Management
# List all API keys
keys = await client.account.list_api_keys()
for key in keys:
print(f"{key.name}: {key.prefix}...")
# Get a specific API key
key = await client.account.get_api_key(key_id="...")
# Create a new API key
new_key = await client.account.create_api_key(
models.ApiKeyCreateRequest(name="Production Key")
)
print(f"New key (save this!): {new_key.key}")
# Update an API key
key = await client.account.update_api_key(
key_id="...",
request=models.ApiKeyUpdateRequest(name="Renamed Key")
)
# Rotate an API key (generates new key value)
rotated = await client.account.rotate_api_key(key_id="...")
print(f"New key value: {rotated.key}")
# Delete an API key
await client.account.delete_api_key(key_id="...")
Dashboard Metrics
# Get dashboard metrics
metrics = await client.account.get_dashboard_metrics()
print(f"Geofences: {metrics.geofence_count}")
print(f"Workflows: {metrics.workflow_count}")
Notifications
# Get notifications
notifications = await client.account.get_notifications()
for notif in notifications:
print(f"[{notif['read']}] {notif['message']}")
# Mark a notification as read
await client.account.mark_notification_read(notification_id="...")
# Mark all notifications as read
await client.account.mark_all_notifications_read()
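Notification methods return List[Dict[str, Any]] because the generated API has no specific response model for notifications, so each entry is a plain dictionary whose fields are read by key (e.g. notif['message']) rather than by attribute.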
Raw API Access
For endpoints not covered by the clean API, use client.raw:
# Access raw generated APIs
response = await client.raw.geofences.apps_geofences_api_list_geofences()
response = await client.raw.workflows.apps_workflows_api_toggle_workflow(workflow_id="...")
response = await client.raw.locations.apps_public_locations_api_get_ingest_stats()
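Raw API methods follow the pattern: apps_{resource}_api_{operation}. The {resource} segment does not always match the accessor name on client.raw; for example, client.raw.locations exposes apps_public_locations_api_get_ingest_stats.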