Worker Architecture
Referenced Files in This Document
- app/workers/__init__.py
- app/workers/listener.py
- app/workers/sweeper.py
- app/workers/webhook.py
- app/core/config.py
- docker-compose.yml
- pyproject.toml
- requirements.txt
- server.py
- app/blockchain/manager.py
- app/services/webhook.py
- chains.yaml
Table of Contents
- Introduction
- Project Structure
- Core Components
- Architecture Overview
- Detailed Component Analysis
- Dependency Analysis
- Performance Considerations
- Troubleshooting Guide
- Conclusion
- Appendices
Introduction
This document describes the ARQ worker architecture used in the cTrip Payment Gateway. It covers Redis connection configuration, worker initialization, task scheduling via ARQ cron jobs, lifecycle hooks, and the three main task modules: listener (confirmation/expiry), sweeper (fund settlement), and webhook dispatch. The system uses ARQ (async task queue) with Redis for background processing, orchestrated by a dedicated worker process launched via python run_worker.py or Docker Compose.
Project Structure
The worker architecture spans several modules:
- Workers: ARQ WorkerSettings, Redis connection helpers, and task functions for payment confirmation, sweeping, and webhook dispatch.
- Core configuration: Centralized settings including Redis URL, chains configuration, and secrets.
- Application entrypoint: FastAPI server that seeds chain states on startup; workers run as a separate process.
- Deployment: Docker Compose defines the database, Redis, API server, and worker service.
[Diagram: worker module layout]
- Workers: app/workers/__init__.py (get_redis_settings()), app/workers/worker.py (WorkerSettings + cron jobs), app/workers/listener.py (listen_for_payments task), app/workers/sweeper.py (sweep_funds task), app/workers/webhook.py (send_webhook_notification task), app/workers/client.py (WorkerClient, enqueue from API)
- Core: app/core/config.py (Settings + redis_url + chains), chains.yaml (chain configs)
- App: server.py (FastAPI lifespan seeds chain states)
- Deployment: docker-compose.yml (db + redis + app + worker), run_worker.py (arq.run_worker(WorkerSettings))
- Edges: __init__.py -> config.py; worker.py -> listener.py, sweeper.py, webhook.py; client.py -> __init__.py; server.py -> config.py; docker-compose.yml -> run_worker.py; run_worker.py -> worker.py
Diagram sources - app/workers/__init__.py - app/workers/listener.py - app/workers/sweeper.py - app/workers/webhook.py - app/core/config.py - chains.yaml - server.py - docker-compose.yml - requirements.txt - pyproject.toml
Section sources - app/workers/__init__.py - app/core/config.py - server.py - docker-compose.yml
Core Components
- ARQ Redis connection: get_redis_settings() parses REDIS_URL and returns RedisSettings for ARQ.
- WorkerSettings: Defines all task functions, cron schedules, lifecycle hooks, and worker parameters.
- Tasks:
  - listen_for_payments: Cron task (every second) that confirms detected payments and expires stale ones. Block detection is handled by always-on ChainSniper WebSocket listeners.
  - sweep_funds: Cron task (every 30 seconds, currently commented out) that sweeps confirmed payments to the admin wallet.
  - send_webhook_notification: Task for sending webhook notifications for payment events.
  - retry_failed_webhooks: Cron task for retrying failed webhooks (placeholder).
  - send_custom_webhook: Task for sending custom webhooks.
- Lifecycle hooks:
  - startup: Launches ChainSniper WebSocket listeners via ScannerService.start_listeners().
  - shutdown: Cancels running ChainSniper tasks.
- WorkerClient: Used by FastAPI admin endpoints to enqueue tasks on demand.
Key implementation references:
- ARQ WorkerSettings class in app/workers/worker.py
- Redis settings parsing in app/workers/__init__.py
- ChainSniper startup in app/services/blockchain/scanner.py
Section sources - app/workers/__init__.py - app/workers/listener.py - app/workers/sweeper.py - app/workers/webhook.py - app/core/config.py - chains.yaml - server.py
Architecture Overview
The ARQ worker architecture uses Redis as the task queue backend. The worker process is started separately from the API and runs ARQ's event loop. On startup, it launches ChainSniper WebSocket listeners for real-time block detection. Cron jobs handle periodic confirmation checks, sweeping, and webhook retries.
[Diagram: runtime architecture]
- Processes: API server (server.py), ARQ Worker (python run_worker.py)
- Worker tasks: listen_for_payments (cron: every second), sweep_funds (cron: every 30s, disabled), send_webhook_notification
- Startup: ChainSniper WebSocket listeners (one per chain with a ws:// URL)
- External services: PostgreSQL, chains configuration (chains.yaml)
- Edges: Worker -> Redis; Worker -> ChainSniper listeners; Worker -> listen_for_payments, sweep_funds, send_webhook_notification; each task -> Redis; listen_for_payments -> PostgreSQL; sweep_funds -> PostgreSQL; ChainSniper listeners -> PostgreSQL; ChainSniper listeners -> chains.yaml
Diagram sources - app/workers/__init__.py - server.py - docker-compose.yml - app/workers/listener.py - app/workers/sweeper.py - app/workers/webhook.py - chains.yaml
Detailed Component Analysis
ARQ Redis Connection and WorkerSettings
- get_redis_settings() in app/workers/__init__.py parses REDIS_URL and returns an ARQ RedisSettings object.
- WorkerSettings in app/workers/worker.py is the central ARQ configuration class defining:
  - functions: list of task functions available to the worker
  - cron_jobs: scheduled tasks using ARQ's cron() helper
  - on_startup / on_shutdown: lifecycle hooks
  - redis_settings: connection to Redis
  - max_jobs, job_timeout, keep_result, max_tries: operational parameters
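The URL-parsing step can be illustrated with the standard library alone. The sketch below is not the project's get_redis_settings(); RedisConn and parse_redis_url are hypothetical names, and the dataclass only mirrors the kind of fields an ARQ RedisSettings object carries (ARQ also ships a RedisSettings.from_dsn constructor, which the project may or may not use).

```python
from dataclasses import dataclass
from typing import Optional
from urllib.parse import urlparse


@dataclass(frozen=True)
class RedisConn:
    """Hypothetical stand-in for the connection fields of ARQ's RedisSettings."""
    host: str = "localhost"
    port: int = 6379
    database: int = 0
    password: Optional[str] = None
    ssl: bool = False


def parse_redis_url(url: str) -> RedisConn:
    """Split a redis:// or rediss:// URL into connection fields."""
    parsed = urlparse(url)
    if parsed.scheme not in ("redis", "rediss"):
        raise ValueError(f"unsupported Redis URL scheme: {parsed.scheme!r}")
    # The path component ("/1") selects the logical database; empty means 0.
    db_part = parsed.path.lstrip("/")
    return RedisConn(
        host=parsed.hostname or "localhost",
        port=parsed.port or 6379,
        database=int(db_part) if db_part else 0,
        password=parsed.password,
        ssl=parsed.scheme == "rediss",
    )


conn = parse_redis_url("redis://:s3cret@redis:6379/1")
print(conn)
```

Keeping the parsing in one helper means the worker and any API-side client resolve the same REDIS_URL to the same connection parameters.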
Worker Initialization and Startup
- The worker process is started via python run_worker.py, which calls arq.run_worker(WorkerSettings).
- In Docker Compose, the worker service runs python run_worker.py.
- On startup, the startup hook calls ScannerService.start_listeners(), which launches one ChainSniper WebSocket listener per chain that has a ws:// or wss:// RPC URL.
- References to running tasks are stored in _sniper_tasks to prevent garbage collection.
Operational flow:
- Docker Compose builds the image and runs python run_worker.py.
- ARQ loads WorkerSettings, connects to Redis, and starts the event loop.
- startup hook fires, launching ChainSniper listeners.
- Cron jobs begin executing on their schedules.
Task Scheduling and Cron Jobs
- ARQ uses cron() to schedule tasks at specific second/minute intervals.
- listen_for_payments runs every second (second=set(range(60))).
- sweep_funds is defined but commented out in the current WorkerSettings.
- Tasks are enqueued by ARQ's scheduler and consumed by the same worker process.
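To make the seconds-based schedules concrete, here is a small standard-library sketch of how a set of seconds selects run times. It only models the matching rule; it is not ARQ's scheduler, and the {0, 30} set is one assumed way to express the 30-second sweep cadence mentioned earlier.

```python
from datetime import datetime, timedelta


def due(now: datetime, second: set) -> bool:
    """Return True when a seconds-based schedule fires at `now`."""
    return now.second in second


EVERY_SECOND = set(range(60))  # the schedule described for listen_for_payments
EVERY_30_SECONDS = {0, 30}     # assumed spelling of a 30-second cadence

start = datetime(2024, 1, 1, 12, 0, 0)
ticks = [start + timedelta(seconds=i) for i in range(60)]

listener_runs = sum(due(t, EVERY_SECOND) for t in ticks)
sweeper_runs = [t.second for t in ticks if due(t, EVERY_30_SECONDS)]
print(listener_runs, sweeper_runs)  # 60 [0, 30]
```

A full set of 60 values fires on every tick, which is why second=set(range(60)) amounts to "every second".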
Payment Detection vs Confirmation Split
- Detection (real-time): ChainSniper WebSocket listeners call ScannerService._on_block() and ScannerService._on_log() for every new block/log. These update payment status to DETECTED immediately.
- Confirmation (cron): listen_for_payments calls ScannerService.confirm_payments() every second to promote DETECTED payments to CONFIRMED once enough blocks have passed.
- Expiry (cron): listen_for_payments also calls ScannerService.check_expired_payments() to mark stale payments as EXPIRED.
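The confirmation and expiry passes can be sketched as two small functions over in-memory records. This is a simplified model, not ScannerService: the PENDING state, the Payment fields, the depth rule (head block minus detection block), and the choice to expire only PENDING payments are all assumptions made for illustration.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Optional


class Status(str, Enum):
    PENDING = "PENDING"      # assumed initial state, not named in the text
    DETECTED = "DETECTED"
    CONFIRMED = "CONFIRMED"
    EXPIRED = "EXPIRED"


@dataclass
class Payment:
    status: Status
    expires_at: datetime
    detected_block: Optional[int] = None


def confirm_payments(payments: list, head_block: int, required: int) -> int:
    """Promote DETECTED payments that are at least `required` blocks deep."""
    promoted = 0
    for p in payments:
        if p.status is Status.DETECTED and p.detected_block is not None:
            if head_block - p.detected_block >= required:
                p.status = Status.CONFIRMED
                promoted += 1
    return promoted


def check_expired_payments(payments: list, now: datetime) -> int:
    """Mark still-undetected payments as EXPIRED once their deadline passes."""
    expired = 0
    for p in payments:
        if p.status is Status.PENDING and now >= p.expires_at:
            p.status = Status.EXPIRED
            expired += 1
    return expired


now = datetime(2024, 1, 1, tzinfo=timezone.utc)
payments = [
    Payment(Status.DETECTED, now + timedelta(hours=1), detected_block=100),
    Payment(Status.DETECTED, now + timedelta(hours=1), detected_block=109),
    Payment(Status.PENDING, now - timedelta(minutes=1)),
]
promoted = confirm_payments(payments, head_block=112, required=12)
expired = check_expired_payments(payments, now)
print(promoted, expired, [p.status.value for p in payments])
```

Because both passes are idempotent over the current state, running them every second from a cron task is safe: a payment that is not yet deep enough is simply re-examined on the next tick.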
Admin API and WorkerClient
- app/workers/client.py provides WorkerClient, which uses arq.create_pool() to enqueue tasks from FastAPI endpoints.
- app/api/admin.py exposes /admin/* endpoints for manual task triggering:
  - POST /admin/scan-now: triggers listen_for_payments
  - POST /admin/sweep-now: triggers sweep_funds
  - POST /admin/sweep-address: triggers sweep_specific_address
  - POST /admin/process-payment: triggers process_single_payment
  - POST /admin/send-webhook: triggers send_webhook_notification
  - POST /admin/custom-webhook: triggers send_custom_webhook
Task behavior:
- listen_for_payments: Confirms detected payments and expires stale ones on its cron schedule.
- sweep_funds: Iterates chains and performs sweeping actions; its cron schedule is currently commented out, so it runs only when enqueued, for example through /admin/sweep-now.
- send_webhook_notification: Sends webhooks asynchronously; raises on failure so that ARQ can retry the job.
Execution patterns:
- Periodic execution via ARQ cron jobs defined in WorkerSettings.
- On-demand execution via jobs enqueued through WorkerClient.
- Asynchronous execution with asyncio within the task functions.
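The on-demand path from an admin endpoint to a queued task can be sketched as a thin client over a job pool. The class below is an assumption-laden illustration, not the project's WorkerClient: it takes any object with an enqueue_job coroutine (ARQ pools expose a method of that name), and RecordingPool is a hypothetical in-memory fake so the example runs without Redis.

```python
import asyncio
from typing import Any, Protocol


class JobPool(Protocol):
    """The single pool method this sketch relies on."""
    async def enqueue_job(self, function: str, *args: Any, **kwargs: Any) -> Any: ...


# Endpoint-to-task mapping as listed in the admin endpoint description.
ADMIN_TASKS = {
    "/admin/scan-now": "listen_for_payments",
    "/admin/sweep-now": "sweep_funds",
    "/admin/sweep-address": "sweep_specific_address",
    "/admin/process-payment": "process_single_payment",
    "/admin/send-webhook": "send_webhook_notification",
    "/admin/custom-webhook": "send_custom_webhook",
}


class WorkerClient:
    """Hypothetical client: resolves an endpoint to a task name and enqueues it."""

    def __init__(self, pool: JobPool) -> None:
        self._pool = pool

    async def trigger(self, endpoint: str, *args: Any) -> Any:
        task_name = ADMIN_TASKS[endpoint]  # KeyError for unknown endpoints
        return await self._pool.enqueue_job(task_name, *args)


class RecordingPool:
    """In-memory fake pool that records what would have been enqueued."""

    def __init__(self) -> None:
        self.jobs: list = []

    async def enqueue_job(self, function: str, *args: Any, **kwargs: Any) -> str:
        self.jobs.append((function, args))
        return f"job-{len(self.jobs)}"


pool = RecordingPool()
client = WorkerClient(pool)
job_id = asyncio.run(client.trigger("/admin/sweep-address", "0xabc"))
print(job_id, pool.jobs)
```

Injecting the pool keeps the client testable; in a real deployment the pool would presumably come from arq.create_pool() with the same Redis settings the worker uses.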
Diagram sources - server.py - app/workers/listener.py
Section sources - app/workers/listener.py - app/workers/sweeper.py - app/workers/webhook.py
Chain Configuration and Runtime Behavior
- Chains are loaded from chains.yaml at runtime; if absent, defaults are applied.
- Tasks iterate over configured chains to perform operations.
Behavioral notes:
- An empty chain list falls back to a default chain.
- Chain-specific RPC endpoints are used by blockchain services.
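The fallback rule can be expressed in a few lines. The sketch below assumes the YAML file has already been parsed into a list of dictionaries (or None when the file is missing); the field names and the default chain entry are hypothetical.

```python
from typing import Optional

# Hypothetical default used when no chains are configured.
DEFAULT_CHAINS = [{"name": "default", "rpc_url": "http://localhost:8545"}]


def resolve_chains(loaded: Optional[list]) -> list:
    """Return the configured chains, or a copy of the default when none exist."""
    if not loaded:  # covers a missing file (None) and an empty list
        return [dict(chain) for chain in DEFAULT_CHAINS]
    return loaded


configured = [{"name": "ethereum", "rpc_url": "wss://example.invalid/ws"}]
print(resolve_chains(None))
print(resolve_chains(configured))
```

Returning a copy of the default keeps callers from mutating the shared module-level constant.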
Section sources - app/core/config.py - chains.yaml - app/blockchain/manager.py
Webhook Actor and Retry Strategy
- The webhook task runs on a dedicated event loop and signals failure by raising, so that ARQ can retry the job up to max_tries (ARQ's retry mechanism is the arq.Retry exception).
- WebhookService signs payloads when a secret is provided and handles HTTP errors.
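Payload signing of the kind described here is commonly done with an HMAC over the request body. The sketch below is an assumption, not WebhookService itself: the text does not state the algorithm or header, so HMAC-SHA256, the X-Signature header name, and the sha256= prefix are illustrative choices.

```python
import hashlib
import hmac
import json
from typing import Optional


def build_webhook_request(payload: dict, secret: Optional[str]) -> tuple:
    """Serialize the payload and attach an HMAC signature when a secret is set."""
    # Canonical JSON so that sender and receiver hash identical bytes.
    body = json.dumps(payload, separators=(",", ":"), sort_keys=True).encode("utf-8")
    headers = {"Content-Type": "application/json"}
    if secret:
        digest = hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
        headers["X-Signature"] = f"sha256={digest}"  # hypothetical header name
    return body, headers


def verify_signature(body: bytes, header: str, secret: str) -> bool:
    """Receiver-side check using a constant-time comparison."""
    expected = hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(header, f"sha256={expected}")


event = {"event": "payment.confirmed", "payment_id": "p_1"}  # illustrative payload
body, headers = build_webhook_request(event, "topsecret")
unsigned_body, unsigned_headers = build_webhook_request(event, None)
print(headers["X-Signature"][:7], verify_signature(body, headers["X-Signature"], "topsecret"))
```

Signing the exact bytes that are sent, rather than a re-serialized copy, avoids mismatches caused by key ordering or whitespace.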
Diagram sources - app/workers/webhook.py - app/services/webhook.py
Section sources - app/workers/webhook.py - app/services/webhook.py
Dependency Analysis
- ARQ and Redis are declared as dependencies in both requirements and pyproject.
- The worker initialization module depends on settings for Redis URL.
- Tasks depend on settings for chain configuration and on external services for blockchain and webhook operations.
- The server depends on worker modules (WorkerClient) to enqueue tasks on demand.
Diagram sources - requirements.txt - pyproject.toml - app/workers/__init__.py - app/core/config.py - server.py - app/workers/listener.py - app/workers/sweeper.py - app/workers/webhook.py
Section sources - requirements.txt - pyproject.toml - app/workers/__init__.py - app/core/config.py - server.py
Performance Considerations
- Event loop management: The webhook task maintains a persistent event loop to avoid overhead from creating new loops per task.
- Time limits and retries: WorkerSettings sets job_timeout and max_tries to bound execution and improve resilience.
- Periodic scheduling: Cron tasks run on fixed schedules (listen_for_payments fires every second); tune the schedules to balance throughput and resource usage.
- Redis connectivity: Ensure the Redis URL is reachable and optimized for latency; consider connection pooling and network topology.
- Chain enumeration: Limit the number of chains processed per cycle to reduce contention and improve responsiveness.
Troubleshooting Guide
Common issues and remedies:
- Redis connectivity failures: Verify REDIS_URL and network reachability; check Redis service status.
- Missing chains.yaml: If chains.yaml is missing or invalid, tasks fall back to default chain behavior; ensure the file exists and is valid.
- Webhook failures: Inspect the webhook task logs and WebhookService error handling; verify signatures and timeouts.
- Worker not running: Confirm the worker service is started with the correct command (python run_worker.py) and that modules are importable.
- Health checks: Use the /health endpoint to validate API availability.
Section sources - app/core/config.py - app/services/webhook.py - docker-compose.yml - app/api/health.py
Conclusion
The cTrip Payment Gateway employs a straightforward ARQ worker architecture centered on Redis as the queue backend. Workers are deployed as a separate service; they run cron-scheduled tasks and consume jobs enqueued by the FastAPI server through WorkerClient. Tasks encapsulate distinct responsibilities (payment confirmation and expiry, sweeping, and webhook dispatch) with cron scheduling and retry mechanisms. Configuration is centralized via settings and chains.yaml, enabling flexible chain support. The architecture supports horizontal scaling by running multiple worker instances against the same Redis instance.
Appendices
Configuration Options and Parameters
- Redis connectivity
  - redis_url: Connection string for the Redis instance that backs the ARQ queue
- Redis settings
  - get_redis_settings() builds the ARQ RedisSettings from the Redis URL
- Worker parameters
  - max_jobs, job_timeout, keep_result, max_tries: set on WorkerSettings
- Worker pool sizing
  - Adjust the number of worker processes via Docker Compose replicas for concurrency
- Chain configuration
  - chains_yaml_path: Path to chains.yaml
  - chains: Loaded chain configurations for runtime iteration
Section sources - app/core/config.py - chains.yaml - docker-compose.yml
Deployment Patterns and Load Distribution
- Single Redis broker: All workers share the same broker for task distribution.
- Multiple worker instances: Scale horizontally by increasing worker replicas; tasks are distributed automatically.
- Isolation: Separate services for app and worker ensure process isolation; containers share the same Redis backend.
Section sources - docker-compose.yml