Push Notifications With Temporal: A Complete Guide
Push notifications are the lifeblood of modern mobile applications, keeping users informed, engaged, and connected in real-time. Whether it's an urgent OTP, a critical security alert, a timely appointment reminder, or a general app update, the ability to deliver these messages directly to a user's device is paramount. Without them, mobile apps risk feeling disconnected and unresponsive, leading to decreased user satisfaction and engagement. This article delves into a robust solution for implementing a push notification system, leveraging the power of Temporal Workers for reliable, asynchronous delivery.
The Challenge: Bridging the Gap in Mobile Communication
Many applications face a common hurdle: their mobile apps lack push notification support on both Android and iOS. Without it, users miss crucial real-time information. Imagine not receiving a One-Time Password (OTP) on your phone when trying to log in, or missing a security alert that could protect your account. Appointment reminders might not arrive, leading to missed meetings, and general app notifications fail to keep users updated on new features or important information. This disconnect hurts the user experience and makes it harder for the app to build a loyal, active user base. Push notifications are a necessity for user engagement and real-time communication, and when the existing infrastructure has no dedicated system to handle them, a well-defined solution is needed.
Our Proposed Solution: A Scalable Push Notification System
To address these challenges, we propose a table-based asynchronous push notification system. It integrates with your existing Temporal infrastructure and uses Firebase Cloud Messaging (FCM) for cross-platform delivery. Several components work together to ensure reliable dispatch:
- A dedicated push_notifications collection in your database, designed with expiry support so that outdated notifications are not processed.
- The PushNotificationWriter, which records every notification request, and the PushNotificationReader, which retrieves those records for processing.
- The Master Worker (PushNotificationsWorker), a Temporal worker scheduled to run periodically, typically via a cron schedule every five minutes, which scans for pending notifications.
- The Individual Worker (PushNotificationWorker), triggered by the master worker for each pending notification and responsible for actually sending the push notification to devices.
- FCM integration, which delivers notifications to both Android and iOS devices.
The system supports two modes. Immediate mode executes the individual worker synchronously for high-priority, urgent notifications that require instant delivery. Async mode queues standard notifications for the master worker to process, which keeps the system responsive and prevents overload.
Database Schema: The push_notifications Collection
The foundation of our push notification system is a well-structured database collection. We define a PushNotificationModel to capture the details of each notification. It includes account_id to link a notification to a specific account, and a status field that tracks the lifecycle through the states 'pending', 'processing', 'sent', 'delivered', 'failed', and 'expired'. A priority field distinguishes 'immediate' from 'normal' notifications. The push-specific content lives in title, body, and a flexible data payload for custom information, while the optional image_url and click_action fields enrich the notification. Provider details (provider, which defaults to 'fcm', plus provider_message_ids, device_tokens_used, and invalid_tokens) help with tracking and debugging. For reliability, the model carries retry_count, max_retries, and next_retry_at. The expires_at field allows time-bound notifications, preventing stale messages from being processed. Errors are recorded in error_message and error_type, and an active flag marks whether the record is still eligible for processing. Finally, the created_at, updated_at, sent_at, and delivered_at timestamps provide an audit trail. For efficient querying, we define two compound indexes: one on status, expires_at, and next_retry_at, and one on account_id and created_at (descending).
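from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Optional

from bson import ObjectId  # ObjectId as shipped with pymongo's bson package

# BaseModel is the project's own base class; its import path depends on the codebase.


@dataclass
class PushNotificationModel(BaseModel):
    # Required fields come first: a dataclass rejects non-default fields
    # that follow fields with defaults.
    id: ObjectId
    account_id: str
    status: str  # 'pending', 'processing', 'sent', 'delivered', 'failed', 'expired'
    priority: str  # 'immediate', 'normal'
    # Push-specific fields
    title: str
    body: str
    data: Dict[str, str]  # Custom data payload
    # Timestamps (required)
    created_at: datetime
    updated_at: datetime
    image_url: Optional[str] = None
    click_action: Optional[str] = None
    # Provider fields
    provider: str = 'fcm'
    # Lists use default_factory: a dataclass rejects mutable defaults such as [].
    provider_message_ids: List[str] = field(default_factory=list)  # One per device
    device_tokens_used: List[str] = field(default_factory=list)  # Tokens used for this notification
    invalid_tokens: List[str] = field(default_factory=list)  # Tokens that failed
    # Retry fields
    retry_count: int = 0
    max_retries: int = 4
    next_retry_at: Optional[datetime] = None
    # Expiry
    expires_at: Optional[datetime] = None
    # Error tracking
    error_message: Optional[str] = None
    error_type: Optional[str] = None
    # Timestamps (optional)
    sent_at: Optional[datetime] = None
    delivered_at: Optional[datetime] = None
    active: bool = True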
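Indexes:
indexes = [
    # Master worker scan: pending records that are unexpired and due
    {"keys": [("status", 1), ("expires_at", 1), ("next_retry_at", 1)]},
    # Per-account history, newest first
    {"keys": [("account_id", 1), ("created_at", -1)]},
]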
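To illustrate what the first compound index is for, here is a minimal sketch of the filter the master worker might use to find eligible records. It assumes a MongoDB-style query document (suggested by the ObjectId and collection terminology above); the function name build_pending_filter is hypothetical and not part of the original design.

```python
from datetime import datetime, timezone
from typing import Any, Dict


def build_pending_filter(now: datetime) -> Dict[str, Any]:
    """Build a filter for notifications that may be picked up at `now`."""
    return {
        "status": "pending",
        "active": True,
        "$and": [
            # Not expired: no expiry set, or the expiry is still in the future.
            {"$or": [{"expires_at": None}, {"expires_at": {"$gt": now}}]},
            # Due: never scheduled for retry, or the retry time has arrived.
            {"$or": [{"next_retry_at": None}, {"next_retry_at": {"$lte": now}}]},
        ],
    }


if __name__ == "__main__":
    print(build_pending_filter(datetime.now(timezone.utc)))
```

In MongoDB, matching a field against None also matches documents where the field is missing, so records without expires_at or next_retry_at remain eligible.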
Implementation Plan: A Phased Approach
Our implementation follows a structured, phased approach to ensure modularity and maintainability. This allows us to build and test each component systematically.
Phase 1: Database Layer
This initial phase focuses on establishing the persistence layer for our notifications. We will create the PushNotificationModel which defines the structure of our notification data, ensuring all necessary fields are present and correctly typed. Following this, the PushNotificationRepository will be developed. This repository acts as an interface to our database, providing methods for basic CRUD operations and, critically, for setting up the essential indexes outlined earlier. These indexes are vital for efficient querying, particularly for the master worker that needs to fetch pending notifications. Finally, we'll integrate these components into the application's bootstrap process, ensuring the collection and indexes are created upon application startup if they don't already exist.
Files to create:
- modules/notification/internals/store/push_notification_model.py
- modules/notification/internals/store/push_notification_repository.py
Tasks:
- [X] Create PushNotificationModel
- [X] Create PushNotificationRepository with indexes
- [X] Add to bootstrap process
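As a rough sketch of the index setup step in the repository, the snippet below creates the two compound indexes at startup. It assumes the collection object exposes a create_index(keys) method, as pymongo's Collection does; ensure_indexes and FakeCollection are hypothetical names used only for illustration.

```python
from typing import Any, List, Tuple

IndexKeys = List[Tuple[str, int]]

# The two compound indexes described in the schema section.
PUSH_NOTIFICATION_INDEXES: List[IndexKeys] = [
    [("status", 1), ("expires_at", 1), ("next_retry_at", 1)],
    [("account_id", 1), ("created_at", -1)],
]


def ensure_indexes(collection: Any) -> List[str]:
    """Create each index and return the names reported by the driver."""
    return [collection.create_index(keys) for keys in PUSH_NOTIFICATION_INDEXES]


class FakeCollection:
    """Stand-in for a real collection, used only to exercise the sketch."""

    def __init__(self) -> None:
        self.created: List[IndexKeys] = []

    def create_index(self, keys: IndexKeys) -> str:
        self.created.append(keys)
        # Mimics the driver's default naming: field_direction joined by "_".
        return "_".join(f"{name}_{direction}" for name, direction in keys)


if __name__ == "__main__":
    print(ensure_indexes(FakeCollection()))
```

Creating an index that already exists with the same specification is a no-op in MongoDB, which is why calling this on every application start is safe.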
Phase 2: Writer & Reader Layers
With the database layer in place, we move to abstracting the logic for writing and reading notification data. The PushNotificationWriter class will encapsulate the logic for creating new notification records and updating their statuses and other relevant fields. This includes handling the initial 'pending' state, updating to 'processing', and finalizing with 'sent', 'failed', or 'expired' statuses. The PushNotificationReader class will provide a clean interface for fetching notification data, primarily by its ID, and will raise a specific PushNotificationNotFoundError if a notification cannot be found. This separation ensures that business logic doesn't directly interact with database queries, promoting cleaner code and easier testing. Unit tests will be crucial here to verify the correct data manipulation and error handling.
Files to create:
- modules/notification/internals/push_notification_writer.py
- modules/notification/internals/push_notification_reader.py
Tasks:
- [X] Create PushNotificationWriter class
- [X] Create PushNotificationReader class
- [X] Add unit tests
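The writer/reader split described in this phase can be sketched as follows. This is a simplified illustration rather than the actual implementation: an in-memory dict stands in for the repository, and the method names create, update_status, and get_by_id are assumptions.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Optional
from uuid import uuid4


class PushNotificationNotFoundError(Exception):
    """Raised when no notification exists for the requested ID."""


# Stand-in for the real repository: an in-memory dict keyed by notification ID.
_STORE: Dict[str, Dict[str, Any]] = {}


class PushNotificationWriter:
    @staticmethod
    def create(account_id: str, title: str, body: str,
               priority: str = "normal",
               expires_at: Optional[datetime] = None) -> str:
        """Insert a new record in the 'pending' state and return its ID."""
        now = datetime.now(timezone.utc)
        notification_id = uuid4().hex
        _STORE[notification_id] = {
            "id": notification_id, "account_id": account_id,
            "title": title, "body": body, "priority": priority,
            "status": "pending", "expires_at": expires_at,
            "created_at": now, "updated_at": now,
        }
        return notification_id

    @staticmethod
    def update_status(notification_id: str, status: str,
                      error_message: Optional[str] = None) -> None:
        """Move a record to a new status, e.g. 'processing' or 'sent'."""
        record = PushNotificationReader.get_by_id(notification_id)
        record.update(status=status, error_message=error_message,
                      updated_at=datetime.now(timezone.utc))


class PushNotificationReader:
    @staticmethod
    def get_by_id(notification_id: str) -> Dict[str, Any]:
        """Fetch one record or raise PushNotificationNotFoundError."""
        try:
            return _STORE[notification_id]
        except KeyError:
            raise PushNotificationNotFoundError(notification_id) from None
```

Callers only ever see these two classes, so swapping the dict for a real repository would not change any business logic.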
Phase 3: FCM Service Integration
This phase is dedicated to integrating with Firebase Cloud Messaging, the service that will handle the actual delivery of notifications to Android and iOS devices. We'll create an FCMService class, responsible for initializing the Firebase Admin SDK using provided credentials and configuration. The core functionality will be the send_push method, which takes a list of device tokens and the notification payload, constructs a MulticastMessage for efficient sending to multiple devices, and handles the response from FCM. This method will differentiate between successful sends, failures, and importantly, identify invalid or unregistered device tokens. A companion FCMParams class will include validation logic to ensure that the notification parameters (like title and body length) adhere to FCM's limits before attempting to send. This proactive validation prevents unnecessary errors and improves the reliability of the service. Error handling will be robust, catching FirebaseError exceptions and translating them into our internal ServiceError for consistent error management.
Files to create:
- modules/notification/internals/fcm_service.py
- modules/notification/internals/fcm_params.py
Tasks:
- [X] Create FCMService class
- [X] Implement send_push() method
- [X] Create FCMParams validation
- [X] Add unit tests
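A minimal sketch of the pre-send validation might look like the following. The 4096-byte ceiling is FCM's documented maximum payload size; the title and body character caps are hypothetical project limits, and measuring the payload as serialized JSON is only an approximation of how FCM counts bytes.

```python
import json
from dataclasses import dataclass, field
from typing import Dict, List

MAX_PAYLOAD_BYTES = 4096  # FCM's documented maximum payload size
MAX_TITLE_CHARS = 100     # assumption: project-specific limit
MAX_BODY_CHARS = 1000     # assumption: project-specific limit


@dataclass
class FCMParams:
    title: str
    body: str
    data: Dict[str, str] = field(default_factory=dict)

    def validate(self) -> List[str]:
        """Return a list of problems; an empty list means the params are OK."""
        errors: List[str] = []
        if not self.title.strip():
            errors.append("title is required")
        if len(self.title) > MAX_TITLE_CHARS:
            errors.append("title is too long")
        if len(self.body) > MAX_BODY_CHARS:
            errors.append("body is too long")
        # FCM data payloads must map string keys to string values.
        for key, value in self.data.items():
            if not isinstance(value, str):
                errors.append(f"data[{key!r}] must be a string")
        payload = json.dumps(
            {"notification": {"title": self.title, "body": self.body},
             "data": self.data},
            default=str,
        )
        if len(payload.encode("utf-8")) > MAX_PAYLOAD_BYTES:
            errors.append("payload is too large")
        return errors
```

Returning a list of problems instead of raising on the first one lets the caller record every issue in error_message in a single pass.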
Phase 4: Temporal Workers
This is where the orchestration power of Temporal comes into play. We will define two key Temporal workers. The PushNotificationsWorker (Master Worker) will be scheduled to run periodically (e.g., every 5 minutes via cron). Its primary role is to query the push_notifications collection for records that are 'pending', active, not expired, and ready for processing (considering next_retry_at). For each eligible notification, it will trigger an instance of the PushNotificationWorker (Individual Worker). This individual worker is responsible for fetching the full notification details, checking user preferences (unless it's an immediate notification), retrieving active device tokens for the associated account, constructing the notification payload, and finally sending it via the FCMService. It will then update the notification status based on the FCM response, marking any invalid tokens and handling retries with exponential backoff for transient errors. Error classification will determine if a notification should be retried or marked as permanently failed.
Files to create:
- modules/notification/workers/push_notifications_worker.py (Master)
- modules/notification/workers/push_notification_worker.py (Individual)
Tasks:
- [X] Create PushNotificationsWorker (master)
- [X] Create PushNotificationWorker (individual)
- [X] Update error classification for FCM errors
- [X] Add to temporal_config.py
- [X] Add unit tests
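The exponential backoff applied after a transient failure could be computed along these lines. This is a sketch under stated assumptions: the 60-second base delay, the one-hour cap, and the function name plan_retry are illustrative choices, not values taken from the design above.

```python
from datetime import datetime, timedelta
from typing import Optional, Tuple

BASE_DELAY_SECONDS = 60    # assumption: first retry after one minute
MAX_DELAY_SECONDS = 3600   # assumption: never wait longer than one hour


def plan_retry(retry_count: int, max_retries: int,
               now: datetime) -> Tuple[str, Optional[datetime]]:
    """Return (status, next_retry_at) after a transient send failure.

    retry_count is the number of retries already attempted.
    """
    if retry_count >= max_retries:
        # Retry budget exhausted: give up permanently.
        return "failed", None
    # The delay doubles with every attempt: 60s, 120s, 240s, ...
    delay = min(BASE_DELAY_SECONDS * (2 ** retry_count), MAX_DELAY_SECONDS)
    return "pending", now + timedelta(seconds=delay)
```

Because the master worker only scans every five minutes, the effective wait is the computed delay rounded up to the next scan, so short delays mostly matter for ordering rather than exact timing.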
Phase 5: Error Classification (Update)
To ensure our retry mechanism is intelligent, we need to update the ErrorClassifier. This class will be enhanced to recognize specific error types returned by FCM. For instance, certain FCM errors, like invalid-argument or unregistered token errors, indicate permanent issues and should not be retried. Other errors, such as unavailable, internal, or deadline-exceeded, often represent transient issues and are suitable for retries. By accurately classifying these errors, we can optimize the retry logic, preventing unnecessary attempts for non-recoverable errors and ensuring timely retries for temporary glitches, thus improving the overall reliability and efficiency of the notification system.
File to update:
- modules/notification/internals/error_classifier.py
Tasks:
- [X] Add FCM error classification
- [X] Add unit tests
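The classification rules described in this phase can be sketched as a simple lookup. The error codes are the ones named above; the ErrorKind enum, the classify_fcm_error function, and the choice to treat unknown codes as transient (bounded by max_retries) are assumptions made for illustration.

```python
from enum import Enum


class ErrorKind(str, Enum):
    PERMANENT = "permanent"   # do not retry
    TRANSIENT = "transient"   # retry with backoff


# Codes named in the text above, written in lowercase hyphenated form.
PERMANENT_CODES = {"invalid-argument", "unregistered"}
TRANSIENT_CODES = {"unavailable", "internal", "deadline-exceeded"}


def classify_fcm_error(code: str) -> ErrorKind:
    """Map an FCM error code to a retry decision."""
    # Accept both 'INVALID_ARGUMENT' and 'invalid-argument' spellings.
    normalized = code.strip().lower().replace("_", "-")
    if normalized in PERMANENT_CODES:
        return ErrorKind.PERMANENT
    if normalized in TRANSIENT_CODES:
        return ErrorKind.TRANSIENT
    # Assumption: unknown codes are retried, since max_retries bounds the cost.
    return ErrorKind.TRANSIENT
```

Defaulting unknown codes to permanent instead would be the more conservative choice if wasted retries are a bigger concern than missed deliveries.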
Phase 6: Update NotificationService
We need to integrate the new push notification functionality into our main NotificationService. A new method, send_push_for_account, will be added. This method will serve as the primary entry point for triggering push notifications. It will utilize the PushNotificationWriter to create the initial notification record in the database. Crucially, it will handle the immediate flag: if set to True, it will directly invoke the PushNotificationWorker synchronously using ApplicationService.run_worker_immediately. If immediate is False, the notification will be queued for the master worker to pick up later. This method will also accept an expires_at parameter, allowing for time-bound notifications. This integration ensures that the push notification system is accessible through a consistent service layer, simplifying its usage across the application.
File to update:
- modules/notification/notification_service.py
Tasks:
- [X] Add send_push_for_account() method
- [X] Add push types to types.py
- [X] Update authentication service for push OTPs
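The immediate versus async branching described above can be sketched as follows. To keep the example self-contained, the writer and the synchronous worker call are passed in as plain callables: create_record stands in for PushNotificationWriter, and run_worker_now stands in for ApplicationService.run_worker_immediately. Both parameter names, and the flattened title/body arguments, are hypothetical.

```python
from datetime import datetime
from typing import Callable, Optional


def send_push_for_account(
    account_id: str,
    title: str,
    body: str,
    *,
    create_record: Callable[..., str],
    run_worker_now: Callable[[str], None],
    immediate: bool = False,
    expires_at: Optional[datetime] = None,
) -> str:
    """Persist the notification, then send it now or leave it queued."""
    # The record is always written first, so there is an audit trail
    # even when the send itself fails.
    notification_id = create_record(
        account_id=account_id,
        title=title,
        body=body,
        priority="immediate" if immediate else "normal",
        expires_at=expires_at,
    )
    if immediate:
        # Urgent path: run the individual worker synchronously.
        run_worker_now(notification_id)
    # Otherwise the record stays 'pending' until the master worker's next scan.
    return notification_id
```

If the immediate send fails with a transient error, one reasonable design is to leave the record pending so the master worker can retry it on its next run.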
Phase 7: Types & Configuration
To maintain code clarity and consistency, we’ll define specific data transfer objects (DTOs) for push notifications in modules/notification/types.py. These are SendPushParams for the notification content and SendPushResult for the outcome of a send operation. Alongside these types, we need to configure FCM. This involves adding an fcm section to our configuration files (e.g., config/development.yml) that specifies the path to the Firebase service account credentials file (credentials_path). The FCMService needs this configuration to authenticate with Firebase and send notifications.
File to update:
- modules/notification/types.py
Configuration:
# config/development.yml
fcm:
  credentials_path: "./firebase-adminsdk.json"
Tasks:
- [X] Add push types
- [X] Add FCM configuration
- [X] Add Firebase service account credentials
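One possible shape for the two DTOs is sketched below. The SendPushParams fields mirror the content fields of the model and the usage examples in this article; the SendPushResult fields and the fully_accepted property are assumptions, since the article does not spell them out.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass(frozen=True)
class SendPushParams:
    """Content of a push notification, independent of any account."""
    title: str
    body: str
    data: Dict[str, str] = field(default_factory=dict)
    image_url: Optional[str] = None
    click_action: Optional[str] = None


@dataclass(frozen=True)
class SendPushResult:
    """Outcome of one send attempt across all of an account's devices."""
    success_count: int = 0
    failure_count: int = 0
    message_ids: List[str] = field(default_factory=list)
    invalid_tokens: List[str] = field(default_factory=list)

    @property
    def fully_accepted(self) -> bool:
        # True only when at least one device was targeted and none failed.
        return self.success_count > 0 and self.failure_count == 0
```

Marking the DTOs as frozen keeps them immutable once constructed, which makes them safe to pass between the service layer and the workers.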
Phase 8: Worker Registration & Cron
Finally, we need to ensure our new Temporal workers are recognized by the system and that the master worker runs on schedule. This involves updating temporal_config.py to include both PushNotificationsWorker and PushNotificationWorker in the WORKERS list. Additionally, we'll use ApplicationService.schedule_worker_as_cron to schedule the PushNotificationsWorker to run every 5 minutes using the cron expression */5 * * * *. This step makes our workers discoverable and ensures the automated processing of notifications.
File to update:
- temporal_config.py
Bootstrap:
# Schedule push master worker
ApplicationService.schedule_worker_as_cron(
    cls=PushNotificationsWorker,
    cron_schedule="*/5 * * * *"
)
Tasks:
- [X] Add workers to temporal_config.py
- [X] Schedule master worker cron
- [X] Test execution
Usage Examples
Here are a few examples demonstrating how to utilize the new push notification service:
Example 1: Security Alert (Immediate)
For critical alerts that require immediate delivery, such as a new login detection, you can use the immediate=True flag. This ensures the notification is sent without delay.
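# Import paths follow the file layout described in the implementation plan.
from modules.notification.notification_service import NotificationService
from modules.notification.types import SendPushParams

notification_id = NotificationService.send_push_for_account(
    account_id=user.id,
    params=SendPushParams(
        title="Security Alert",
        body="New login from Chrome on Windows",
        data={"type": "login_alert", "session_id": "abc123"}
    ),
    immediate=True
)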
Example 2: Appointment Reminder (Async with Expiry)
For scheduled events like appointment reminders, you can send them asynchronously and set an expiry time. This prevents notifications from being sent if they are no longer relevant (e.g., if the appointment has passed).
notification_id = NotificationService.send_push_for_account(
    account_id=user.id,
    params=SendPushParams(
        title="Appointment Reminder",
        body="Your appointment is in 1 hour",
        data={"type": "appointment", "appointment_id": "appt123"},
        click_action="/appointments/appt123"
    ),
    immediate=False,
    expires_at=appointment_time
)
Example 3: OTP (Immediate)
One-Time Passwords (OTPs) are a prime use case for immediate push notifications. They need to be delivered instantly and can have a short expiry.
from datetime import datetime, timedelta

notification_id = NotificationService.send_push_for_account(
    account_id=user.id,
    params=SendPushParams(
        title="Your OTP Code",
        body=f"Your OTP is {otp_code}",
        data={"type": "otp", "code": otp_code}
    ),
    immediate=True,
    expires_at=datetime.now() + timedelta(minutes=10)
)