Monitoring On-Chain Events in Real-Time
Real-time event monitoring is critical for trading bots, liquidation systems, and yield optimization. Miss an event by a few blocks and you’ve missed your opportunity. Here’s what actually works after running production systems for over a year.
Why Event Monitoring Matters
On-chain events tell you when something happened - a swap executed, liquidity added, price oracle updated, governance proposal created. If you’re building automation, you need to react to these events fast.
Common use cases:
- Trading bots watching for large swaps that move prices
- Liquidation bots monitoring health factors
- Yield aggregators tracking APY changes
- MEV searchers scanning mempool for opportunities
The challenge: doing this reliably without missing events or drowning in noise.
The Basic Approach: WebSocket Subscriptions
Most people start with WebSocket connections to their RPC provider:
```python
from web3 import Web3

# Connect via WebSocket
w3 = Web3(Web3.WebsocketProvider('wss://eth-mainnet.g.alchemy.com/v2/YOUR_KEY'))

def handle_block(block_hash):
    block = w3.eth.get_block(block_hash, full_transactions=True)
    # Process transactions...

# Subscribe to new blocks. Note: there is no w3.eth.watch_filter in web3.py;
# you poll the filter with get_new_entries(), which returns new block hashes
block_filter = w3.eth.filter('latest')
while True:
    for block_hash in block_filter.get_new_entries():
        handle_block(block_hash)
```
This works for demos. In production, it fails constantly.
Problems:
- WebSocket connections drop randomly
- No reconnection handling built-in
- Rate limits hit you during high activity
- Events can arrive out of order
- You miss blocks during reconnection
What Actually Works: Event Filters with Polling
After trying various approaches, I settled on event logs with strategic polling:
```python
import time
from web3 import Web3

w3 = Web3(Web3.HTTPProvider('https://eth-mainnet.g.alchemy.com/v2/YOUR_KEY'))

# Uniswap V3 USDC/ETH pool swap event
POOL_ADDRESS = '0x88e6A0c2dDD26FEEb64F039a2c41296FcB3f5640'
SWAP_EVENT_SIGNATURE = w3.keccak(
    text='Swap(address,address,int256,int256,uint160,uint128,int24)'
).hex()

last_block = w3.eth.block_number
while True:
    current_block = w3.eth.block_number
    if current_block > last_block:
        # Get swap events from new blocks
        logs = w3.eth.get_logs({
            'fromBlock': last_block + 1,
            'toBlock': current_block,
            'address': POOL_ADDRESS,
            'topics': [SWAP_EVENT_SIGNATURE]
        })
        for log in logs:
            process_swap_event(log)  # decoding covered in "Decoding Events"
        last_block = current_block
    time.sleep(2)  # Poll every 2 seconds
```
Why this works better:
- HTTP is more reliable than WebSockets
- Explicit block tracking prevents missed events
- Batch fetching multiple blocks reduces API calls
- Easy to persist `last_block` and resume after crashes
- Works with any RPC provider
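Persisting the cursor takes only a few lines. A minimal sketch using a checkpoint file (the `checkpoint.txt` filename is an assumption; a database row works the same way):

```python
import os

CHECKPOINT_FILE = 'checkpoint.txt'  # hypothetical path

def load_last_block(default):
    # Resume from the saved cursor, or fall back to the given block
    if os.path.exists(CHECKPOINT_FILE):
        with open(CHECKPOINT_FILE) as f:
            return int(f.read().strip())
    return default

def save_last_block(block_number):
    # Write to a temp file and rename, so a crash mid-write
    # can't leave a corrupted cursor behind
    tmp = CHECKPOINT_FILE + '.tmp'
    with open(tmp, 'w') as f:
        f.write(str(block_number))
    os.replace(tmp, CHECKPOINT_FILE)
```

Call `save_last_block(current_block)` after each processed batch; on restart, `load_last_block(...)` picks up exactly where you left off.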
The tradeoff: You’re 2-4 seconds behind real-time. For most DeFi strategies, this is acceptable. If you need faster, see mempool monitoring below.
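One wrinkle when you fall behind (after downtime, for example): most providers reject `eth_getLogs` calls spanning too many blocks. A small helper that splits a catch-up range into provider-sized chunks (the 2,000-block cap is an assumption; check your provider's limits):

```python
def block_chunks(from_block, to_block, max_range=2000):
    # Yield (start, end) pairs covering [from_block, to_block] inclusive,
    # each spanning at most max_range blocks
    start = from_block
    while start <= to_block:
        end = min(start + max_range - 1, to_block)
        yield (start, end)
        start = end + 1
```

Then fetch with `for start, end in block_chunks(last_block + 1, current_block):` instead of one oversized call.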
Filtering Events Efficiently
Don’t fetch all events and filter in code. Use RPC filters:
```python
# BAD: Fetch everything, filter locally
all_logs = w3.eth.get_logs({'fromBlock': start, 'toBlock': end})
swap_logs = [log for log in all_logs if log['address'] == POOL_ADDRESS]

# GOOD: Filter at RPC level
swap_logs = w3.eth.get_logs({
    'fromBlock': start,
    'toBlock': end,
    'address': POOL_ADDRESS,          # Filter by contract
    'topics': [SWAP_EVENT_SIGNATURE]  # Filter by event type
})
```
Topic filtering saves bandwidth:
Topics are the indexed fields of an event. The first topic is always the keccak hash of the event signature; any additional topics hold the indexed parameters:
```solidity
event Swap(
    address indexed sender,    // topic[1]
    address indexed recipient, // topic[2]
    int256 amount0,            // not indexed, in data
    int256 amount1,            // not indexed, in data
    uint160 sqrtPriceX96,      // not indexed, in data
    uint128 liquidity,         // not indexed, in data
    int24 tick                 // not indexed, in data
);
```
You can filter by indexed parameters:
```python
# Only swaps from a specific sender address
logs = w3.eth.get_logs({
    'address': POOL_ADDRESS,
    'topics': [
        SWAP_EVENT_SIGNATURE,
        # Indexed addresses are matched as 32-byte topics:
        # left-pad the 20-byte address with zeros, lowercase hex
        '0x' + SENDER_ADDRESS[2:].lower().zfill(64)
    ]
})
```
Decoding Events
Raw logs are hex data. You need to decode them:
```python
from eth_abi import decode

def decode_swap_event(log):
    # Topics: [event_sig, sender, recipient]. Indexed addresses are
    # right-aligned in 32-byte topics, so take the last 40 hex chars
    sender = '0x' + log['topics'][1].hex()[-40:]
    recipient = '0x' + log['topics'][2].hex()[-40:]

    # Data contains the non-indexed parameters, ABI-encoded.
    # web3.py returns data as HexBytes; raw JSON-RPC gives a '0x...' string
    raw = log['data']
    data = bytes.fromhex(raw[2:]) if isinstance(raw, str) else bytes(raw)

    amount0, amount1, sqrtPriceX96, liquidity, tick = decode(
        ['int256', 'int256', 'uint160', 'uint128', 'int24'],
        data
    )
    return {
        'sender': sender,
        'recipient': recipient,
        'amount0': amount0,
        'amount1': amount1,
        'price': (sqrtPriceX96 ** 2) / (2 ** 192),  # raw token1/token0 price
        'block': log['blockNumber'],
        'tx_hash': log['transactionHash'].hex()
    }
```
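The price above is the raw token1-per-token0 ratio in smallest units; to make it human-readable you still need each token's decimals (6 for USDC, 18 for WETH in this pool, with USDC as token0). A sketch of the adjustment:

```python
def human_price(sqrt_price_x96, decimals0, decimals1):
    # Raw Uniswap V3 price: token1 smallest-units per token0 smallest-unit
    raw = (sqrt_price_x96 ** 2) / (2 ** 192)
    # Scale by the decimals difference to get a human-readable token1/token0 price
    return raw * 10 ** (decimals0 - decimals1)
```

For the USDC/WETH pool that gives ETH per USDC, so the familiar ETH price in USDC is `1 / human_price(sqrtPriceX96, 6, 18)`.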
Pro tip: Use contract ABI for automatic decoding:
```python
from web3 import Web3

w3 = Web3(Web3.HTTPProvider(RPC_URL))
contract = w3.eth.contract(address=POOL_ADDRESS, abi=POOL_ABI)

# Get the event object, then decode a raw log automatically
swap_event = contract.events.Swap()
decoded = swap_event.process_log(log)  # one decoded event per log
```
Much cleaner, and it handles all the hex conversion automatically.
Handling Reorganizations
Blockchains reorganize. A block you processed might not be the canonical chain anymore.
```python
SAFE_BLOCK_DEPTH = 3  # Wait 3 blocks before considering events final

last_processed = w3.eth.block_number - SAFE_BLOCK_DEPTH

while True:
    current_block = w3.eth.block_number
    safe_block = current_block - SAFE_BLOCK_DEPTH
    if safe_block > last_processed:
        logs = w3.eth.get_logs({
            'fromBlock': last_processed + 1,
            'toBlock': safe_block,
            'address': POOL_ADDRESS,
            'topics': [SWAP_EVENT_SIGNATURE]
        })
        process_events(logs)
        last_processed = safe_block
    time.sleep(2)
```
For time-sensitive operations, process events optimistically but mark them as "pending" until they're past the reorg depth.
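One way to sketch that optimistic pattern: key each event by (block, transaction, log index), act on it immediately, and only promote it to "confirmed" once it's buried deeper than the safe depth. The class below is an illustration under those assumptions, not production code:

```python
class PendingEventTracker:
    """Track events optimistically until they are past the reorg depth."""

    def __init__(self, safe_depth=3):
        self.safe_depth = safe_depth
        self.pending = {}  # (block, tx_hash, log_index) -> event

    def add(self, event):
        # Act on the event right away elsewhere; remember it here as pending
        key = (event['block'], event['tx_hash'], event['log_index'])
        self.pending[key] = event

    def confirm(self, current_block):
        # Return events now buried deeper than safe_depth and drop them
        # from the pending set; anything newer stays pending
        confirmed = [
            e for (blk, _, _), e in self.pending.items()
            if blk <= current_block - self.safe_depth
        ]
        for e in confirmed:
            del self.pending[(e['block'], e['tx_hash'], e['log_index'])]
        return confirmed
```

Anything still pending whose block vanishes in a reorg should be re-fetched and reconciled before confirming.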
Multi-Chain Monitoring
Monitoring multiple chains efficiently:
```python
import asyncio
from web3 import AsyncWeb3

async def monitor_chain(chain_name, rpc_url, contracts):
    w3 = AsyncWeb3(AsyncWeb3.AsyncHTTPProvider(rpc_url))
    last_block = await w3.eth.block_number
    while True:
        current_block = await w3.eth.block_number
        if current_block > last_block:
            # Fetch logs for all contracts in parallel
            tasks = [
                w3.eth.get_logs({
                    'fromBlock': last_block + 1,
                    'toBlock': current_block,
                    'address': contract['address'],
                    'topics': contract['topics']
                })
                for contract in contracts
            ]
            results = await asyncio.gather(*tasks)
            for logs in results:
                process_logs(chain_name, logs)
            last_block = current_block
        await asyncio.sleep(2)

# Run multiple chains concurrently
async def main():
    await asyncio.gather(
        monitor_chain('ethereum', ETH_RPC, eth_contracts),
        monitor_chain('bsc', BSC_RPC, bsc_contracts),
        monitor_chain('polygon', POLY_RPC, poly_contracts)
    )

asyncio.run(main())
```
Risk Analysis
Technical risks:
RPC providers rate limit. Have fallback providers ready. I run with 3 providers per chain - primary, secondary, and local node for critical systems.
Node sync issues can give you stale data. Always check `w3.eth.syncing` before trusting block numbers.
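The fallback pattern can be sketched as a small wrapper: try each provider in order and return the first success. The bare `Exception` catch is a placeholder; in practice you'd catch the specific connection and RPC errors your client raises:

```python
def call_with_fallback(providers, fn):
    # providers: ordered list (e.g. Web3 instances: primary, secondary, local)
    # fn: callable taking one provider and performing the RPC call
    last_error = None
    for provider in providers:
        try:
            return fn(provider)
        except Exception as e:  # placeholder; catch specific RPC errors
            last_error = e
            continue
    raise RuntimeError('all providers failed') from last_error
```

Usage would look like `call_with_fallback([w3_primary, w3_secondary, w3_local], lambda w3: w3.eth.block_number)`.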
Economic risks:
Latency costs money in DeFi. Every second of delay is potential profit lost to faster bots. If you’re competing on speed, you need dedicated infrastructure closer to validators.
Operational risks:
Memory leaks in long-running event monitors are common. Restart your processes daily or monitor memory usage carefully.
Database writes become a bottleneck when processing high-volume events. Batch inserts and use async I/O.
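The batching idea can be sketched as a buffer that flushes every N events. The `flush_fn` and batch size here are illustrative; in production this would wrap a bulk write such as `executemany()` on an async DB client:

```python
class EventBatcher:
    """Buffer decoded events and write them to storage in batches."""

    def __init__(self, flush_fn, batch_size=500):
        self.flush_fn = flush_fn      # e.g. a bulk-insert function
        self.batch_size = batch_size
        self.buffer = []

    def add(self, event):
        self.buffer.append(event)
        if len(self.buffer) >= self.batch_size:
            self.flush()

    def flush(self):
        # One bulk write instead of one INSERT per event
        if self.buffer:
            self.flush_fn(self.buffer)
            self.buffer = []
```

Call `flush()` on shutdown too, so a partial batch isn't lost.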
When to Use Mempool Monitoring
If 2-second latency isn’t good enough, you need mempool monitoring:
```python
from web3.exceptions import TransactionNotFound

# Subscribe to pending transaction hashes
pending_filter = w3.eth.filter('pending')

while True:
    for tx_hash in pending_filter.get_new_entries():
        try:
            tx = w3.eth.get_transaction(tx_hash)
        except TransactionNotFound:
            continue  # dropped or already mined before we looked it up
        # Analyze the transaction before it's mined
        if is_interesting(tx):
            # React (submit a competing tx with higher gas)
            submit_competing_transaction(tx)
    time.sleep(0.1)  # brief pause between polls
```
Reality check: Public RPC providers don’t give you complete mempool access. You need your own node or specialized services like Blocknative or Flashbots.
Practical Takeaways
Start with HTTP polling and event logs. It’s reliable and handles 90% of use cases.
Filter at the RPC level using address and topics. Don’t pull data you’ll discard.
Wait 3 blocks before treating events as final to handle reorgs.
Use async I/O for multi-chain monitoring to avoid thread overhead.
Have fallback RPC providers. Primary will fail at the worst time.