Monitoring On-Chain Events in Real-Time

Real-time event monitoring is critical for trading bots, liquidation systems, and yield optimization. Miss an event by a few blocks and you’ve missed your opportunity. Here’s what actually works after running production systems for over a year.

Why Event Monitoring Matters

On-chain events tell you when something happened - a swap executed, liquidity added, price oracle updated, governance proposal created. If you’re building automation, you need to react to these events fast.

Common use cases:

  • Trading bots watching for large swaps that move prices
  • Liquidation bots monitoring health factors
  • Yield aggregators tracking APY changes
  • MEV searchers scanning the mempool for opportunities

The challenge: doing this reliably without missing events or drowning in noise.

The Basic Approach: WebSocket Subscriptions

Most people start with WebSocket connections to their RPC provider:

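import time
from web3 import Web3

# Connect via WebSocket (web3.py v6 naming; v7 renames this provider to LegacyWebSocketProvider)
w3 = Web3(Web3.WebsocketProvider('wss://eth-mainnet.g.alchemy.com/v2/YOUR_KEY'))

def handle_block(block_hash):
    block = w3.eth.get_block(block_hash, full_transactions=True)
    # Process transactions

# web3.py has no watch_filter helper, so poll the filter for new block hashes
block_filter = w3.eth.filter('latest')
while True:
    for block_hash in block_filter.get_new_entries():
        handle_block(block_hash)
    time.sleep(1)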

This works for demos. In production, it fails constantly.

Problems:

  • WebSocket connections drop randomly
  • No reconnection handling built-in
  • Rate limits hit you during high activity
  • Events can arrive out of order
  • You miss blocks during reconnection
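If you do stay on WebSockets, the missing reconnection handling is the first thing to add. Here is a minimal sketch of a reconnect loop with exponential backoff. The names backoff_delays, run_with_reconnect, and connect_and_listen are hypothetical helpers of mine, not part of web3.py, and the 1-second base and 30-second cap are assumptions.

```python
import time
from typing import Callable, Iterator


def backoff_delays(base: float = 1.0, cap: float = 30.0) -> Iterator[float]:
    """Yield base, 2*base, 4*base, ... seconds, never exceeding cap."""
    delay = base
    while True:
        yield min(delay, cap)
        delay *= 2


def run_with_reconnect(
    connect_and_listen: Callable[[], None],
    max_attempts: int = 5,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Call connect_and_listen, retrying on ConnectionError.

    Returns how many attempts failed before a clean return and
    re-raises the last error once max_attempts is exhausted.
    """
    delays = backoff_delays()
    for attempt in range(max_attempts):
        try:
            connect_and_listen()  # hypothetical: opens the socket and blocks
            return attempt
        except ConnectionError:
            if attempt == max_attempts - 1:
                raise
            sleep(next(delays))
    return max_attempts
```

This only gets the connection back. It does nothing about the blocks that went by while you were disconnected, so you still need to backfill from the last block you processed.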

What Actually Works: Event Filters with Polling

After trying various approaches, I settled on event logs with strategic polling:

import time
from web3 import Web3

w3 = Web3(Web3.HTTPProvider('https://eth-mainnet.g.alchemy.com/v2/YOUR_KEY'))

# Uniswap V3 USDC/ETH pool swap event
POOL_ADDRESS = '0x88e6A0c2dDD26FEEb64F039a2c41296FcB3f5640'
# to_hex guarantees the '0x' prefix; HexBytes.hex() drops it in newer releases
SWAP_EVENT_SIGNATURE = w3.to_hex(w3.keccak(text='Swap(address,address,int256,int256,uint160,uint128,int24)'))

last_block = w3.eth.block_number

while True:
    current_block = w3.eth.block_number

    if current_block > last_block:
        # Get swap events from new blocks
        logs = w3.eth.get_logs({
            'fromBlock': last_block + 1,
            'toBlock': current_block,
            'address': POOL_ADDRESS,
            'topics': [SWAP_EVENT_SIGNATURE]
        })

        for log in logs:
            process_swap_event(log)

        last_block = current_block

    time.sleep(2)  # Poll every 2 seconds

Why this works better:

  • HTTP requests are stateless, so there is no long-lived connection to drop and retries are simple
  • Explicit block tracking prevents missed events
  • Batch fetching multiple blocks reduces API calls
  • Easy to persist last_block and resume after crashes
  • Works with any RPC provider
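Persisting last_block is what makes the crash recovery point above real. A minimal sketch, assuming a small JSON file is good enough as a checkpoint store; the file layout and the function names load_last_block and save_last_block are my own, not from any library.

```python
import json
import os
import tempfile


def load_last_block(path: str, default: int) -> int:
    """Return the saved block number, or default if no usable checkpoint exists."""
    try:
        with open(path) as f:
            return int(json.load(f)["last_block"])
    except (FileNotFoundError, KeyError, TypeError, ValueError):
        return default


def save_last_block(path: str, block_number: int) -> None:
    """Write the checkpoint to a temp file, then rename it into place.

    os.replace swaps the file in one step, so a crash mid-write
    cannot leave a half-written checkpoint behind.
    """
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "w") as f:
        json.dump({"last_block": block_number}, f)
    os.replace(tmp_path, path)
```

In the polling loop you would call load_last_block once at startup and save_last_block right after each batch of logs has been fully processed, never before.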

The tradeoff: You’re 2-4 seconds behind real-time. For most DeFi strategies, this is acceptable. If you need faster, see mempool monitoring below.
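One practical wrinkle with the polling loop: after a long outage, the gap between last_block and current_block can get large, and many providers cap the block range or result size of a single get_logs call. A small sketch of splitting the range into chunks; block_chunks is a hypothetical helper and the 2000-block default is an assumption, so check your provider's actual limit.

```python
from typing import Iterator, Tuple


def block_chunks(
    from_block: int, to_block: int, max_span: int = 2000
) -> Iterator[Tuple[int, int]]:
    """Yield inclusive (start, end) ranges covering from_block..to_block."""
    start = from_block
    while start <= to_block:
        end = min(start + max_span - 1, to_block)
        yield start, end
        start = end + 1
```

Each (start, end) pair then becomes the fromBlock and toBlock of one get_logs call.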

Filtering Events Efficiently

Don’t fetch all events and filter in code. Use RPC filters:

# BAD: Fetch everything, filter locally
all_logs = w3.eth.get_logs({'fromBlock': start, 'toBlock': end})
swap_logs = [log for log in all_logs if log.address == POOL_ADDRESS]

# GOOD: Filter at RPC level
swap_logs = w3.eth.get_logs({
    'fromBlock': start,
    'toBlock': end,
    'address': POOL_ADDRESS,  # Filter by contract
    'topics': [SWAP_EVENT_SIGNATURE]  # Filter by event type
})

Topic filtering saves bandwidth:

Topics are how indexed event parameters show up in logs. For non-anonymous events, the first topic is always the keccak-256 hash of the event signature. Each additional topic holds one indexed parameter:

event Swap(
    address indexed sender,      // topic[1]
    address indexed recipient,   // topic[2]
    int256 amount0,              // not indexed, in data
    int256 amount1,              // not indexed, in data
    uint160 sqrtPriceX96,        // not indexed, in data
    uint128 liquidity,           // not indexed, in data
    int24 tick                   // not indexed, in data
);

You can filter by indexed parameters:

# Only swaps from a specific sender address
logs = w3.eth.get_logs({
    'fromBlock': start,
    'toBlock': end,
    'address': POOL_ADDRESS,
    'topics': [
        SWAP_EVENT_SIGNATURE,
        '0x' + SENDER_ADDRESS[2:].lower().zfill(64)  # Left-pad to 32 bytes
    ]
})
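The padding step is easy to get wrong, so I find it worth wrapping in a helper that validates its input. This is a sketch; address_to_topic is a hypothetical name, not a web3.py function.

```python
import string


def address_to_topic(address: str) -> str:
    """Left-pad a 20-byte hex address to a 32-byte topic string."""
    hex_part = address.lower()
    if hex_part.startswith("0x"):
        hex_part = hex_part[2:]
    if len(hex_part) != 40 or not all(c in string.hexdigits for c in hex_part):
        raise ValueError(f"not a 20-byte hex address: {address!r}")
    return "0x" + hex_part.rjust(64, "0")


# Topic positions are matched in order; None means "match anything".
# Hypothetical example: filter on recipient (topic[2]) regardless of sender.
recipient_topic = address_to_topic("0x88e6A0c2dDD26FEEb64F039a2c41296FcB3f5640")
topics_for_recipient = ["<event signature hash>", None, recipient_topic]
```

The pool address is used as the recipient above purely as a placeholder value, and the first element stands in for SWAP_EVENT_SIGNATURE.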

Decoding Events

Raw logs are hex data. You need to decode them:

from eth_abi import decode
from hexbytes import HexBytes

def decode_swap_event(log):
    # Topics: [event_sig, sender, recipient]
    sender = '0x' + log['topics'][1].hex()[-40:]
    recipient = '0x' + log['topics'][2].hex()[-40:]

    # Data contains non-indexed parameters.
    # HexBytes accepts a '0x...' string or raw bytes, whichever your web3.py version returns
    data = bytes(HexBytes(log['data']))
    amount0, amount1, sqrtPriceX96, liquidity, tick = decode(
        ['int256', 'int256', 'uint160', 'uint128', 'int24'],
        data
    )

    return {
        'sender': sender,
        'recipient': recipient,
        'amount0': amount0,
        'amount1': amount1,
        # Raw token1/token0 ratio in base units; not yet adjusted for token decimals
        'price': (sqrtPriceX96 ** 2) / (2 ** 192),
        'block': log['blockNumber'],
        'tx_hash': log['transactionHash'].hex()
    }
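The sqrtPriceX96 value encodes the token1/token0 ratio in base units, so a human-readable price also needs the token decimals. A sketch of that conversion, using Fraction to avoid float rounding; pool_price is a hypothetical helper, and you have to supply the decimals yourself (for the USDC/ETH pool above I am assuming token0 is USDC with 6 decimals and token1 is WETH with 18).

```python
from fractions import Fraction

Q192 = 2 ** 192


def pool_price(sqrt_price_x96: int, decimals0: int, decimals1: int) -> Fraction:
    """Whole units of token1 per one whole unit of token0."""
    # Base units of token1 per base unit of token0
    raw = Fraction(sqrt_price_x96 ** 2, Q192)
    # One whole token0 is 10**decimals0 base units; one whole token1 is 10**decimals1
    return raw * Fraction(10 ** decimals0, 10 ** decimals1)
```

Under those assumptions, pool_price(sqrtPriceX96, 6, 18) gives WETH per USDC, and 1 / pool_price(...) gives the ETH price in USDC. Call float() on the result when you only need it for display.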

Pro tip: Use contract ABI for automatic decoding:

from web3 import Web3

w3 = Web3(Web3.HTTPProvider(RPC_URL))
contract = w3.eth.contract(address=POOL_ADDRESS, abi=POOL_ABI)

# Get event object
swap_event = contract.events.Swap()

# Decode a single log automatically
decoded = swap_event.process_log(log)
amount0 = decoded['args']['amount0']

Much cleaner, handles all the hex conversion automatically.
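You don't need the full pool ABI for this; a fragment describing only the Swap event is enough. Below is a sketch of such a fragment, written from the event definition shown earlier. event_signature is a hypothetical helper I added just to sanity-check the fragment against the signature string.

```python
SWAP_EVENT_ABI = {
    "anonymous": False,
    "name": "Swap",
    "type": "event",
    "inputs": [
        {"indexed": True, "name": "sender", "type": "address"},
        {"indexed": True, "name": "recipient", "type": "address"},
        {"indexed": False, "name": "amount0", "type": "int256"},
        {"indexed": False, "name": "amount1", "type": "int256"},
        {"indexed": False, "name": "sqrtPriceX96", "type": "uint160"},
        {"indexed": False, "name": "liquidity", "type": "uint128"},
        {"indexed": False, "name": "tick", "type": "int24"},
    ],
}

# Minimal ABI: only the one event we want to decode
POOL_ABI = [SWAP_EVENT_ABI]


def event_signature(event_abi: dict) -> str:
    """Rebuild the canonical signature string, e.g. 'Name(type1,type2)'."""
    types = ",".join(item["type"] for item in event_abi["inputs"])
    return f"{event_abi['name']}({types})"
```

If event_signature(SWAP_EVENT_ABI) doesn't match the string you hashed for the topic filter, the fragment has a typo and decoding will fail.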

Handling Reorganizations

Blockchains reorganize. A block you already processed might no longer be part of the canonical chain.

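SAFE_BLOCK_DEPTH = 3  # Stay 3 blocks behind the head; a reorg heuristic, not protocol finality

# Start from the current safe block (or from a persisted checkpoint)
last_processed = w3.eth.block_number - SAFE_BLOCK_DEPTH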

while True:
    current_block = w3.eth.block_number
    safe_block = current_block - SAFE_BLOCK_DEPTH

    if safe_block > last_processed:
        logs = w3.eth.get_logs({
            'fromBlock': last_processed + 1,
            'toBlock': safe_block,
            'address': POOL_ADDRESS,
            'topics': [SWAP_EVENT_SIGNATURE]
        })

        process_events(logs)
        last_processed = safe_block

    time.sleep(2)

For time-sensitive operations, process optimistically but mark as “pending” until confirmed after reorg depth.
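Here is one way the optimistic "pending until confirmed" idea could look. It is a sketch under my own assumptions: PendingEvents is a hypothetical class, events are plain dicts carrying the blockNumber and blockHash fields that logs already have, and canonical_hash is a lookup you supply (for example, one that fetches the block at that height and returns its hash).

```python
from typing import Callable, List, Optional, Tuple


class PendingEvents:
    """Hold optimistically processed events until they are depth blocks deep."""

    def __init__(self, depth: int = 3) -> None:
        self.depth = depth
        self._pending: List[dict] = []

    def add(self, log: dict) -> None:
        self._pending.append(log)

    def settle(
        self, head: int, canonical_hash: Callable[[int], Optional[str]]
    ) -> Tuple[List[dict], List[dict]]:
        """Return (confirmed, dropped) and keep the still-too-recent events."""
        confirmed, dropped, waiting = [], [], []
        for log in self._pending:
            if head - log["blockNumber"] < self.depth:
                waiting.append(log)  # not deep enough yet
            elif canonical_hash(log["blockNumber"]) == log["blockHash"]:
                confirmed.append(log)  # block is still on the canonical chain
            else:
                dropped.append(log)  # block was reorged out
        self._pending = waiting
        return confirmed, dropped
```

Anything that comes back in dropped needs its optimistic side effects undone, which is the hard part and depends entirely on your strategy.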

Multi-Chain Monitoring

Monitoring multiple chains efficiently:

import asyncio
from web3 import AsyncWeb3

async def monitor_chain(chain_name, rpc_url, contracts):
    w3 = AsyncWeb3(AsyncWeb3.AsyncHTTPProvider(rpc_url))
    last_block = await w3.eth.block_number

    while True:
        current_block = await w3.eth.block_number

        if current_block > last_block:
            # Fetch logs from all contracts concurrently
            tasks = [
                w3.eth.get_logs({
                    'fromBlock': last_block + 1,
                    'toBlock': current_block,
                    'address': contract['address'],
                    'topics': contract['topics']
                })
                for contract in contracts
            ]

            results = await asyncio.gather(*tasks)

            for logs in results:
                process_logs(chain_name, logs)

            last_block = current_block

        await asyncio.sleep(2)

# Run multiple chains concurrently
async def main():
    await asyncio.gather(
        monitor_chain('ethereum', ETH_RPC, eth_contracts),
        monitor_chain('bsc', BSC_RPC, bsc_contracts),
        monitor_chain('polygon', POLY_RPC, poly_contracts)
    )

asyncio.run(main())

Risk Analysis

Technical risks:

RPC providers rate limit. Have fallback providers ready. I run with 3 providers per chain - primary, secondary, and local node for critical systems.
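A fallback setup can start out very simple. This sketch tries each provider in order and returns the first result; call_with_fallback is a hypothetical helper, and each entry is just a zero-argument callable so the same wrapper works for any RPC call.

```python
from typing import Callable, Optional, Sequence, TypeVar

T = TypeVar("T")


def call_with_fallback(providers: Sequence[Callable[[], T]]) -> T:
    """Return the first successful result, trying providers in priority order."""
    last_error: Optional[Exception] = None
    for call in providers:
        try:
            return call()
        except Exception as exc:  # rate limit, timeout, connection error, ...
            last_error = exc
    raise RuntimeError("all providers failed") from last_error
```

Usage would look something like call_with_fallback([lambda: primary.eth.block_number, lambda: secondary.eth.block_number]), where primary and secondary are separate Web3 instances. A production version would also want per-provider timeouts and some memory of which provider is currently unhealthy.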

Node sync issues can give you stale data. Always check w3.eth.syncing before trusting block numbers.
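As a rough sketch of that check: w3.eth.syncing is False when the node is not syncing and a status object while it is. I also compare the latest block's timestamp against the clock, which is my own addition rather than something the syncing flag gives you, and the 60-second threshold is an assumption you should tune per chain. node_looks_healthy is a hypothetical helper.

```python
def node_looks_healthy(
    syncing,
    latest_block_timestamp: int,
    now: float,
    max_lag_seconds: int = 60,
) -> bool:
    """Reject a node that reports syncing or whose head block is too old."""
    if syncing:  # False when idle, a non-empty status mapping while syncing
        return False
    return now - latest_block_timestamp <= max_lag_seconds
```

You would feed it w3.eth.syncing, the timestamp field of the latest block, and time.time().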

Economic risks:

Latency costs money in DeFi. Every second of delay is potential profit lost to faster bots. If you’re competing on speed, you need dedicated infrastructure closer to validators.

Operational risks:

Memory leaks in long-running event monitors are common. Restart your processes daily or monitor memory usage carefully.

Database writes become a bottleneck when processing high-volume events. Batch inserts and use async I/O.
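A sketch of the batching half, using sqlite3 from the standard library so it runs anywhere. The table layout and the names open_store and insert_batch are hypothetical; the async I/O half is not shown here.

```python
import sqlite3
from typing import Iterable, Tuple

Row = Tuple[str, int, int]  # (tx_hash, log_index, block_number)


def open_store(path: str = ":memory:") -> sqlite3.Connection:
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS swaps ("
        "tx_hash TEXT, log_index INTEGER, block_number INTEGER, "
        "PRIMARY KEY (tx_hash, log_index))"
    )
    return conn


def insert_batch(conn: sqlite3.Connection, rows: Iterable[Row]) -> None:
    """Insert many rows in a single transaction instead of one commit per event."""
    with conn:  # commits on success, rolls back on error
        # OR IGNORE makes re-processing the same block range harmless
        conn.executemany("INSERT OR IGNORE INTO swaps VALUES (?, ?, ?)", rows)
```

Keying on (tx_hash, log_index) means a restart that replays a few blocks won't create duplicate rows.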

When to Use Mempool Monitoring

If a 2-4 second delay isn’t good enough, you need mempool monitoring:

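import time
from web3.exceptions import TransactionNotFound

# Create a filter for pending transaction hashes
pending_filter = w3.eth.filter('pending')

while True:
    pending_txs = w3.eth.get_filter_changes(pending_filter.filter_id)

    for tx_hash in pending_txs:
        try:
            tx = w3.eth.get_transaction(tx_hash)
        except TransactionNotFound:
            continue  # Already dropped or replaced
        # Analyze transaction before it's mined
        if is_interesting(tx):
            # React (submit competing tx with higher gas)
            submit_competing_transaction(tx)

    time.sleep(0.2)  # Avoid hammering the node in a tight loop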

Reality check: Public RPC providers don’t give you complete mempool access. You need your own node or specialized services like Blocknative or Flashbots.
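What is_interesting looks like depends on your strategy. As one hedged illustration, this variant keeps only transactions sent to contracts you watch that call functions you care about, identified by the 4-byte selector at the start of the calldata. The extra parameters are my own addition to the one-argument call in the loop, and the addresses and selectors are entirely yours to supply.

```python
from typing import Set


def is_interesting(tx: dict, watched_to: Set[str], watched_selectors: Set[str]) -> bool:
    """True if tx targets a watched contract with a watched function selector.

    watched_to holds lowercase addresses; watched_selectors holds
    lowercase '0x' + 8 hex chars strings.
    """
    to = tx.get("to")
    if to is None:  # contract creation has no recipient
        return False
    if to.lower() not in watched_to:
        return False
    data = tx.get("input", "0x")
    if isinstance(data, (bytes, bytearray)):  # some versions return raw bytes
        data = "0x" + bytes(data).hex()
    return data[:10].lower() in watched_selectors
```

You could bind the two sets once with functools.partial and keep calling is_interesting(tx) in the loop unchanged.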

Practical Takeaways

Start with HTTP polling and event logs. It’s reliable and handles 90% of use cases.

Filter at the RPC level using address and topics. Don’t pull data you’ll discard.

Wait 3 blocks before treating events as final to handle reorgs.

Use async I/O for multi-chain monitoring to avoid thread overhead.

Have fallback RPC providers. Primary will fail at the worst time.

Resources