Skip to main content

Resources

Each resource provides clean CRUD methods with automatic timeout and exception handling.

Geofences

# List geofences
response = await client.geofences.list(limit=100, offset=0)
for geofence in response.geofences:
    print(geofence.name)

# Create a geofence
geofence = await client.geofences.create(
models.CreateGeofenceRequest(
name="My Region",
geometry={"type": "Polygon", "coordinates": [...]},
group_name="my-group", # Optional grouping
)
)

# Get a geofence
geofence = await client.geofences.get(geofence_id="...")

# Update a geofence
geofence = await client.geofences.update(
geofence_id="...",
request=models.UpdateGeofenceRequest(name="New Name")
)

# Delete a geofence
await client.geofences.delete(geofence_id="...")

# Create with group (convenience method)
geofence = await client.geofences.create_with_group(
name="Zone 1",
geometry={...},
group_name="delivery-zones",
)

# List by group
response = await client.geofences.list_by_group(group_id="...")

Workflows

# List workflows
response = await client.workflows.list()

# Create a workflow
workflow = await client.workflows.create(
models.WorkflowIn(
name="Entry Alert",
nodes=[...],
edges=[...],
)
)

# Get a workflow
workflow = await client.workflows.get(workflow_id="...")

# Update a workflow
workflow = await client.workflows.update(
workflow_id="...",
request=models.WorkflowIn(name="Updated", nodes=[...], edges=[...])
)

# Toggle active/paused
workflow = await client.workflows.toggle(workflow_id="...")

# Delete a workflow
await client.workflows.delete(workflow_id="...")

# Activate a draft workflow (v0.2.0+)
workflow = await client.workflows.activate(workflow_id="...")

Workflow Execution

# Execute a workflow
result = await client.workflows.execute(
workflow_id="...",
request=models.WorkflowExecutionRequest(...) # Optional
)

# Test a workflow with sample data
result = await client.workflows.test(
workflow_id="...",
request=models.WorkflowTestRequest(...) # Optional
)

# List executions for a workflow
executions = await client.workflows.list_executions(
workflow_id="...",
limit=50,
offset=0,
)

# Get execution details
execution = await client.workflows.get_execution(
workflow_id="...",
execution_id="...",
)

Workflow Monitoring

# Get performance metrics
performance = await client.workflows.get_performance(workflow_id="...")

# Get statistics
stats = await client.workflows.get_statistics(workflow_id="...")

# Get retry policy
policy = await client.workflows.get_retry_policy(workflow_id="...")

# Update retry policy
policy = await client.workflows.update_retry_policy(
workflow_id="...",
request=models.WorkflowRetryPolicyRequest(max_retries=5, backoff_seconds=120)
)

# Get bottleneck analysis
bottlenecks = await client.workflows.get_bottlenecks(workflow_id="...")

# Get step-level performance
step_perf = await client.workflows.get_step_performance(workflow_id="...")

# List all executions across workspace
all_executions = await client.workflows.list_all_executions(limit=100)

Workflow Versioning

# List versions
versions = await client.workflows.list_versions(workflow_id="...")

# Restore a previous version
workflow = await client.workflows.restore_version(
workflow_id="...",
version_number=3,
)

# Duplicate a workflow
duplicate = await client.workflows.duplicate(
workflow_id="...",
request=models.WorkflowDuplicateRequest(name="Copy of Workflow"),
)

# Export workflow as JSON
export_data = await client.workflows.export_workflow(workflow_id="...")

# Import workflow from JSON
workflow = await client.workflows.import_workflow(
request=models.WorkflowImportRequest(data=export_data)
)
Tip: Use the workflow builders to create workflow configurations programmatically.

Webhooks

# List webhooks
response = await client.webhooks.list()

# Create a webhook
webhook = await client.webhooks.create(
models.CreateWebhookRequest(
name="My Webhook",
url="https://example.com/webhook",
events=["geofence.enter", "geofence.exit"],
)
)

# Get a webhook
webhook = await client.webhooks.get(webhook_id="...")

# Update a webhook
webhook = await client.webhooks.update(
webhook_id="...",
request=models.UpdateWebhookRequest(url="https://new-url.com/webhook")
)

# Rotate secret
result = await client.webhooks.rotate_secret(webhook_id="...")
print(f"New secret: {result.secret}")

# Delete a webhook
await client.webhooks.delete(webhook_id="...")

# Test a webhook
result = await client.webhooks.test(
webhook_id="...",
request=models.WebhookTestRequest(...) # Optional
)

Webhook Delivery Tracking

# List deliveries for a webhook
deliveries = await client.webhooks.list_deliveries(
webhook_id="...",
limit=50,
offset=0,
)

# Get delivery details
delivery = await client.webhooks.get_delivery(
webhook_id="...",
delivery_id="...",
)

# Retry a failed delivery
result = await client.webhooks.retry_delivery(
webhook_id="...",
delivery_id="...",
)

Webhook Monitoring

# Get webhook metrics
metrics = await client.webhooks.get_metrics(webhook_id="...")

# Get success/failure timeline
timeline = await client.webhooks.get_success_timeline(webhook_id="...")

Dead Letter Queue (DLQ)

# List DLQ entries
dlq_entries = await client.webhooks.list_dlq_entries(limit=50)

# Retry from DLQ
result = await client.webhooks.retry_dlq(dlq_entry_id="...")

Devices

# List devices
response = await client.devices.list()

# Create a device
device = await client.devices.create(
models.DeviceIn(
name="Truck 001",
device_id="truck-001",
)
)
device_uuid = device.id # Use device UUID for route params

# Get a device
device = await client.devices.get(device_id=device_uuid)

# Update a device
device = await client.devices.update(
device_id=device_uuid,
request=models.DeviceIn(name="Updated Name")
)

# Update device location
result = await client.devices.update_location(
device_id=device_uuid,
request=models.LocationUpdateRequest(
latitude=37.7749,
longitude=-122.4194,
)
)

# Delete a device
await client.devices.delete(device_id=device_uuid)

Locations (Public Ingest API)

The locations API accepts location updates for any device without pre-registration.

# Ingest single location
result = await client.locations.ingest(
device_id="device-123",
lat=37.7749,
lon=-122.4194,
accuracy=10.0, # Optional: GPS accuracy in meters
speed=5.5, # Optional: Speed in m/s
heading=90.0, # Optional: Heading in degrees
altitude=100.0, # Optional: Altitude in meters
metadata={"key": "value"}, # Optional
)

# Ingest batch (up to 5000 points)
result = await client.locations.ingest_batch(
locations=[
{"device_id": "dev-1", "lat": 37.77, "lon": -122.41, "ts": "2024-01-15T10:00:00Z"},
{"device_id": "dev-1", "lat": 37.78, "lon": -122.42, "ts": "2024-01-15T10:01:00Z"},
],
idempotency_key="batch-001", # Optional: For deduplication
)

# Get ingestion stats
stats = await client.locations.get_stats()

Integrations

Integrations are reusable service connections (webhooks, Slack, SMS, etc.) for workflow actions.

# List integrations
response = await client.integrations.list()

# Create a generic integration
integration = await client.integrations.create(
models.CreateIntegrationSchema(
name="My Slack",
type="slack",
config={"webhook_url": "https://hooks.slack.com/..."},
)
)

# Create webhook integration (convenience method)
integration = await client.integrations.create_webhook(
name="My Webhook",
url="https://example.com/hook",
method="POST",
headers={"Authorization": "Bearer xxx"},
)

# Get an integration
integration = await client.integrations.get(integration_id="...")

# Update an integration
integration = await client.integrations.update(
integration_id="...",
request=models.UpdateIntegrationSchema(name="Updated Name")
)

# Test an integration
result = await client.integrations.test(integration_id="...")

# Delete an integration
await client.integrations.delete(integration_id="...")

Workspaces

Workspaces are the top-level organizational unit containing all your resources.

# Get current workspace
workspace = await client.workspaces.get()
print(f"Workspace: {workspace.name} ({workspace.slug})")

# Update workspace settings
workspace = await client.workspaces.update(
request=models.WorkspaceIn(name="Updated Workspace Name")
)

# Get usage metrics for current billing period
usage = await client.workspaces.get_usage()
print(f"Location events: {usage.location_events}")
print(f"Action deliveries: {usage.action_deliveries}")
print(f"Event units: {usage.event_units}")
print(f"Tier: {usage.tier} (limit: {usage.tier_limit})")

Storage

# Create presigned URL for upload
result = await client.storage.create_presigned_url(
models.PresignedUrlRequest(
filename="boundaries.geojson",
content_type="application/json",
)
)

# List files
response = await client.storage.list_files()

# Get download URL
result = await client.storage.get_download_url(file_id="...")

# Delete file
await client.storage.delete_file(file_id="...")

Account

The account resource provides access to user profile, API key management, dashboard metrics, and notifications.

User Profile

# Get current user's profile
profile = await client.account.get_profile()
print(f"User: {profile.email}")

# Update profile
profile = await client.account.update_profile(
models.UpdateProfileRequest(name="New Name")
)

API Key Management

# List all API keys
keys = await client.account.list_api_keys()
for key in keys:
    print(f"{key.name}: {key.prefix}...")

# Get a specific API key
key = await client.account.get_api_key(key_id="...")

# Create a new API key
new_key = await client.account.create_api_key(
models.ApiKeyCreateRequest(name="Production Key")
)
print(f"New key (save this!): {new_key.key}")

# Update an API key
key = await client.account.update_api_key(
key_id="...",
request=models.ApiKeyUpdateRequest(name="Renamed Key")
)

# Rotate an API key (generates new key value)
rotated = await client.account.rotate_api_key(key_id="...")
print(f"New key value: {rotated.key}")

# Delete an API key
await client.account.delete_api_key(key_id="...")

Dashboard Metrics

# Get dashboard metrics
metrics = await client.account.get_dashboard_metrics()
print(f"Geofences: {metrics.geofence_count}")
print(f"Workflows: {metrics.workflow_count}")

Notifications

# Get notifications
notifications = await client.account.get_notifications()
for notif in notifications:
    print(f"[{notif['read']}] {notif['message']}")

# Mark a notification as read
await client.account.mark_notification_read(notification_id="...")

# Mark all notifications as read
await client.account.mark_all_notifications_read()
Note: Notifications are returned as plain dictionaries (List[Dict[str, Any]]) rather than model objects, because the generated API has no specific response model for them.

Raw API Access

For endpoints not covered by the clean API, use client.raw:

# Access raw generated APIs
response = await client.raw.geofences.apps_geofences_api_list_geofences()
response = await client.raw.workflows.apps_workflows_api_toggle_workflow(workflow_id="...")
response = await client.raw.locations.apps_public_locations_api_get_ingest_stats()

Raw API methods follow the pattern apps_{resource}_api_{operation}; some resources carry an additional prefix, as in apps_public_locations_api_get_ingest_stats.