Polling for Notifications
Polling allows you to retrieve notifications by making periodic API requests. This guide shows you how to implement notification polling effectively.
Overview
The Thirdfort API provides organization-level notifications that you can retrieve by polling the ListNotifications endpoint. Notifications are returned in chronological order (oldest first), making them suitable for cursor-based consumption.
When to Use Polling
Use polling when:
- You cannot expose a public webhook endpoint
- You prefer simpler infrastructure (no webhook server required)
- You have moderate notification volume
- Slight delays (30-60 seconds) are acceptable
Use webhooks when:
- You need real-time notifications (sub-second delivery)
- You have high notification volume
- You want to minimize API calls
- You can maintain a public HTTPS endpoint
Basic Polling Pattern
Initial Request
Retrieve the first page of notifications:
GET /v1/organizations/{organization}/notifications?page_size=25
Response:
{
  "notifications": [
    {
      "name": "organizations/acme-corp/notifications/01JRA4ABC123",
      "type": "notification.client.check_complete.v1",
      "createTime": "2024-06-15T10:30:00Z",
      "payload": {
        "@type": "type.googleapis.com/thirdfort.client.notifications.v1alpha1.CheckCompleteData",
        "check": "organizations/acme-corp/teams/default/checks/01JRA3XYZ789",
        "displayName": "John Smith ID Verification",
        "state": "ACTIVE",
        "recommendation": "APPROVE",
        "completeTime": "2024-06-15T10:29:55Z"
      }
    }
  ],
  "nextPageToken": "01JRA4ABC123"
}
Subsequent Requests
Use the nextPageToken from the previous response to get newer notifications:
GET /v1/organizations/{organization}/notifications?page_size=25&page_token=01JRA4ABC123
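The two requests above chain naturally into a loop. The following is a minimal sketch rather than a reference client: `fetch_page` is a hypothetical callable standing in for the HTTP request, and treating an empty page as "caught up" is an assumption about how the endpoint behaves once the cursor reaches the newest notification.

```python
from typing import Callable, List, Optional, Tuple


def drain_pages(
    fetch_page: Callable[[Optional[str]], dict],
    cursor: Optional[str] = None,
    max_pages: int = 100,
) -> Tuple[List[dict], Optional[str]]:
    """Follow nextPageToken until no further notifications are returned.

    fetch_page is a hypothetical callable: it receives the current page
    token (None for the first request) and returns the decoded response.
    Returns the collected notifications and the cursor to store.
    """
    collected: List[dict] = []
    for _ in range(max_pages):  # hard cap so a stuck cursor cannot loop forever
        data = fetch_page(cursor)
        notifications = data.get("notifications", [])
        if not notifications:
            break  # assumption: an empty page means we have caught up
        collected.extend(notifications)
        next_token = data.get("nextPageToken")
        if not next_token or next_token == cursor:
            break  # nothing to advance to
        cursor = next_token
    return collected, cursor
```

The returned cursor is the value to pass as `page_token` on the next polling cycle.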
Implementation Examples
Python
import requests
import time
from typing import Optional


class NotificationPoller:
    def __init__(self, base_url: str, organization: str, access_token: str):
        self.base_url = base_url
        self.organization = organization
        self.headers = {"Authorization": f"Bearer {access_token}"}
        self.cursor: Optional[str] = None

    def poll_once(self) -> list:
        """Poll for new notifications once."""
        url = f"{self.base_url}/v1/organizations/{self.organization}/notifications"
        params = {"page_size": 25}
        if self.cursor:
            params["page_token"] = self.cursor

        # A timeout keeps a stalled connection from blocking the polling loop
        response = requests.get(url, headers=self.headers, params=params, timeout=30)
        response.raise_for_status()

        data = response.json()
        notifications = data.get("notifications", [])

        # Update cursor for next poll
        if data.get("nextPageToken"):
            self.cursor = data["nextPageToken"]

        return notifications

    def poll_continuously(self, interval_seconds: int = 30, callback=None):
        """Poll continuously at the specified interval."""
        while True:
            try:
                notifications = self.poll_once()
                if notifications and callback:
                    for notification in notifications:
                        callback(notification)
                time.sleep(interval_seconds)
            except Exception as e:
                print(f"Error polling notifications: {e}")
                time.sleep(interval_seconds)


# Usage
def handle_notification(notification):
    print(f"Received: {notification['type']}")
    # Process notification...


poller = NotificationPoller(
    base_url="https://api.thirdfort.com/client/api",
    organization="acme-corp",
    access_token="your_access_token"
)

# Poll every 30 seconds
poller.poll_continuously(interval_seconds=30, callback=handle_notification)
JavaScript/TypeScript
interface Notification {
  name: string;
  type: string;
  createTime: string;
  payload: any;
}

interface ListNotificationsResponse {
  notifications: Notification[];
  nextPageToken?: string;
}

class NotificationPoller {
  private cursor: string | null = null;

  constructor(
    private baseUrl: string,
    private organization: string,
    private accessToken: string
  ) {}

  async pollOnce(): Promise<Notification[]> {
    // Build the URL by concatenation: new URL('/v1/...', baseUrl) would
    // discard the '/client/api' path prefix of the base URL.
    const url = new URL(
      `${this.baseUrl}/v1/organizations/${this.organization}/notifications`
    );
    url.searchParams.set('page_size', '25');
    if (this.cursor) {
      url.searchParams.set('page_token', this.cursor);
    }

    const response = await fetch(url.toString(), {
      headers: {
        'Authorization': `Bearer ${this.accessToken}`
      }
    });
    if (!response.ok) {
      throw new Error(`HTTP ${response.status}: ${response.statusText}`);
    }

    const data: ListNotificationsResponse = await response.json();

    // Update cursor for next poll
    if (data.nextPageToken) {
      this.cursor = data.nextPageToken;
    }

    return data.notifications || [];
  }

  async pollContinuously(
    intervalSeconds: number = 30,
    callback?: (notification: Notification) => void | Promise<void>
  ): Promise<void> {
    while (true) {
      try {
        const notifications = await this.pollOnce();
        if (notifications.length > 0 && callback) {
          for (const notification of notifications) {
            await callback(notification);
          }
        }
        await new Promise(resolve => setTimeout(resolve, intervalSeconds * 1000));
      } catch (error) {
        console.error('Error polling notifications:', error);
        await new Promise(resolve => setTimeout(resolve, intervalSeconds * 1000));
      }
    }
  }
}

// Usage
const poller = new NotificationPoller(
  'https://api.thirdfort.com/client/api',
  'acme-corp',
  'your_access_token'
);

// Poll every 30 seconds
poller.pollContinuously(30, async (notification) => {
  console.log(`Received: ${notification.type}`);
  // Process notification...
});
Best Practices
1. Store the Cursor
Always store the nextPageToken between polling cycles. This ensures you don't miss notifications or process duplicates.
# Good: Store cursor persistently
cursor = load_cursor_from_database()
poller.cursor = cursor
notifications = poller.poll_once()
save_cursor_to_database(poller.cursor)
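`load_cursor_from_database` and `save_cursor_to_database` are placeholders in the snippet above. One possible shape for them, assuming a local SQLite file is an acceptable store (the table name and the `db_path` parameter are illustrative and not part of the Thirdfort API):

```python
import sqlite3
from typing import Optional


def _connect(db_path: str) -> sqlite3.Connection:
    conn = sqlite3.connect(db_path)
    # Single-row table: the CHECK constraint keeps exactly one cursor record
    conn.execute(
        "CREATE TABLE IF NOT EXISTS poll_cursor ("
        "id INTEGER PRIMARY KEY CHECK (id = 1), token TEXT)"
    )
    return conn


def load_cursor_from_database(db_path: str = "poller.db") -> Optional[str]:
    """Return the stored page token, or None if nothing has been saved yet."""
    conn = _connect(db_path)
    try:
        row = conn.execute("SELECT token FROM poll_cursor WHERE id = 1").fetchone()
        return row[0] if row else None
    finally:
        conn.close()


def save_cursor_to_database(token: Optional[str], db_path: str = "poller.db") -> None:
    """Overwrite the stored page token."""
    conn = _connect(db_path)
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT OR REPLACE INTO poll_cursor (id, token) VALUES (1, ?)",
                (token,),
            )
    finally:
        conn.close()
```

Saving the cursor only after a batch has been handled means a crash leads to reprocessing rather than loss, which is the case deduplication is meant to absorb.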
2. Implement Deduplication
Use the notification ID (last segment of the name field) to prevent processing the same notification twice:
processed_ids = set()


def handle_notification(notification):
    notification_id = notification["name"].split("/")[-1]
    if notification_id in processed_ids:
        return  # Already processed

    # Process notification
    process(notification)

    # Mark as processed
    processed_ids.add(notification_id)
    save_processed_id(notification_id)
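An in-memory set is lost on restart, which is why the example also calls `save_processed_id`. A minimal sketch of a persistent version, assuming SQLite is acceptable; the `ProcessedStore` class and its schema are hypothetical, not part of the Thirdfort API:

```python
import sqlite3


def notification_id(notification: dict) -> str:
    """Return the last segment of the notification's resource name."""
    return notification["name"].rsplit("/", 1)[-1]


class ProcessedStore:
    """Tracks processed notification IDs in SQLite (illustrative schema)."""

    def __init__(self, db_path: str = ":memory:"):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS processed (notification_id TEXT PRIMARY KEY)"
        )

    def is_processed(self, nid: str) -> bool:
        row = self.conn.execute(
            "SELECT 1 FROM processed WHERE notification_id = ?", (nid,)
        ).fetchone()
        return row is not None

    def mark_processed(self, nid: str) -> None:
        with self.conn:  # commit immediately so the mark survives a crash
            # OR IGNORE makes marking the same ID twice harmless
            self.conn.execute(
                "INSERT OR IGNORE INTO processed (notification_id) VALUES (?)",
                (nid,),
            )
```

Marking after processing, as in the example above, favors reprocessing over loss if the service stops between the two steps.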
3. Choose an Appropriate Polling Interval
Recommended intervals:
- 30-60 seconds: Standard use cases
- 10-30 seconds: Time-sensitive workflows
- 60-300 seconds: Low-priority notifications
Avoid polling more frequently than every 10 seconds to prevent rate limiting.
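One way to enforce that floor in code is to clamp whatever interval is configured. The sketch below also adds a small random jitter, which this guide does not require; it is a common addition when several pollers would otherwise fire at the same moment. The names `MIN_INTERVAL_SECONDS` and `next_sleep` are illustrative.

```python
import random

MIN_INTERVAL_SECONDS = 10.0  # floor taken from the guidance above


def next_sleep(interval_seconds: float, jitter_fraction: float = 0.1) -> float:
    """Return a sleep duration that respects the floor and adds jitter."""
    base = max(interval_seconds, MIN_INTERVAL_SECONDS)
    # Jitter only lengthens the wait, so the result never drops below the floor
    return base + random.uniform(0, base * jitter_fraction)
```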
4. Handle Errors Gracefully
Implement exponential backoff for transient errors:
import time

import requests


def poll_with_backoff(poller, max_retries=3):
    retries = 0
    backoff = 1
    while retries < max_retries:
        try:
            return poller.poll_once()
        except requests.exceptions.RequestException as e:
            retries += 1
            if retries >= max_retries:
                raise
            print(f"Error polling (attempt {retries}/{max_retries}): {e}")
            time.sleep(backoff)
            backoff *= 2  # Exponential backoff
5. Filter Notifications
Use filters to retrieve only relevant notifications:
# Only check completion notifications
GET /v1/organizations/{org}/notifications?filter=type = "notification.client.check_complete.v1"
# Only recent notifications
GET /v1/organizations/{org}/notifications?filter=create_time > "2024-06-01T00:00:00Z"
# Combined filters
GET /v1/organizations/{org}/notifications?filter=type = "notification.client.check_complete.v1" AND create_time > "2024-06-01T00:00:00Z"
Note: URL-encode filter strings in actual requests.
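As a small illustration of that encoding step, the standard library's `urllib.parse.urlencode` can build the query string. The `build_query` helper is hypothetical; the filter expression is the combined example from above.

```python
from urllib.parse import parse_qs, urlencode


def build_query(filter_expr: str, page_size: int = 25) -> str:
    """Return a URL-encoded query string for the list request."""
    return urlencode({"page_size": page_size, "filter": filter_expr})


filter_expr = (
    'type = "notification.client.check_complete.v1" '
    'AND create_time > "2024-06-01T00:00:00Z"'
)
query = build_query(filter_expr)

# Decoding the query string recovers the original expression
decoded_filter = parse_qs(query)["filter"][0]
```

When using `requests`, passing the filter through `params=` (as the polling examples do) applies the same encoding automatically.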
Handling Notification Types
Different notification types carry different payload structures. Check the notification's type field (or the payload's @type field) before reading payload fields.
Available Notification Types
| Type | Payload | Description |
|---|---|---|
| notification.client.check_complete.v1 | CheckCompleteData | A check has completed processing |
| notification.client.report_generated.v1 | ReportGeneratedData | A report has been generated (e.g., finding reviewed) |
| notification.client.ongoing_monitoring_hit.v1 | OngoingMonitoringHitData | Ongoing monitoring screening match detected |
| notification.client.ongoing_monitoring_started.v1 | OngoingMonitoringStartedData | Ongoing monitoring started |
| notification.client.ongoing_monitoring_renewed.v1 | OngoingMonitoringRenewedData | Ongoing monitoring renewed |
Handling Example
def handle_notification(notification):
    payload = notification["payload"]
    notification_type = notification["type"]

    if notification_type == "notification.client.check_complete.v1":
        check_name = payload["check"]
        recommendation = payload["recommendation"]
        print(f"Check {check_name} completed with recommendation: {recommendation}")

    elif notification_type == "notification.client.report_generated.v1":
        check_name = payload["check"]
        print(f"Report generated for check: {check_name}")

    elif notification_type == "notification.client.ongoing_monitoring_hit.v1":
        check_name = payload["check"]
        hit_count = payload.get("hitCount", 0)
        print(f"Monitoring hit detected for {check_name}: {hit_count} hits")

    else:
        print(f"Unknown notification type: {notification_type}")
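The handler above dispatches on the type field. To cross-check the payload's @type as well, something like the following sketch could run first. It assumes every @type URL ends in the payload name from the table, as the single sample response in this guide does; the helper names are illustrative.

```python
# Mapping copied from the notification types table above
EXPECTED_PAYLOADS = {
    "notification.client.check_complete.v1": "CheckCompleteData",
    "notification.client.report_generated.v1": "ReportGeneratedData",
    "notification.client.ongoing_monitoring_hit.v1": "OngoingMonitoringHitData",
    "notification.client.ongoing_monitoring_started.v1": "OngoingMonitoringStartedData",
    "notification.client.ongoing_monitoring_renewed.v1": "OngoingMonitoringRenewedData",
}


def payload_type_name(notification: dict) -> str:
    """Return the final dotted segment of the payload's @type URL."""
    type_url = notification.get("payload", {}).get("@type", "")
    return type_url.rsplit(".", 1)[-1]


def payload_matches_type(notification: dict) -> bool:
    """True if the payload's @type agrees with the notification type."""
    expected = EXPECTED_PAYLOADS.get(notification.get("type"))
    return expected is not None and payload_type_name(notification) == expected
```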
Idempotency
Notifications include a unique resource name (e.g., organizations/{org}/notifications/01JRA4ABC123). Use the notification ID portion (the last segment) for deduplication when processing notifications.
Backfilling Missed Notifications
If your polling service was offline, you can backfill missed notifications using time-based filtering:
import requests
from datetime import datetime, timedelta, timezone


def backfill_notifications(poller, since: datetime):
    """Retrieve all notifications since a specific time (since must be in UTC)."""
    # Format timestamp for filter
    since_str = since.strftime("%Y-%m-%dT%H:%M:%SZ")
    url = f"{poller.base_url}/v1/organizations/{poller.organization}/notifications"
    params = {
        "page_size": 100,  # Use max page size for backfill
        "filter": f'create_time > "{since_str}"'
    }

    all_notifications = []
    while True:
        response = requests.get(url, headers=poller.headers, params=params, timeout=30)
        response.raise_for_status()
        data = response.json()
        notifications = data.get("notifications", [])
        all_notifications.extend(notifications)

        # Stop when there is no further page; the empty-page check guards
        # against looping forever if a token is still returned on the last page
        next_token = data.get("nextPageToken")
        if not next_token or not notifications:
            break
        params["page_token"] = next_token

    return all_notifications


# Backfill last 24 hours
since = datetime.now(timezone.utc) - timedelta(hours=24)
missed_notifications = backfill_notifications(poller, since)
Monitoring Your Polling Service
Track these metrics to ensure reliable notification delivery:
- Poll success rate: Percentage of successful polls
- Notifications per poll: Average number of notifications retrieved
- Processing latency: Time from notification creation to processing
- Cursor age: Time since the cursor was last updated
import time
from datetime import datetime, timezone


class MonitoredPoller(NotificationPoller):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.metrics = {
            "total_polls": 0,
            "successful_polls": 0,
            "total_notifications": 0,
            "last_poll_time": None
        }

    def poll_once(self):
        self.metrics["total_polls"] += 1
        start_time = time.time()
        try:
            notifications = super().poll_once()
            self.metrics["successful_polls"] += 1
            self.metrics["total_notifications"] += len(notifications)
            self.metrics["last_poll_time"] = datetime.now(timezone.utc)

            # Log metrics
            success_rate = (self.metrics["successful_polls"] / self.metrics["total_polls"]) * 100
            print(f"Poll metrics: {success_rate:.1f}% success, "
                  f"{len(notifications)} notifications, "
                  f"{time.time() - start_time:.2f}s poll duration")
            return notifications
        except Exception as e:
            print(f"Poll failed: {e}")
            raise
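The class above covers poll success rate and notifications per poll. Processing latency, the third metric in the list, can be derived from each notification's createTime. A minimal sketch, assuming createTime is always a UTC timestamp with a trailing Z and whole seconds, as in the sample response; `processing_latency_seconds` is an illustrative name:

```python
from datetime import datetime, timezone
from typing import Optional


def processing_latency_seconds(
    notification: dict, now: Optional[datetime] = None
) -> float:
    """Seconds elapsed between notification creation and processing."""
    # fromisoformat() only accepts a trailing "Z" from Python 3.11,
    # so convert it to an explicit UTC offset first
    created = datetime.fromisoformat(
        notification["createTime"].replace("Z", "+00:00")
    )
    if now is None:
        now = datetime.now(timezone.utc)
    return (now - created).total_seconds()
```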
Troubleshooting
No Notifications Returned
Possible causes:
- No new notifications since last poll
- Cursor is already at the latest notification
- Filter is too restrictive
Solution: Remove filters and check if notifications exist.
Duplicate Notifications
Possible causes:
- Cursor not being stored correctly
- Multiple polling instances running
Solution: Implement deduplication using notification IDs.
Missing Notifications
Possible causes:
- Polling service was offline
- Cursor was reset incorrectly
Solution: Use backfill pattern with time-based filtering.
Webhooks (Alternative to Polling)
For real-time push delivery instead of polling, Thirdfort can configure webhook endpoints at the organization level. Webhooks provide sub-second notification delivery and reduce API calls.
Webhook Setup
Contact api-support@thirdfort.com to configure webhooks for your organization. You'll need to provide:
- Your webhook endpoint URL (must be HTTPS)
- Any specific notification types you want to receive
Thirdfort will provide you with an HMAC secret for signature verification.
Webhook Signature Verification
All webhook requests include an x-thirdfort-signature header containing an HMAC-SHA256 signature. Always verify this signature before processing the notification.
Python Example:
import hashlib
import hmac
import base64


def verify_webhook(request, secret):
    # Extract signature header
    signature_header = request.headers.get("x-thirdfort-signature")
    if not signature_header:
        return False  # Missing header: compare_digest would raise on None

    # Compute expected signature over the raw request body (bytes)
    expected_signature = base64.b64encode(
        hmac.new(
            secret.encode(),
            request.body,
            hashlib.sha256
        ).digest()
    ).decode()

    # Compare signatures (constant-time comparison)
    return hmac.compare_digest(signature_header.encode(), expected_signature.encode())


# Usage (inside your web framework's request handler)
def handle_webhook(request):
    if verify_webhook(request, "your-hmac-secret"):
        # Process notification
        notification = request.json()
    else:
        # Invalid signature - reject request
        return 401
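To exercise a verifier like this without waiting for a real delivery, a test can sign a sample body locally. The sketch below assumes the scheme described above (base64-encoded HMAC-SHA256 over the raw request body) and works on raw bytes, so it does not depend on any web framework. `sign_body` and `is_valid_signature` are illustrative helpers, not Thirdfort functions.

```python
import base64
import hashlib
import hmac
from typing import Optional


def sign_body(body: bytes, secret: str) -> str:
    """Compute the base64 HMAC-SHA256 signature of a raw body (for tests)."""
    digest = hmac.new(secret.encode(), body, hashlib.sha256).digest()
    return base64.b64encode(digest).decode()


def is_valid_signature(body: bytes, signature_header: Optional[str], secret: str) -> bool:
    """Check a received signature header against the raw body."""
    if not signature_header:
        return False
    expected = sign_body(body, secret)
    # Compare as bytes so a non-ASCII header cannot raise TypeError
    return hmac.compare_digest(signature_header.encode(), expected.encode())
```

Verification should run on the body exactly as received, before any JSON parsing or re-serialization, since either can change the bytes that were signed.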
Webhook Headers
Webhook requests include these headers:
- x-thirdfort-notification-id - Unique notification ID
- x-thirdfort-notification-type - Notification type (e.g., notification.client.check_complete.v1)
- x-thirdfort-signature - HMAC-SHA256 signature (base64-encoded)
Webhook Payload
The webhook payload structure matches the polling API response. Each notification includes:
- name - Resource name (e.g., organizations/{org}/notifications/{id})
- type - Notification type
- createTime - When the notification was created
- payload - Type-specific data with @type field
Next Steps
- Production deployment: Set up monitoring and alerting
- Webhook setup: Contact api-support@thirdfort.com for real-time delivery
- Notification handling: Implement business logic for each notification type