Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Examples

Quick Start with KabuBuilder

The simplest way to start a Kabu MEV bot:

use kabu::core::blockchain::{Blockchain, BlockchainState};
use kabu::core::components::{KabuBuilder, MevComponentChannels};
use kabu::core::node::{KabuBuildContext, KabuEthereumNode};
use kabu::evm::db::KabuDB;
use kabu::execution::multicaller::MulticallerSwapEncoder;
use kabu::strategy::backrun::BackrunConfig;
use kabu::types::blockchain::KabuDataTypesEthereum;
use alloy::providers::ProviderBuilder;
use reth_tasks::TaskManager;

#[tokio::main]
async fn main() -> Result<()> {
    // Create provider
    let provider = ProviderBuilder::new()
        .on_http("http://localhost:8545")
        .build()?;
        
    // Initialize blockchain components
    let blockchain = Blockchain::new(1); // Chain ID 1 for mainnet
    let blockchain_state = BlockchainState::<KabuDB, KabuDataTypesEthereum>::new();
    
    // Load configurations
    let topology_config = TopologyConfig::load_from_file("config.toml")?;
    let backrun_config = BackrunConfig::new_dumb(); // Simple config for testing
    
    // Deploy or get multicaller address
    let multicaller_address = "0x...".parse()?;
    
    // Create task manager
    let task_manager = TaskManager::new(tokio::runtime::Handle::current());
    let task_executor = task_manager.executor();
    
    // Build and launch
    let kabu_context = KabuBuildContext::builder(
        provider,
        blockchain,
        blockchain_state,
        topology_config,
        backrun_config,
        multicaller_address,
        None, // No database for simple example
        false, // Not running as reth ExEx
    )
    .build();
    
    let handle = KabuBuilder::new(kabu_context)
        .node(KabuEthereumNode::default())
        .build()
        .launch(task_executor)
        .await?;
    
    // Wait for shutdown signal
    tokio::signal::ctrl_c().await?;
    handle.shutdown().await?;
    
    Ok(())
}

Running with Reth Node

Kabu can run as a Reth Execution Extension (ExEx):

use reth::cli::Cli;
use reth::builder::NodeHandle;

fn main() -> eyre::Result<()> {
    Cli::<EthereumChainSpecParser, KabuArgs>::parse().run(|builder, kabu_args| async move {
        let blockchain = Blockchain::new(builder.config().chain.chain.id());
        
        let NodeHandle { node, node_exit_future } = builder
            .with_types_and_provider::<EthereumNode, BlockchainProvider<_>>()
            .with_components(EthereumNode::components())
            .with_add_ons(EthereumAddOns::default())
            .install_exex("kabu-exex", |ctx| init_kabu_exex(ctx, blockchain))
            .launch()
            .await?;
            
        // Start Kabu MEV components
        let provider = node.provider.clone();
        let handle = start_kabu_mev(provider, blockchain, true).await?;
        
        // Wait for node exit
        node_exit_future.await?;
        Ok(())
    })
}

Custom Component Example

Create a component that monitors specific pools:

use kabu_core_components::{Component, MevComponentChannels};
use kabu_types_events::MarketEvents;
use reth_tasks::TaskExecutor;

pub struct PoolMonitorComponent<DB> {
    pools_to_monitor: Vec<Address>,
    channels: Option<MevComponentChannels<DB>>,
}

impl<DB: Clone + Send + Sync + 'static> PoolMonitorComponent<DB> {
    pub fn new(pools: Vec<Address>) -> Self {
        Self {
            pools_to_monitor: pools,
            channels: None,
        }
    }
    
    pub fn with_channels(mut self, channels: &MevComponentChannels<DB>) -> Self {
        self.channels = Some(channels.clone());
        self
    }
}

impl<DB: Clone + Send + Sync + 'static> Component for PoolMonitorComponent<DB> {
    fn spawn(self, executor: TaskExecutor) -> Result<()> {
        let channels = self.channels.ok_or_else(|| eyre!("channels not set"))?;
        let pools = self.pools_to_monitor;
        
        let mut market_events = channels.market_events.subscribe();
        
        executor.spawn_critical("PoolMonitor", async move {
            while let Ok(event) = market_events.recv().await {
                match event {
                    MarketEvents::PoolUpdate { address, .. } => {
                        if pools.contains(&address) {
                            info!("Monitored pool updated: {:?}", address);
                            // Trigger specific actions for monitored pools
                        }
                    }
                    _ => {}
                }
            }
        });
        
        Ok(())
    }
    
    fn spawn_boxed(self: Box<Self>, executor: TaskExecutor) -> Result<()> {
        (*self).spawn(executor)
    }
    
    fn name(&self) -> &'static str {
        "PoolMonitorComponent"
    }
}

Custom Node Implementation

Create a minimal node with only essential components:

use kabu_core_components::{KabuNode, KabuBuildContext, BoxedComponent};

pub struct MinimalNode;

#[async_trait]
impl<P, DB> KabuNode<P, DB> for MinimalNode 
where
    P: Provider + Send + Sync + 'static,
    DB: Database + Send + Sync + 'static,
{
    async fn build_components(
        &self,
        context: &KabuBuildContext<P, DB>,
    ) -> Result<Vec<BoxedComponent>> {
        let mut components = vec![];
        
        // Add only arbitrage searcher
        components.push(Box::new(
            StateChangeArbSearcherComponent::new(context.backrun_config.clone())
                .with_channels(&context.channels)
                .with_market_state(context.market_state.clone())
        ));
        
        // Add signer component
        components.push(Box::new(
            SignersComponent::<DB, KabuDataTypesEthereum>::new()
                .with_channels(&context.channels)
        ));
        
        // Add broadcaster
        if let Some(broadcaster_config) = context.topology_config.actors.broadcaster.as_ref() {
            components.push(Box::new(
                FlashbotsBroadcastComponent::new(
                    context.provider.clone(),
                    broadcaster_config.clone(),
                )
                .with_channels(&context.channels)
            ));
        }
        
        Ok(components)
    }
}

// Usage
let handle = KabuBuilder::new(kabu_context)
    .node(MinimalNode)
    .build()
    .launch(task_executor)
    .await?;

Working with State

Loading Pool State

When working with pools, ensure their state is loaded:

use kabu_types_market::{Pool, RequiredStateReader};
use kabu_defi_pools::UniswapV3Pool;

// Fetch pool data
let pool = UniswapV3Pool::fetch_pool_data(provider.clone(), pool_address).await?;

// Get required state
let state_required = pool.get_state_required()?;

// Fetch state from chain
let state_update = RequiredStateReader::<KabuDataTypesEthereum>::fetch_calls_and_slots(
    provider.clone(),
    state_required,
    Some(block_number),
)
.await?;

// Apply to state DB
market_state.write().await.state_db.apply_geth_update(state_update);

// Add pool to market
market.write().await.add_pool(pool.into())?;

Accessing Shared State

// Read market data
let market_guard = context.market.read().await;
let pool = market_guard.get_pool(&pool_address)?;
let tokens = market_guard.tokens();
drop(market_guard); // Release lock

// Update signer state
let mut signers = context.channels.signers.write().await;
signers.add_privkey(private_key);

Testing Components

#![allow(unused)]
fn main() {
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::sync::broadcast;
    
    #[tokio::test]
    async fn test_component_startup() {
        // Create test channels
        let channels = MevComponentChannels::default();
        
        // Create component
        let component = MyComponent::new(MyConfig::default())
            .with_channels(&channels);
        
        // Create test executor
        let task_manager = TaskManager::new(tokio::runtime::Handle::current());
        let executor = task_manager.executor();
        
        // Spawn component
        assert!(component.spawn(executor).is_ok());
        
        // Send test event
        channels.market_events.send(MarketEvents::BlockHeaderUpdate {
            block_number: 100,
            block_hash: Default::default(),
            timestamp: 0,
            base_fee: U256::ZERO,
            next_base_fee: U256::ZERO,
        }).unwrap();
        
        // Allow processing
        tokio::time::sleep(Duration::from_millis(100)).await;
        
        // Shutdown
        task_manager.shutdown().await;
    }
}
}

Complete Example: Custom Arbitrage Bot

See the testing/backtest-runner for a complete example that:

  • Initializes providers and blockchain state
  • Loads pools and their state
  • Configures components
  • Processes historical blocks
  • Tracks arbitrage opportunities

Key patterns from the backtest runner:

  • Uses MevComponentChannels for pre-initialized channels
  • Loads pool state before processing
  • Manually triggers events for testing
  • Tracks performance metrics