Polling for Notifications
Polling allows you to retrieve notifications by making periodic API requests. This guide shows you how to implement notification polling effectively.
Overview
The Thirdfort API provides organization-level notifications that you can retrieve by polling the ListNotifications endpoint. Notifications are returned in chronological order (oldest first), making them suitable for cursor-based consumption.
When to Use Polling
Use polling when:
- You cannot expose a public webhook endpoint
- You prefer simpler infrastructure (no webhook server required)
- You have moderate notification volume
- Slight delays (30-60 seconds) are acceptable
Use webhooks when:
- You need real-time notifications (sub-second delivery)
- You have high notification volume
- You want to minimize API calls
- You can maintain a public HTTPS endpoint
Basic Polling Pattern
Initial Request
Retrieve the first page of notifications:
GET /v1/organizations/{organization}/notifications?page_size=25
Response:
{
  "notifications": [
    {
      "name": "organizations/acme-corp/notifications/01JRA4ABC123",
      "type": "notification.client.check_complete.v1",
      "createTime": "2024-06-15T10:30:00Z",
      "payload": {
        "@type": "type.googleapis.com/thirdfort.client.notifications.v1.CheckCompleteData",
        "check": "organizations/acme-corp/teams/default/checks/01JRA3XYZ789",
        "displayName": "John Smith ID Verification",
        "state": "COMPLETE",
        "recommendation": "APPROVE"
      }
    }
  ],
  "nextPageToken": "01JRA4ABC123"
}
Subsequent Requests
Use the nextPageToken from the previous response to get newer notifications:
GET /v1/organizations/{organization}/notifications?page_size=25&page_token=01JRA4ABC123
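When a backlog has built up, fetching one page per polling interval can drain it slowly. The following is a minimal sketch of following nextPageToken until nothing newer is available. It assumes that an empty page, a missing token, or an unchanged token means the feed is exhausted; this guide does not spell out end-of-feed behavior, so verify that against the API. fetch_page is a hypothetical callable that wraps the GET request above and returns the decoded JSON response.

```python
from typing import Callable, List, Optional, Tuple


def drain_notifications(
    fetch_page: Callable[[Optional[str]], dict],
    cursor: Optional[str] = None,
    max_pages: int = 100,
) -> Tuple[List[dict], Optional[str]]:
    """Follow nextPageToken until the feed looks exhausted.

    fetch_page(cursor) is a hypothetical helper: it sends the GET request with
    page_token=cursor (or without page_token when cursor is None) and returns
    the decoded JSON body.
    """
    collected: List[dict] = []
    for _ in range(max_pages):  # safety cap so a bad token cannot loop forever
        data = fetch_page(cursor)
        notifications = data.get("notifications", [])
        collected.extend(notifications)
        next_cursor = data.get("nextPageToken")
        # Assumed end-of-feed signals: empty page, no token, or unchanged token.
        if not notifications or not next_cursor or next_cursor == cursor:
            break
        cursor = next_cursor
    return collected, cursor
```

The function returns both the notifications and the last cursor it advanced to, so the caller can persist the cursor once processing has finished.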
Implementation Examples
Python
import requests
import time
from typing import Optional

class NotificationPoller:
    def __init__(self, base_url: str, organization: str, access_token: str):
        self.base_url = base_url
        self.organization = organization
        self.headers = {"Authorization": f"Bearer {access_token}"}
        self.cursor: Optional[str] = None

    def poll_once(self) -> list:
        """Poll for new notifications once."""
        url = f"{self.base_url}/v1/organizations/{self.organization}/notifications"
        params = {"page_size": 25}
        if self.cursor:
            params["page_token"] = self.cursor

        response = requests.get(url, headers=self.headers, params=params)
        response.raise_for_status()
        data = response.json()
        notifications = data.get("notifications", [])

        # Update cursor for next poll
        if "nextPageToken" in data:
            self.cursor = data["nextPageToken"]
        return notifications

    def poll_continuously(self, interval_seconds: int = 30, callback=None):
        """Poll continuously at the specified interval."""
        while True:
            try:
                notifications = self.poll_once()
                if notifications and callback:
                    for notification in notifications:
                        callback(notification)
                time.sleep(interval_seconds)
            except Exception as e:
                print(f"Error polling notifications: {e}")
                time.sleep(interval_seconds)

# Usage
def handle_notification(notification):
    print(f"Received: {notification['type']}")
    # Process notification...

poller = NotificationPoller(
    base_url="https://api.thirdfort.com/client/api",
    organization="acme-corp",
    access_token="your_access_token"
)

# Poll every 30 seconds
poller.poll_continuously(interval_seconds=30, callback=handle_notification)
JavaScript/TypeScript
interface Notification {
  name: string;
  type: string;
  createTime: string;
  payload: any;
}

interface ListNotificationsResponse {
  notifications: Notification[];
  nextPageToken?: string;
}

class NotificationPoller {
  private cursor: string | null = null;

  constructor(
    private baseUrl: string,
    private organization: string,
    private accessToken: string
  ) {}

  async pollOnce(): Promise<Notification[]> {
    // Append the path to the base URL. Passing an absolute path ("/v1/...")
    // with baseUrl as the second argument of new URL() would drop the
    // "/client/api" prefix from the base.
    const base = this.baseUrl.replace(/\/+$/, '');
    const url = new URL(
      `${base}/v1/organizations/${this.organization}/notifications`
    );
    url.searchParams.set('page_size', '25');
    if (this.cursor) {
      url.searchParams.set('page_token', this.cursor);
    }

    const response = await fetch(url.toString(), {
      headers: {
        'Authorization': `Bearer ${this.accessToken}`
      }
    });
    if (!response.ok) {
      throw new Error(`HTTP ${response.status}: ${response.statusText}`);
    }
    const data: ListNotificationsResponse = await response.json();

    // Update cursor for next poll
    if (data.nextPageToken) {
      this.cursor = data.nextPageToken;
    }
    return data.notifications || [];
  }

  async pollContinuously(
    intervalSeconds: number = 30,
    callback?: (notification: Notification) => void | Promise<void>
  ): Promise<void> {
    while (true) {
      try {
        const notifications = await this.pollOnce();
        if (notifications.length > 0 && callback) {
          for (const notification of notifications) {
            await callback(notification);
          }
        }
        await new Promise(resolve => setTimeout(resolve, intervalSeconds * 1000));
      } catch (error) {
        console.error('Error polling notifications:', error);
        await new Promise(resolve => setTimeout(resolve, intervalSeconds * 1000));
      }
    }
  }
}

// Usage
const poller = new NotificationPoller(
  'https://api.thirdfort.com/client/api',
  'acme-corp',
  'your_access_token'
);

// Poll every 30 seconds
poller.pollContinuously(30, async (notification) => {
  console.log(`Received: ${notification.type}`);
  // Process notification...
});
Best Practices
1. Store the Cursor
Always store the nextPageToken between polling cycles. This ensures you don't miss notifications or process duplicates.
# Good: Store cursor persistently
cursor = load_cursor_from_database()
poller.cursor = cursor
notifications = poller.poll_once()
save_cursor_to_database(poller.cursor)
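load_cursor_from_database and save_cursor_to_database above are placeholders. As a minimal sketch of one way to fill them in, the following stores the cursor in a local JSON file rather than a database. The function names and file layout are hypothetical; the write goes through a temporary file so that a crash mid-write cannot leave a truncated cursor behind.

```python
import json
import os
import tempfile
from typing import Optional


def load_cursor(path: str) -> Optional[str]:
    """Return the saved cursor, or None if nothing has been saved yet."""
    try:
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f).get("cursor")
    except FileNotFoundError:
        return None


def save_cursor(path: str, cursor: Optional[str]) -> None:
    """Write the cursor atomically via a temporary file in the same directory."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump({"cursor": cursor}, f)
        os.replace(tmp_path, path)  # atomic rename on the same filesystem
    except BaseException:
        os.unlink(tmp_path)  # clean up the temporary file on any failure
        raise
```

Saving the cursor only after the notifications from that poll have been processed means a crash leads to re-delivery, which deduplication can absorb, rather than to a lost notification.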
2. Implement Deduplication
Use the notification ID (last segment of the name field) to prevent processing the same notification twice:
processed_ids = set()

def handle_notification(notification):
    notification_id = notification["name"].split("/")[-1]
    if notification_id in processed_ids:
        return  # Already processed

    # Process notification
    process(notification)

    # Mark as processed
    processed_ids.add(notification_id)
    save_processed_id(notification_id)
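An in-memory set is lost on restart, and save_processed_id above is a placeholder. The following is a minimal sketch of a persistent alternative using SQLite from the Python standard library. The class, table, and function names are hypothetical, and a production deployment would likely use its existing database instead.

```python
import sqlite3
from typing import Callable


class ProcessedStore:
    """Remembers processed notification IDs across restarts (hypothetical helper)."""

    def __init__(self, path: str = "processed.db"):
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS processed (id TEXT PRIMARY KEY)"
        )
        self.conn.commit()

    def seen(self, notification_id: str) -> bool:
        row = self.conn.execute(
            "SELECT 1 FROM processed WHERE id = ?", (notification_id,)
        ).fetchone()
        return row is not None

    def mark(self, notification_id: str) -> None:
        self.conn.execute(
            "INSERT OR IGNORE INTO processed (id) VALUES (?)", (notification_id,)
        )
        self.conn.commit()


def handle_once(store: ProcessedStore, notification: dict,
                process: Callable[[dict], None]) -> bool:
    """Process a notification unless its ID was already recorded.

    Returns True if the notification was processed, False if it was skipped.
    """
    notification_id = notification["name"].split("/")[-1]
    if store.seen(notification_id):
        return False
    process(notification)
    # Mark only after processing succeeds, so a failure is retried next time.
    store.mark(notification_id)
    return True
```

Marking after processing gives at-least-once handling: if the process crashes between the two steps, the notification is handled again on the next poll, so the processing step itself should tolerate a repeat.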
3. Choose an Appropriate Polling Interval
Recommended intervals:
- 30-60 seconds: Standard use cases
- 10-30 seconds: Time-sensitive workflows
- 60-300 seconds: Low-priority notifications
Avoid polling more frequently than every 10 seconds to prevent rate limiting.
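A minimal sketch of enforcing the 10-second floor in code, with a small random jitter added so that several polling instances do not hit the API at the same instant. Jitter is a general technique rather than something this guide requires, and the function name and the 10% default are assumptions.

```python
import random

MIN_INTERVAL_SECONDS = 10  # floor recommended in this guide


def next_sleep(interval_seconds: float, jitter_fraction: float = 0.1) -> float:
    """Return how long to sleep before the next poll."""
    # Never go below the recommended minimum, whatever the caller asks for.
    base = max(interval_seconds, MIN_INTERVAL_SECONDS)
    # Add up to jitter_fraction extra so instances do not poll in lockstep.
    return base + random.uniform(0, base * jitter_fraction)
```

In the polling loop, time.sleep(next_sleep(interval_seconds)) would replace the fixed time.sleep(interval_seconds) call.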
4. Handle Errors Gracefully
Implement exponential backoff for transient errors:
import time

import requests

def poll_with_backoff(poller, max_retries=3):
    retries = 0
    backoff = 1
    while retries < max_retries:
        try:
            return poller.poll_once()
        except requests.exceptions.RequestException as e:
            retries += 1
            if retries >= max_retries:
                raise
            print(f"Error polling (attempt {retries}/{max_retries}): {e}")
            time.sleep(backoff)
            backoff *= 2  # Exponential backoff
5. Filter Notifications
Use filters to retrieve only relevant notifications:
# Only check completion notifications
GET /v1/organizations/{org}/notifications?filter=type = "notification.client.check_complete.v1"
# Only recent notifications
GET /v1/organizations/{org}/notifications?filter=create_time > "2024-06-01T00:00:00Z"
# Combined filters
GET /v1/organizations/{org}/notifications?filter=type = "notification.client.check_complete.v1" AND create_time > "2024-06-01T00:00:00Z"
Note: URL-encode filter strings in actual requests.
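The requests library encodes values passed through params automatically, but when you build the URL by hand the filter has to be percent-encoded. A minimal sketch using urllib.parse from the standard library; build_notifications_url is a hypothetical helper, not part of any Thirdfort SDK.

```python
from typing import Optional
from urllib.parse import quote, urlencode


def build_notifications_url(
    base_url: str,
    organization: str,
    filter_expr: Optional[str] = None,
    page_size: int = 25,
    page_token: Optional[str] = None,
) -> str:
    """Build a ListNotifications URL with a percent-encoded query string."""
    params = {"page_size": page_size}
    if filter_expr:
        params["filter"] = filter_expr
    if page_token:
        params["page_token"] = page_token
    # quote_via=quote encodes spaces as %20 rather than "+".
    query = urlencode(params, quote_via=quote)
    return f"{base_url}/v1/organizations/{organization}/notifications?{query}"


url = build_notifications_url(
    "https://api.thirdfort.com/client/api",
    "acme-corp",
    filter_expr='type = "notification.client.check_complete.v1"',
)
print(url)
```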
Handling Notification Types
Different notification types carry different payload structures. Always check the notification's type field, or the payload's @type field, before reading payload fields.
Available Notification Types
The full list of notification types and their payload schemas is in the Notification schema. Note that if you receive the notification via webhook, the @type field will not be populated, and the x-thirdfort-notification-type header will indicate the type of notification instead.
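If the same handler serves both polling and webhooks, it can resolve the type from whichever source is populated. A minimal sketch, assuming the headers are available as a plain mapping; the function name is hypothetical.

```python
from typing import Mapping, Optional


def resolve_notification_type(
    notification: Optional[dict] = None,
    headers: Optional[Mapping[str, str]] = None,
) -> Optional[str]:
    """Return the notification type for either delivery path."""
    # Webhook delivery: the type arrives in a header. Header names are
    # case-insensitive, so normalize before looking it up.
    if headers:
        lowered = {key.lower(): value for key, value in headers.items()}
        header_type = lowered.get("x-thirdfort-notification-type")
        if header_type:
            return header_type
    # Polled delivery: the type is a top-level field of the notification.
    if notification:
        return notification.get("type")
    return None
```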
Handling Example
def handle_notification(notification):
    payload = notification["payload"]
    notification_type = notification["type"]

    if notification_type == "notification.client.check_complete.v1":
        check_name = payload["check"]
        recommendation = payload["recommendation"]
        print(f"Check {check_name} completed with recommendation: {recommendation}")
    elif notification_type == "notification.client.report_generated.v1":
        check_name = payload["check"]
        print(f"Report generated for check: {check_name}")
    elif notification_type == "notification.client.ongoing_monitoring_hit.v1":
        check_name = payload["check"]
        hit_count = payload.get("hitCount", 0)
        print(f"Monitoring hit detected for {check_name}: {hit_count} hits")
    else:
        print(f"Unknown notification type: {notification_type}")
Idempotency
Notifications include a unique resource name (e.g., organizations/{org}/notifications/01JRA4ABC123). Use the notification ID portion (the last segment) for deduplication when processing notifications.
Backfilling Missed Notifications
If your polling service was offline, you can backfill missed notifications using time-based filtering:
from datetime import datetime, timedelta, timezone

import requests

def backfill_notifications(poller, since: datetime):
    """Retrieve all notifications since a specific time (since must be in UTC)."""
    # Format timestamp for filter
    since_str = since.strftime("%Y-%m-%dT%H:%M:%SZ")
    url = f"{poller.base_url}/v1/organizations/{poller.organization}/notifications"
    params = {
        "page_size": 100,  # Use max page size for backfill
        "filter": f'create_time > "{since_str}"'
    }
    all_notifications = []

    while True:
        response = requests.get(url, headers=poller.headers, params=params)
        response.raise_for_status()
        data = response.json()
        notifications = data.get("notifications", [])
        all_notifications.extend(notifications)
        if "nextPageToken" not in data:
            break
        params["page_token"] = data["nextPageToken"]
    return all_notifications

# Backfill last 24 hours (datetime.utcnow() is deprecated since Python 3.12)
since = datetime.now(timezone.utc) - timedelta(hours=24)
missed_notifications = backfill_notifications(poller, since)
Monitoring Your Polling Service
Track these metrics to ensure reliable notification delivery:
- Poll success rate: Percentage of successful polls
- Notifications per poll: Average number of notifications retrieved
- Processing latency: Time from notification creation to processing
- Cursor age: Time since the cursor was last updated
import time
from datetime import datetime, timezone

class MonitoredPoller(NotificationPoller):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.metrics = {
            "total_polls": 0,
            "successful_polls": 0,
            "total_notifications": 0,
            "last_poll_time": None
        }

    def poll_once(self):
        self.metrics["total_polls"] += 1
        start_time = time.time()
        try:
            notifications = super().poll_once()
            self.metrics["successful_polls"] += 1
            self.metrics["total_notifications"] += len(notifications)
            # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated)
            self.metrics["last_poll_time"] = datetime.now(timezone.utc)

            # Log metrics
            success_rate = (self.metrics["successful_polls"] / self.metrics["total_polls"]) * 100
            print(f"Poll metrics: {success_rate:.1f}% success, "
                  f"{len(notifications)} notifications, "
                  f"{time.time() - start_time:.2f}s latency")
            return notifications
        except Exception as e:
            print(f"Poll failed: {e}")
            raise
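The latency printed above is the duration of the HTTP request. Processing latency and cursor age, as listed in the metrics, can be derived from timestamps instead. A minimal sketch, assuming createTime always uses the UTC format shown in the example response (2024-06-15T10:30:00Z); the function names are hypothetical.

```python
from datetime import datetime, timezone
from typing import Optional


def parse_timestamp(value: str) -> datetime:
    """Parse a UTC timestamp such as 2024-06-15T10:30:00Z."""
    # datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11 on,
    # so rewrite it as an explicit UTC offset first.
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def processing_latency_seconds(notification: dict,
                               now: Optional[datetime] = None) -> float:
    """Seconds between notification creation and now (the processing moment)."""
    now = now or datetime.now(timezone.utc)
    return (now - parse_timestamp(notification["createTime"])).total_seconds()


def cursor_age_seconds(last_cursor_update: datetime,
                       now: Optional[datetime] = None) -> float:
    """Seconds since the cursor last advanced; a growing value suggests a stall."""
    now = now or datetime.now(timezone.utc)
    return (now - last_cursor_update).total_seconds()
```

Alerting when either value exceeds a few polling intervals is one way to notice a stalled poller before notifications are missed.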
Troubleshooting
No Notifications Returned
Possible causes:
- No new notifications since last poll
- Cursor is already at the latest notification
- Filter is too restrictive
Solution: Remove filters and check if notifications exist.
Duplicate Notifications
Possible causes:
- Cursor not being stored correctly
- Multiple polling instances running
Solution: Implement deduplication using notification IDs.
Missing Notifications
Possible causes:
- Polling service was offline
- Cursor was reset incorrectly
Solution: Use backfill pattern with time-based filtering.
Webhooks (Alternative to Polling)
For real-time push delivery instead of polling, Thirdfort can configure webhook endpoints at the organization level. Webhooks provide sub-second notification delivery and reduce API calls.
Webhook Setup
Contact api-support@thirdfort.com to configure webhooks for your organization. You'll need to provide your webhook endpoint URL (must be HTTPS).
Thirdfort will provide you with an HMAC secret for signature verification.
Webhook Signature Verification
All webhook requests will include an x-thirdfort-signature header containing an HMAC-SHA256 signature. You must always verify this signature before processing the notification.
Python Example:
import base64
import hashlib
import hmac

def verify_webhook(request, secret):
    # Extract signature header
    signature_header = request.headers.get("x-thirdfort-signature")
    if not signature_header:
        # Missing header: reject instead of passing None to compare_digest
        return False

    # Compute expected signature over the raw request body (bytes)
    expected_signature = base64.b64encode(
        hmac.new(
            secret.encode(),
            request.body,
            hashlib.sha256
        ).digest()
    ).decode()

    # Compare signatures (constant-time comparison)
    return hmac.compare_digest(signature_header.encode(), expected_signature.encode())

# Usage (inside your web framework's request handler)
def handle_webhook(request):
    if not verify_webhook(request, "your-hmac-secret"):
        # Invalid signature - reject request
        return 401
    # Process notification
    notification = request.json()
    return 200
Webhook Headers
Webhook requests include these headers:
- x-thirdfort-notification-id: Unique notification ID
- x-thirdfort-notification-type: Notification type (e.g., notification.client.check_complete.v1)
- x-thirdfort-signature: HMAC-SHA256 signature (base64-encoded)
Webhook Payload
The webhook payload structure differs depending on the type of notification. For example, a notification.client.check_complete.v1 payload includes:
- check: The check name (e.g., organizations/{org}/teams/{team}/checks/{check})
- displayName: The check display name (e.g., Sale of 10 Mayfair)
- recommendation: The overall recommendation of the check (e.g., APPROVE)
- state: The current state of the check (e.g., COMPLETE)
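The headers and payload above can be combined into a single handler. The following is a framework-independent sketch that verifies the signature, skips repeated deliveries using the notification ID header, and reads the type from the type header. It assumes the request body is the JSON payload described above and that returning 200 is an acceptable acknowledgement; neither is confirmed by this guide. sign_body and handle_webhook_request are hypothetical names.

```python
import base64
import hashlib
import hmac
import json


def sign_body(secret: str, body: bytes) -> str:
    """Base64-encoded HMAC-SHA256 of the raw request body."""
    digest = hmac.new(secret.encode(), body, hashlib.sha256).digest()
    return base64.b64encode(digest).decode()


def handle_webhook_request(headers: dict, body: bytes, secret: str,
                           seen_ids: set) -> int:
    """Return an HTTP status code for an incoming webhook request."""
    # Header names are case-insensitive, so normalize them first.
    lowered = {key.lower(): value for key, value in headers.items()}

    # Reject requests whose signature is missing or does not match.
    signature = lowered.get("x-thirdfort-signature", "")
    expected = sign_body(secret, body)
    if not hmac.compare_digest(signature.encode(), expected.encode()):
        return 401

    # Acknowledge repeated deliveries without processing them again.
    notification_id = lowered.get("x-thirdfort-notification-id")
    if notification_id in seen_ids:
        return 200

    payload = json.loads(body)  # assumed to be the JSON payload shown above
    notification_type = lowered.get("x-thirdfort-notification-type")
    print(f"{notification_type}: {payload.get('check')}")

    seen_ids.add(notification_id)
    return 200
```

sign_body is also useful in tests: signing a sample body with a test secret lets you exercise the handler locally without waiting for a real delivery.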
Next Steps
- Production deployment: Set up monitoring and alerting
- Webhook setup: Contact api-support@thirdfort.com for real-time delivery
- Notification handling: Implement business logic for each notification type