Listener Worker
Referenced Files in This Document - listener.py - scanner.py - w3.py - manager.py - base.py - ethereum.py - bsc.py - anvil.py - config.py - payment.py - chain.py - token.py - webhook.py - webhook_worker.py - async_session.py - chains.yaml - workers_init.py
Table of Contents
- Introduction
- Project Structure
- Core Components
- Architecture Overview
- Detailed Component Analysis
- Dependency Analysis
- Performance Considerations
- Troubleshooting Guide
- Conclusion
- Appendices
Introduction
This document describes the Listener Worker responsible for blockchain payment monitoring in the cTrip Payment Gateway. It explains the two-part detection architecture:
- Real-time detection via ChainSniper WebSocket listeners (started on worker startup): these push new blocks and ERC20 logs directly to handler functions, updating payment status to DETECTED immediately.
- Periodic confirmation and expiry via the listen_for_payments ARQ cron task (runs every second): this promotes DETECTED payments to CONFIRMED once enough blocks have passed, and marks stale payments as EXPIRED.
It also covers integration with blockchain managers, scanner services, and payment processing workflows; ARQ cron scheduling; error handling; and interaction with external blockchain APIs via WebSocket.
Project Structure
The Listener Worker resides in the workers package and orchestrates scanning via a service layer that interacts with blockchain providers and the database. Key modules include:
- Workers: listener actor and webhook sender
- Services: blockchain scanner service
- Blockchain: provider abstraction and chain-specific implementations
- Database models: payment, chain state, token
- Configuration: runtime settings and chain configuration file
- Asynchronous database session management
[Diagram: module dependency graph. Workers: listener.py (listen_for_payments actor), webhook.py (send_webhook_task actor). Services: scanner.py (ScannerService), webhook.py (WebhookService). Blockchain Layer: manager.py (get_blockchains()), w3.py (get_w3()), base.py (BlockchainBase), ethereum.py (EthereumBlockchain), bsc.py (BSCBlockchain), anvil.py (AnvilBlockchain). Database: payment.py (Payment model), chain.py (ChainState model), token.py (Token model), async_session.py (AsyncSessionLocal). Config: config.py (Settings), chains.yaml. Edges: listener -> scanner, config.py, chains.yaml; scanner -> w3.py, the database modules, WebhookService; w3.py -> manager.py -> base.py -> the three chain implementations; webhook worker -> WebhookService.]
Diagram sources - listener.py - scanner.py - w3.py - manager.py - base.py - ethereum.py - bsc.py - anvil.py - payment.py - chain.py - token.py - webhook.py - webhook_worker.py - async_session.py - config.py - chains.yaml
Section sources - listener.py - scanner.py - w3.py - manager.py - base.py - payment.py - chain.py - token.py - webhook.py - webhook_worker.py - async_session.py - config.py - chains.yaml
Core Components
- ChainSniper WebSocket Listeners: Started once on worker startup via ScannerService.start_listeners(). One listener per chain with a ws:// or wss:// RPC URL. Calls _on_block() for native transfers and _on_log() for ERC20 Transfer events.
- listen_for_payments ARQ cron task: Runs every second. Calls ScannerService.confirm_payments() for each chain and ScannerService.check_expired_payments() globally.
- Scanner Service: Handles both real-time detection callbacks and periodic confirmation/expiry logic.
- Blockchain Provider Abstraction: AsyncWeb3-backed clients per chain with POA support, gas estimation, and transaction building.
- Database Models: Payment, ChainState, Token; used to track pending payments, last scanned block, and token metadata.
- Webhook Integration: Dispatched directly from ScannerService when a payment is detected or confirmed.
Key configuration:
- CONFIRMATIONS_REQUIRED = 1 in app/workers/listener.py (configurable)
- Chains loaded from chains.yaml via get_enabled_chains() in app/workers/utils.py
- WebSocket RPC URLs required in chains.yaml for detection to work
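Because detection depends on WebSocket RPC URLs, it can help to check at startup which chains will actually get a listener. The following is a minimal sketch only: the dictionary keys name and rpc_url are hypothetical and may not match the real chains.yaml layout, and chains_with_listeners is an illustrative helper, not a function from the codebase.

```python
from typing import Dict, List
from urllib.parse import urlparse


def has_websocket_rpc(rpc_url: str) -> bool:
    """Return True when the URL uses the ws:// or wss:// scheme."""
    return urlparse(rpc_url).scheme in ("ws", "wss")


def chains_with_listeners(chains: List[Dict[str, str]]) -> List[str]:
    """Names of chains that would get a real-time listener.

    Assumes each chain is a dict with hypothetical 'name' and 'rpc_url' keys.
    """
    return [c["name"] for c in chains if has_websocket_rpc(c.get("rpc_url", ""))]
```

Chains filtered out by such a check would not receive real-time detection, so logging them at startup is one way to surface the misconfiguration early.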
Section sources - listener.py - scanner.py - config.py - chains.yaml - webhook_worker.py
Architecture Overview
The Listener Worker runs periodically, invoking the Scanner Service for each configured chain. The Scanner Service queries the latest block number, determines a block range based on ChainState and batch size, and scans blocks for matching payments. It updates payment status to detected and later to confirmed after sufficient confirmations. On confirmation, the worker optionally triggers a webhook notification.
Diagram sources - listener.py - scanner.py - scanner.py - w3.py - manager.py - base.py - webhook_worker.py - webhook.py
Detailed Component Analysis
Listener Worker
Responsibilities:
- Periodic execution via ARQ task.
- Builds ScannerService with configuration constants.
- Iterates over configured chains, scanning and confirming payments.
- Schedules the next run with a fixed delay.
Behavioral notes:
- Uses a time limit and disables retries at the actor level.
- Schedules the next run unconditionally after each cycle.
- Logs lifecycle events and exceptions.
Operational parameters:
- Confirmation threshold and block batch size are constants in the module.
- Chain list comes from settings.chains and defaults to a local chain if none is configured.
Scheduling pattern:
- Actor completion triggers a delayed message (5 seconds) to repeat the cycle.
Error handling:
- Catches exceptions during the run and logs them.
- Ensures the next scheduled run is still queued.
Section sources - listener.py - listener.py - config.py - chains.yaml
Scanner Service
Responsibilities:
- Scan a single chain for new blocks and detect payments.
- Confirm detected payments based on required confirmations.
- Persist ChainState and update Payment records.
Scan algorithm:
- Load ChainState for the chain; skip if not found.
- Determine from/to block window using last scanned block and batch size.
- If no pending payments, update last scanned block and return.
- For each block in the window:
  - Native transfers: iterate transactions, match destination address, compare amount.
  - ERC20 transfers: fetch Transfer logs, match topics and token address, compare value.
- Update ChainState.last_scanned_block and commit.
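The block window step can be illustrated with a small helper. This is a sketch under assumptions: next_scan_window is a hypothetical name, and the choice of an inclusive range starting one block after the last scanned block is inferred from the description rather than taken from scanner.py.

```python
from typing import Optional, Tuple


def next_scan_window(
    last_scanned_block: int, latest_block: int, batch_size: int
) -> Optional[Tuple[int, int]]:
    """Return the inclusive (from_block, to_block) range to scan next.

    Returns None when the scanner has already caught up with the chain head.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    from_block = last_scanned_block + 1
    if from_block > latest_block:
        return None
    # Cap the window at the chain head so we never request future blocks.
    to_block = min(from_block + batch_size - 1, latest_block)
    return from_block, to_block
```

With a last scanned block of 100, a chain head of 250, and a batch size of 50, this yields the window (101, 150); the remaining blocks are picked up in later cycles.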
Confirmation algorithm:
- For each detected payment, compute confirmations as latest block minus detected block plus one.
- If confirmations meet or exceed required threshold, mark as confirmed and optionally emit a webhook.
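The confirmation arithmetic above can be written down directly. The function names below are illustrative, not taken from the codebase, and clamping negative results to zero is an added assumption for the case where the chain head is temporarily behind the detection block.

```python
def confirmations(latest_block: int, detected_block: int) -> int:
    """Confirmations = latest block - detected block + 1, never negative."""
    return max(latest_block - detected_block + 1, 0)


def is_confirmed(latest_block: int, detected_block: int, required: int) -> bool:
    """True once the confirmation count meets or exceeds the threshold."""
    return confirmations(latest_block, detected_block) >= required
```

Under this formula a payment detected in the current head block already has one confirmation, so a threshold of 1 confirms it on the next check.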
Logging:
- Emits informational messages for detected and confirmed payments, and scan progress.
Section sources - scanner.py - scanner.py - scanner.py - chain.py - payment.py - token.py
Blockchain Provider Abstraction
- BlockchainBase encapsulates AsyncWeb3 provider creation, POA middleware injection, and gas/fee helpers.
- Chain-specific subclasses set chain IDs and POA flags.
- get_w3 resolves AsyncWeb3 instances from configured chains.
Timeouts and reliability:
- AsyncHTTPProvider supports a request timeout parameter.
- Connection checks and error logging are provided.
Section sources - base.py - base.py - ethereum.py - bsc.py - anvil.py - w3.py - manager.py
Database Models and Sessions
- Payment tracks chain, address, amount, status, confirmations, detection block, expiry, and timestamps.
- ChainState tracks last scanned block per chain.
- Token stores chain, address, symbol, decimals, and enabled flag.
- AsyncSessionLocal provides async SQLAlchemy sessions for the worker’s lifecycle.
Section sources - payment.py - chain.py - token.py - async_session.py
Webhook Integration
- WebhookService sends asynchronous HTTP requests with optional HMAC signature.
- send_webhook_task is an ARQ task that invokes WebhookService and raises on failure to trigger retries.
- ScannerService conditionally enqueues a webhook task upon confirmation.
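The optional HMAC signature could look roughly like the sketch below. The header name X-Signature, the SHA-256 digest, and the canonical JSON encoding are all assumptions made for illustration; the actual WebhookService may use different choices.

```python
import hashlib
import hmac
import json
from typing import Dict, Optional, Tuple


def sign_payload(body: bytes, secret: str) -> str:
    """Hex-encoded HMAC-SHA256 of the raw request body (assumed scheme)."""
    return hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()


def build_webhook_request(
    payload: dict, secret: Optional[str]
) -> Tuple[bytes, Dict[str, str]]:
    """Serialize the payload and attach a signature header when a secret is set."""
    body = json.dumps(payload, separators=(",", ":"), sort_keys=True).encode("utf-8")
    headers = {"Content-Type": "application/json"}
    if secret:
        # "X-Signature" is a hypothetical header name.
        headers["X-Signature"] = sign_payload(body, secret)
    return body, headers


def verify_signature(body: bytes, secret: str, signature: str) -> bool:
    """Receiver-side check using a constant-time comparison."""
    return hmac.compare_digest(sign_payload(body, secret), signature)
```

Signing the exact bytes that are sent, rather than a re-serialized copy, avoids mismatches caused by key ordering or whitespace on the receiving side.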
Section sources - webhook.py - webhook_worker.py - scanner.py - config.py
Task Scheduling and Polling
- The listener actor is scheduled by the broker; after each run completes, it schedules the next run with a fixed delay.
- Block scanning uses a batch size to limit work per cycle.
- Confirmation checks use the latest block number to evaluate confirmations.
Configuration:
- Fixed delay between cycles is set in the listener.
- Batch size and confirmation threshold are constants in the listener and scanner respectively.
Section sources - listener.py - listener.py - scanner.py
Transaction Detection Algorithms
- Native transfers: Match transaction recipient address against payment addresses and compare transferred value with required amount.
- ERC20 transfers: Filter logs by Transfer topic, decode destination from topic index, match token address, and compare log data value.
Complexity considerations:
- Native scanning iterates all transactions per block; ERC20 scanning fetches logs per block.
- Address matching uses hash maps keyed by lowercased addresses for O(1) lookup.
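The ERC20 matching steps can be sketched with plain hex strings. This is illustrative only: logs are assumed to be dicts with address, topics, and data as hex strings (web3 returns richer types), match_log is a hypothetical helper, and accepting any value greater than or equal to the required amount is an assumption about how the comparison is done.

```python
from typing import Dict, Optional, Tuple

# keccak256("Transfer(address,address,uint256)"), the standard ERC20 event topic.
TRANSFER_TOPIC = "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"


def decode_transfer(log: dict) -> Optional[Tuple[str, int]]:
    """Return (recipient, value) for a Transfer log, or None if it is not one."""
    topics = log["topics"]
    if len(topics) != 3 or topics[0].lower() != TRANSFER_TOPIC:
        return None
    # Indexed addresses are left-padded to 32 bytes; keep the last 20 bytes.
    to_address = "0x" + topics[2][-40:].lower()
    value = int(log["data"], 16)
    return to_address, value


def match_log(
    log: dict, pending_by_address: Dict[str, int], token_address: str
) -> Optional[str]:
    """Return the matched payment address, or None when the log does not qualify.

    pending_by_address maps lowercased addresses to required base-unit amounts.
    """
    if log["address"].lower() != token_address.lower():
        return None
    decoded = decode_transfer(log)
    if decoded is None:
        return None
    to_address, value = decoded
    required = pending_by_address.get(to_address)  # O(1) dict lookup
    if required is None or value < required:  # assumed policy: value >= required
        return None
    return to_address
```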
Section sources - scanner.py - scanner.py
Payment Verification and Status Transitions
- Pending payments are matched against detected transfers; status updated to detected.
- After sufficient confirmations, status transitions to confirmed.
- Webhook is emitted on confirmation if configured.
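The status lifecycle can be modeled as a small state machine. The enum values and the transition table below are a sketch: the statuses come from this document, but the lowercase string values and the rule that only pending payments may expire are assumptions, not confirmed behavior of payment.py.

```python
from enum import Enum


class PaymentStatus(str, Enum):
    PENDING = "pending"
    DETECTED = "detected"
    CONFIRMED = "confirmed"
    EXPIRED = "expired"


# Assumed transition table: confirmed and expired are terminal states.
ALLOWED_TRANSITIONS = {
    PaymentStatus.PENDING: {PaymentStatus.DETECTED, PaymentStatus.EXPIRED},
    PaymentStatus.DETECTED: {PaymentStatus.CONFIRMED},
    PaymentStatus.CONFIRMED: set(),
    PaymentStatus.EXPIRED: set(),
}


def can_transition(current: PaymentStatus, new: PaymentStatus) -> bool:
    """True when moving from current to new is allowed by the table."""
    return new in ALLOWED_TRANSITIONS[current]


def transition(current: PaymentStatus, new: PaymentStatus) -> PaymentStatus:
    """Return the new status, or raise ValueError on an illegal move."""
    if not can_transition(current, new):
        raise ValueError(f"illegal transition {current.value} -> {new.value}")
    return new
```

Guarding updates this way keeps a late real-time callback from moving an already confirmed payment back to an earlier state.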
Section sources - scanner.py - scanner.py - payment.py
Error Handling, Retry Mechanisms, and Failure Recovery
- Listener catches exceptions, logs them, and still schedules the next run.
- Webhook actor raises on failure to enable ARQ retries with a small backoff.
- WebhookService logs HTTP errors and generic exceptions, returning failure status.
- BlockchainBase logs connection failures and warns on gas estimation fallbacks.
Recovery:
- Next cycle resumes scanning from ChainState.last_scanned_block.
- Webhook failures are retried by the broker until successful or exhausted.
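The catch, log, and reschedule pattern described in this section can be sketched with plain asyncio. The names run_cycle, scan, and schedule_next are hypothetical stand-ins for the listener body and the broker call; the real listener may be structured differently.

```python
import asyncio
import logging
from typing import Awaitable, Callable

logger = logging.getLogger("listener")


async def run_cycle(
    scan: Callable[[], Awaitable[None]],
    schedule_next: Callable[[], Awaitable[None]],
) -> bool:
    """Run one scan cycle; always queue the next run, even after a failure."""
    ok = True
    try:
        await scan()
    except Exception:
        # Log with traceback but do not re-raise, so the cycle keeps going.
        logger.exception("listener cycle failed")
        ok = False
    finally:
        await schedule_next()
    return ok
```

Placing the scheduling call in the finally block is what guarantees that a single failed RPC call does not stop monitoring altogether.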
Section sources - listener.py - webhook_worker.py - webhook.py - base.py - base.py
External API Interaction, Rate Limiting, and Timeouts
- AsyncWeb3 uses AsyncHTTPProvider with a configurable request timeout.
- Gas and fee estimation include caching and fallbacks.
- get_logs and get_block are called per block; batching reduces load.
- WebhookService uses an async HTTP client with a short timeout.
Recommendations:
- Monitor provider latency and adjust batch size accordingly.
- Consider provider-side rate limits and implement backoff if needed.
- Use chain-specific timeouts and circuit breaker patterns if available.
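One possible shape for the backoff recommendation is a small retry wrapper around provider calls. This is a generic sketch, not code from the project: call_with_backoff and its parameters are hypothetical, and a production version would likely catch only provider-specific exceptions.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def call_with_backoff(
    func: Callable[[], Awaitable[T]],
    *,
    attempts: int = 4,
    base_delay: float = 0.5,
    max_delay: float = 8.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> T:
    """Retry an async call with exponential backoff and a little jitter."""
    for attempt in range(attempts):
        try:
            return await func()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            delay = min(base_delay * 2 ** attempt, max_delay)
            # Up to 10% jitter spreads out retries from concurrent callers.
            await sleep(delay + random.uniform(0, delay * 0.1))
    raise RuntimeError("attempts must be at least 1")
```

The sleep parameter is injectable so that the wrapper can be exercised in tests without real delays.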
Section sources - base.py - base.py - base.py - webhook.py - scanner.py
Performance Monitoring, Logging, and Debugging
- Logging is used extensively in scanner, listener, and webhook components.
- Key metrics include detected and confirmed counts per cycle.
- Debugging tips:
- Verify ChainState last scanned block alignment with actual chain progress.
- Confirm payment addresses and token addresses match chain configuration.
- Inspect webhook delivery logs and signatures.
Section sources - scanner.py - scanner.py - scanner.py - webhook_worker.py
Dependency Analysis
The listener depends on the scanner service and configuration; the scanner depends on database models, AsyncWeb3 via get_w3, and the webhook service. Providers are resolved through a manager that maps chain names to implementations.
Diagram sources - listener.py - scanner.py - w3.py - manager.py - base.py - ethereum.py - bsc.py - anvil.py - payment.py - chain.py - token.py - webhook.py - webhook_worker.py - async_session.py - config.py - chains.yaml
Section sources - listener.py - scanner.py - w3.py - manager.py - base.py - payment.py - chain.py - token.py - webhook.py - webhook_worker.py - async_session.py - config.py - chains.yaml
Performance Considerations
- Block batch size controls work per cycle; larger batches increase throughput but risk timeouts or overload.
- Confirmation threshold balances safety and latency; higher thresholds reduce reorg risk but delay finalization.
- Gas and fee estimation cache reduces repeated provider calls.
- Webhook delivery is asynchronous; tune broker concurrency and retry policies.
- Consider provider rate limits and implement backoff or pagination if needed.
Troubleshooting Guide
Common issues and resolutions:
- No chain state found: Ensure ChainState entries exist for each chain before scanning.
- Payments not detected: Verify addresses and amounts; check token decimals and ERC20 topic filtering.
- Webhook failures: Check URL, secret, and network connectivity; inspect webhook logs for HTTP errors.
- Timeout errors: Increase provider request timeout or reduce batch size; monitor provider latency.
- Reorgs: Increase confirmation threshold to mitigate late-mine scenarios.
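A frequent cause of undetected payments is a mismatch between the human-readable amount and the on-chain value in base units. The helper below is a hypothetical debugging aid, not part of the codebase, for checking what integer value a given amount and token decimals should produce.

```python
from decimal import Decimal


def to_base_units(amount: str, decimals: int) -> int:
    """Convert a decimal amount string to integer base units.

    Raises ValueError if the amount has more fractional digits than the
    token supports, instead of silently rounding.
    """
    scaled = Decimal(amount).scaleb(decimals)
    if scaled != scaled.to_integral_value():
        raise ValueError("amount has more precision than the token supports")
    return int(scaled)
```

For example, 1.5 of an 18-decimal token is 1500000000000000000 base units, while 25 of a 6-decimal token is 25000000; comparing these against the raw value in the transaction or log data quickly shows whether decimals are misconfigured.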
Section sources - scanner.py - scanner.py - webhook.py - base.py
Conclusion
The Listener Worker provides a robust, modular pipeline for monitoring blockchain payments. It leverages asynchronous primitives, configurable chain providers, and a clear status lifecycle to reliably detect and confirm payments. With proper configuration of batch sizes, confirmation thresholds, and webhook delivery, it integrates cleanly with the broader payment gateway infrastructure while offering resilient error handling and observability.
Appendices
Configuration Reference
- Chains configuration file path and content format.
- Runtime settings for database, RPC, Redis, and webhook parameters.
Section sources - config.py - chains.yaml
Class Relationships
Diagram sources - base.py - ethereum.py - bsc.py - anvil.py