Skip to main content

Polling for Notifications

Polling allows you to retrieve notifications by making periodic API requests. This guide shows you how to implement notification polling effectively.

Overview

The Thirdfort API provides organization-level notifications that you can retrieve by polling the ListNotifications endpoint. Notifications are returned in chronological order (oldest first), making them suitable for cursor-based consumption.

When to Use Polling

Use polling when:

  • You cannot expose a public webhook endpoint
  • You prefer simpler infrastructure (no webhook server required)
  • You have moderate notification volume
  • Slight delays (30-60 seconds) are acceptable

Use webhooks when:

  • You need real-time notifications (sub-second delivery)
  • You have high notification volume
  • You want to minimize API calls
  • You can maintain a public HTTPS endpoint

Basic Polling Pattern

Initial Request

Retrieve the first page of notifications:

GET /v1/organizations/{organization}/notifications?page_size=25

Response:

{
"notifications": [
{
"name": "organizations/acme-corp/notifications/01JRA4ABC123",
"type": "notification.client.check_complete.v1",
"createTime": "2024-06-15T10:30:00Z",
"payload": {
"@type": "type.googleapis.com/thirdfort.client.notifications.v1alpha1.CheckCompleteData",
"check": "organizations/acme-corp/teams/default/checks/01JRA3XYZ789",
"displayName": "John Smith ID Verification",
"state": "ACTIVE",
"recommendation": "APPROVE",
"completeTime": "2024-06-15T10:29:55Z"
}
}
],
"nextPageToken": "01JRA4ABC123"
}

Subsequent Requests

Use the nextPageToken from the previous response to get newer notifications:

GET /v1/organizations/{organization}/notifications?page_size=25&page_token=01JRA4ABC123

Implementation Examples

Python

import requests
import time
from typing import Optional

class NotificationPoller:
def __init__(self, base_url: str, organization: str, access_token: str):
self.base_url = base_url
self.organization = organization
self.headers = {"Authorization": f"Bearer {access_token}"}
self.cursor: Optional[str] = None

def poll_once(self) -> list:
"""Poll for new notifications once."""
url = f"{self.base_url}/v1/organizations/{self.organization}/notifications"
params = {"page_size": 25}

if self.cursor:
params["page_token"] = self.cursor

response = requests.get(url, headers=self.headers, params=params)
response.raise_for_status()

data = response.json()
notifications = data.get("notifications", [])

# Update cursor for next poll
if "nextPageToken" in data:
self.cursor = data["nextPageToken"]

return notifications

def poll_continuously(self, interval_seconds: int = 30, callback=None):
"""Poll continuously at the specified interval."""
while True:
try:
notifications = self.poll_once()

if notifications and callback:
for notification in notifications:
callback(notification)

time.sleep(interval_seconds)

except Exception as e:
print(f"Error polling notifications: {e}")
time.sleep(interval_seconds)

# Usage
def handle_notification(notification):
print(f"Received: {notification['type']}")
# Process notification...

poller = NotificationPoller(
base_url="https://api.thirdfort.com/client/api",
organization="acme-corp",
access_token="your_access_token"
)

# Poll every 30 seconds
poller.poll_continuously(interval_seconds=30, callback=handle_notification)

JavaScript/TypeScript

interface Notification {
name: string;
type: string;
createTime: string;
payload: any;
}

interface ListNotificationsResponse {
notifications: Notification[];
nextPageToken?: string;
}

class NotificationPoller {
private cursor: string | null = null;

constructor(
private baseUrl: string,
private organization: string,
private accessToken: string
) {}

async pollOnce(): Promise<Notification[]> {
const url = new URL(
`/v1/organizations/${this.organization}/notifications`,
this.baseUrl
);

url.searchParams.set('page_size', '25');
if (this.cursor) {
url.searchParams.set('page_token', this.cursor);
}

const response = await fetch(url.toString(), {
headers: {
'Authorization': `Bearer ${this.accessToken}`
}
});

if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}

const data: ListNotificationsResponse = await response.json();

// Update cursor for next poll
if (data.nextPageToken) {
this.cursor = data.nextPageToken;
}

return data.notifications || [];
}

async pollContinuously(
intervalSeconds: number = 30,
callback?: (notification: Notification) => void | Promise<void>
): Promise<void> {
while (true) {
try {
const notifications = await this.pollOnce();

if (notifications.length > 0 && callback) {
for (const notification of notifications) {
await callback(notification);
}
}

await new Promise(resolve => setTimeout(resolve, intervalSeconds * 1000));
} catch (error) {
console.error('Error polling notifications:', error);
await new Promise(resolve => setTimeout(resolve, intervalSeconds * 1000));
}
}
}
}

// Usage
const poller = new NotificationPoller(
'https://api.thirdfort.com/client/api',
'acme-corp',
'your_access_token'
);

// Poll every 30 seconds
poller.pollContinuously(30, async (notification) => {
console.log(`Received: ${notification.type}`);
// Process notification...
});

Best Practices

1. Store the Cursor

Always store the nextPageToken between polling cycles. This ensures you don't miss notifications or process duplicates.

# Good: Store cursor persistently
cursor = load_cursor_from_database()
poller.cursor = cursor

notifications = poller.poll_once()
save_cursor_to_database(poller.cursor)

2. Implement Deduplication

Use the notification ID (last segment of the name field) to prevent processing the same notification twice:

processed_ids = set()

def handle_notification(notification):
notification_id = notification["name"].split("/")[-1]

if notification_id in processed_ids:
return # Already processed

# Process notification
process(notification)

# Mark as processed
processed_ids.add(notification_id)
save_processed_id(notification_id)

3. Choose an Appropriate Polling Interval

Recommended intervals:

  • 30-60 seconds: Standard use cases
  • 10-30 seconds: Time-sensitive workflows
  • 60-300 seconds: Low-priority notifications

Avoid polling more frequently than every 10 seconds to prevent rate limiting.

4. Handle Errors Gracefully

Implement exponential backoff for transient errors:

import time

def poll_with_backoff(poller, max_retries=3):
retries = 0
backoff = 1

while retries < max_retries:
try:
return poller.poll_once()
except requests.exceptions.RequestException as e:
retries += 1
if retries >= max_retries:
raise

print(f"Error polling (attempt {retries}/{max_retries}): {e}")
time.sleep(backoff)
backoff *= 2 # Exponential backoff

5. Filter Notifications

Use filters to retrieve only relevant notifications:

# Only check completion notifications
GET /v1/organizations/{org}/notifications?filter=type = "notification.client.check_complete.v1"

# Only recent notifications
GET /v1/organizations/{org}/notifications?filter=create_time > "2024-06-01T00:00:00Z"

# Combined filters
GET /v1/organizations/{org}/notifications?filter=type = "notification.client.check_complete.v1" AND create_time > "2024-06-01T00:00:00Z"

Note: URL-encode filter strings in actual requests.

Handling Notification Types

Different notification types contain different payload structures. Always check the @type field.

Available Notification Types

TypePayloadDescription
notification.client.check_complete.v1CheckCompleteDataA check has completed processing
notification.client.report_generated.v1ReportGeneratedDataA report has been generated (e.g., finding reviewed)
notification.client.ongoing_monitoring_hit.v1OngoingMonitoringHitDataOngoing monitoring screening match detected
notification.client.ongoing_monitoring_started.v1OngoingMonitoringStartedDataOngoing monitoring started
notification.client.ongoing_monitoring_renewed.v1OngoingMonitoringRenewedDataOngoing monitoring renewed

Handling Example

def handle_notification(notification):
payload = notification["payload"]
notification_type = notification["type"]

if notification_type == "notification.client.check_complete.v1":
check_name = payload["check"]
recommendation = payload["recommendation"]
print(f"Check {check_name} completed with recommendation: {recommendation}")

elif notification_type == "notification.client.report_generated.v1":
check_name = payload["check"]
print(f"Report generated for check: {check_name}")

elif notification_type == "notification.client.ongoing_monitoring_hit.v1":
check_name = payload["check"]
hit_count = payload.get("hitCount", 0)
print(f"Monitoring hit detected for {check_name}: {hit_count} hits")

else:
print(f"Unknown notification type: {notification_type}")

Idempotency

Notifications include a unique resource name (e.g., organizations/{org}/notifications/01JRA4ABC123). Use the notification ID portion (the last segment) for deduplication when processing notifications.

Backfilling Missed Notifications

If your polling service was offline, you can backfill missed notifications using time-based filtering:

from datetime import datetime, timedelta

def backfill_notifications(poller, since: datetime):
"""Retrieve all notifications since a specific time."""
# Format timestamp for filter
since_str = since.strftime("%Y-%m-%dT%H:%M:%SZ")

url = f"{poller.base_url}/v1/organizations/{poller.organization}/notifications"
params = {
"page_size": 100, # Use max page size for backfill
"filter": f'create_time > "{since_str}"'
}

all_notifications = []

while True:
response = requests.get(url, headers=poller.headers, params=params)
response.raise_for_status()

data = response.json()
notifications = data.get("notifications", [])
all_notifications.extend(notifications)

if "nextPageToken" not in data:
break

params["page_token"] = data["nextPageToken"]

return all_notifications

# Backfill last 24 hours
since = datetime.utcnow() - timedelta(hours=24)
missed_notifications = backfill_notifications(poller, since)

Monitoring Your Polling Service

Track these metrics to ensure reliable notification delivery:

  • Poll success rate: Percentage of successful polls
  • Notifications per poll: Average number of notifications retrieved
  • Processing latency: Time from notification creation to processing
  • Cursor age: Time since the cursor was last updated
import time
from datetime import datetime

class MonitoredPoller(NotificationPoller):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.metrics = {
"total_polls": 0,
"successful_polls": 0,
"total_notifications": 0,
"last_poll_time": None
}

def poll_once(self):
self.metrics["total_polls"] += 1
start_time = time.time()

try:
notifications = super().poll_once()
self.metrics["successful_polls"] += 1
self.metrics["total_notifications"] += len(notifications)
self.metrics["last_poll_time"] = datetime.utcnow()

# Log metrics
success_rate = (self.metrics["successful_polls"] / self.metrics["total_polls"]) * 100
print(f"Poll metrics: {success_rate:.1f}% success, "
f"{len(notifications)} notifications, "
f"{time.time() - start_time:.2f}s latency")

return notifications

except Exception as e:
print(f"Poll failed: {e}")
raise

Troubleshooting

No Notifications Returned

Possible causes:

  • No new notifications since last poll
  • Cursor is already at the latest notification
  • Filter is too restrictive

Solution: Remove filters and check if notifications exist.

Duplicate Notifications

Possible causes:

  • Cursor not being stored correctly
  • Multiple polling instances running

Solution: Implement deduplication using notification IDs.

Missing Notifications

Possible causes:

  • Polling service was offline
  • Cursor was reset incorrectly

Solution: Use backfill pattern with time-based filtering.

Webhooks (Alternative to Polling)

For real-time push delivery instead of polling, Thirdfort can configure webhook endpoints at the organization level. Webhooks provide sub-second notification delivery and reduce API calls.

Webhook Setup

Contact api-support@thirdfort.com to configure webhooks for your organization. You'll need to provide:

  • Your webhook endpoint URL (must be HTTPS)
  • Any specific notification types you want to receive

Thirdfort will provide you with an HMAC secret for signature verification.

Webhook Signature Verification

All webhook requests include an x-thirdfort-signature header containing an HMAC-SHA256 signature. Always verify this signature before processing the notification.

Python Example:

import hashlib
import hmac
import base64

def verify_webhook(request, secret):
# Extract signature header
signature_header = request.headers.get("x-thirdfort-signature")

# Compute expected signature
expected_signature = base64.b64encode(
hmac.new(
secret.encode(),
request.body,
hashlib.sha256
).digest()
).decode()

# Compare signatures (constant-time comparison)
return hmac.compare_digest(signature_header, expected_signature)

# Usage
if verify_webhook(request, "your-hmac-secret"):
# Process notification
notification = request.json()
else:
# Invalid signature - reject request
return 401

Webhook Headers

Webhook requests include these headers:

  • x-thirdfort-notification-id - Unique notification ID
  • x-thirdfort-notification-type - Notification type (e.g., notification.client.check_complete.v1)
  • x-thirdfort-signature - HMAC-SHA256 signature (base64-encoded)

Webhook Payload

The webhook payload structure matches the polling API response. Each notification includes:

  • name - Resource name (e.g., organizations/{org}/notifications/{id})
  • type - Notification type
  • createTime - When the notification was created
  • payload - Type-specific data with @type field

Next Steps

  • Production deployment: Set up monitoring and alerting
  • Webhook setup: Contact api-support@thirdfort.com for real-time delivery
  • Notification handling: Implement business logic for each notification type