Kabu Documentation

Documentation for the Kabu MEV bot

What is Kabu?

Kabu is a backrunning bot under heavy development. It continues the journey of loom. Since then, revm, reth and alloy have seen many breaking changes, and the goal here is to make everything work again and modernize the codebase. Kabu is a work in progress and not yet ready for production use.

Who is Kabu for?

For everyone who does not like to reinvent the wheel all the time. Kabu gives you a foundation to work with: extend it, rewrite it, or use it as a playground to learn about MEV and backrunning.

Kabu is opinionated

  • Kabu will only support exex and json-rpc
  • We reuse as much as possible from reth, alloy and revm
  • We keep as close as possible to the architecture of reth

Why "Kabu"?

In Japanese, kabu (株) means "stock" — both in the financial sense and as a metaphor for growth.

High level architecture

The Kabu framework uses the alloy type system and integrates deeply with reth to receive events.

Getting Started

Prerequisites

  • Rust (latest stable)
  • Optional: PostgreSQL (for database)
  • Optional: InfluxDB (for metrics)
  • RPC node (e.g., your own node, Alchemy, Infura)

Building Kabu

1. Clone the repository

git clone https://github.com/cakevm/kabu.git
cd kabu

2. Build the project

# Development build
make

# Release build (optimized)
make release

# Maximum performance build
make maxperf

Configuration

1. Create configuration file

cp config.example.toml config.toml

2. Edit config.toml

[clients.local]
url = "ws://localhost:8545"  # Your node endpoint

[database]
url = "postgresql://kabu:kabu@localhost/kabu"

[actors.signers]
# Add your signer configuration

[actors.broadcaster]
flashbots_signer = "0x..."  # Your flashbots signer

3. Set up environment variables

Create a .env file:

# .env file
DATABASE_URL=postgresql://kabu:kabu@localhost/kabu
MAINNET_WS=wss://eth-mainnet.g.alchemy.com/v2/YOUR_API_KEY
DATA=your_encrypted_private_key  # Optional: for signing

Database Setup (Optional)

1. Create the database and user

sudo -u postgres psql
CREATE DATABASE kabu;
CREATE USER kabu WITH PASSWORD 'kabu';

\c kabu;
CREATE SCHEMA kabu AUTHORIZATION kabu;
GRANT ALL PRIVILEGES ON DATABASE kabu TO kabu;
ALTER ROLE kabu SET search_path TO kabu, public;
\q

2. Set environment variables

# Append the connection string to .env (creates the file if it does not exist)
echo "DATABASE_URL=postgresql://kabu:kabu@localhost/kabu" >> .env

3. Run migrations

Migrations will run automatically on first startup, or manually:

diesel migration run --database-url postgresql://kabu:kabu@localhost/kabu

Running Kabu

As a standalone application

# Run with remote node
cargo run --bin kabu -- remote --kabu-config config.toml

# Run with local Reth node
cargo run --bin kabu -- node --kabu-config config.toml

As a Reth ExEx (Execution Extension)

# Start Reth with Kabu ExEx
reth node \
  --chain mainnet \
  --datadir ./reth-data \
  --kabu-config config.toml

For testing with backtest runner

# Run specific test
cargo run --bin kabu-backtest-runner -- \
  --config ./testing/backtest-runner/test_18567709.toml

# Run all tests
make swap-test-all

Development Tools

Code Quality

# Format code
make fmt

# Run linter
make clippy

# Run tests
make test

# Pre-release checks (fmt, clippy, taplo, udeps)
make pre-release

# Clean unused dependencies
make udeps

Documentation

# Build the book
make book

# Test book examples
make test-book

# Serve book locally (opens at http://localhost:3000)
make serve-book

# Build API documentation
make doc

Multicaller Contract

Kabu uses a custom multicaller contract for efficient swap execution. The contract needs to be deployed to your target network. Find the contract at kabu-contract.

Private Key Management

Generate encryption password

cargo run --bin keys generate-password

Replace the generated password in ./crates/defi-entities/private.rs:

pub const KEY_ENCRYPTION_PWD: [u8; 16] = [/* your generated password */];

Encrypt your private key

cargo run --bin keys encrypt --key 0xYOUR_PRIVATE_KEY

Use the encrypted key in the DATA environment variable when running Kabu.

Quick Start Example

  1. Clone and build:

    git clone https://github.com/cakevm/kabu.git
    cd kabu
    make release
    
  2. Configure:

    cp config.example.toml config.toml
    # Edit config.toml with your settings
    
  3. Run:

    cargo run --bin kabu -- remote --kabu-config config.toml
    

Troubleshooting

  • Connection issues: Ensure your RPC endpoint is accessible and supports WebSocket
  • Database errors: Check PostgreSQL is running and credentials are correct
  • Build failures: Run make clean and rebuild
  • Test failures: Ensure you have proper archive node access via environment variables

Examples

Quick Start with KabuBuilder

The simplest way to start a Kabu MEV bot:

use kabu::core::blockchain::{Blockchain, BlockchainState};
use kabu::core::components::{KabuBuilder, MevComponentChannels};
use kabu::core::node::{KabuBuildContext, KabuEthereumNode};
use kabu::evm::db::KabuDB;
use kabu::execution::multicaller::MulticallerSwapEncoder;
use kabu::strategy::backrun::BackrunConfig;
use kabu::types::blockchain::KabuDataTypesEthereum;
use alloy::providers::ProviderBuilder;
use eyre::Result;
use reth_tasks::TaskManager;

#[tokio::main]
async fn main() -> Result<()> {
    // Create provider
    let provider = ProviderBuilder::new()
        .on_http("http://localhost:8545")
        .build()?;
        
    // Initialize blockchain components
    let blockchain = Blockchain::new(1); // Chain ID 1 for mainnet
    let blockchain_state = BlockchainState::<KabuDB, KabuDataTypesEthereum>::new();
    
    // Load configurations
    let topology_config = TopologyConfig::load_from_file("config.toml")?;
    let backrun_config = BackrunConfig::new_dumb(); // Simple config for testing
    
    // Deploy or get multicaller address
    let multicaller_address = "0x...".parse()?;
    
    // Create task manager
    let task_manager = TaskManager::new(tokio::runtime::Handle::current());
    let task_executor = task_manager.executor();
    
    // Build and launch
    let kabu_context = KabuBuildContext::builder(
        provider,
        blockchain,
        blockchain_state,
        topology_config,
        backrun_config,
        multicaller_address,
        None, // No database for simple example
        false, // Not running as reth ExEx
    )
    .build();
    
    let handle = KabuBuilder::new(kabu_context)
        .node(KabuEthereumNode::default())
        .build()
        .launch(task_executor)
        .await?;
    
    // Wait for shutdown signal
    tokio::signal::ctrl_c().await?;
    handle.shutdown().await?;
    
    Ok(())
}

Running with Reth Node

Kabu can run as a Reth Execution Extension (ExEx):

use reth::cli::Cli;
use reth::builder::NodeHandle;

fn main() -> eyre::Result<()> {
    Cli::<EthereumChainSpecParser, KabuArgs>::parse().run(|builder, kabu_args| async move {
        let blockchain = Blockchain::new(builder.config().chain.chain.id());
        
        let NodeHandle { node, node_exit_future } = builder
            .with_types_and_provider::<EthereumNode, BlockchainProvider<_>>()
            .with_components(EthereumNode::components())
            .with_add_ons(EthereumAddOns::default())
            .install_exex("kabu-exex", |ctx| init_kabu_exex(ctx, blockchain))
            .launch()
            .await?;
            
        // Start Kabu MEV components
        let provider = node.provider.clone();
        let handle = start_kabu_mev(provider, blockchain, true).await?;
        
        // Wait for node exit
        node_exit_future.await?;
        Ok(())
    })
}

Custom Component Example

Create a component that monitors specific pools:

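use alloy::primitives::Address;
use eyre::{eyre, Result};
use kabu_core_components::{Component, MevComponentChannels};
use kabu_types_events::MarketEvents;
use reth_tasks::TaskExecutor;
use tracing::info; // assumption: `info!` comes from the `tracing` crate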

pub struct PoolMonitorComponent<DB> {
    pools_to_monitor: Vec<Address>,
    channels: Option<MevComponentChannels<DB>>,
}

impl<DB: Clone + Send + Sync + 'static> PoolMonitorComponent<DB> {
    pub fn new(pools: Vec<Address>) -> Self {
        Self {
            pools_to_monitor: pools,
            channels: None,
        }
    }
    
    pub fn with_channels(mut self, channels: &MevComponentChannels<DB>) -> Self {
        self.channels = Some(channels.clone());
        self
    }
}

impl<DB: Clone + Send + Sync + 'static> Component for PoolMonitorComponent<DB> {
    fn spawn(self, executor: TaskExecutor) -> Result<()> {
        let channels = self.channels.ok_or_else(|| eyre!("channels not set"))?;
        let pools = self.pools_to_monitor;
        
        let mut market_events = channels.market_events.subscribe();
        
        executor.spawn_critical("PoolMonitor", async move {
            while let Ok(event) = market_events.recv().await {
                match event {
                    MarketEvents::PoolUpdate { address, .. } => {
                        if pools.contains(&address) {
                            info!("Monitored pool updated: {:?}", address);
                            // Trigger specific actions for monitored pools
                        }
                    }
                    _ => {}
                }
            }
        });
        
        Ok(())
    }
    
    fn spawn_boxed(self: Box<Self>, executor: TaskExecutor) -> Result<()> {
        (*self).spawn(executor)
    }
    
    fn name(&self) -> &'static str {
        "PoolMonitorComponent"
    }
}

Custom Node Implementation

Create a minimal node with only essential components:

use kabu_core_components::{KabuNode, KabuBuildContext, BoxedComponent};

pub struct MinimalNode;

#[async_trait]
impl<P, DB> KabuNode<P, DB> for MinimalNode 
where
    P: Provider + Send + Sync + 'static,
    DB: Database + Send + Sync + 'static,
{
    async fn build_components(
        &self,
        context: &KabuBuildContext<P, DB>,
    ) -> Result<Vec<BoxedComponent>> {
        let mut components = vec![];
        
        // Add only arbitrage searcher
        components.push(Box::new(
            StateChangeArbSearcherComponent::new(context.backrun_config.clone())
                .with_channels(&context.channels)
                .with_market_state(context.market_state.clone())
        ));
        
        // Add signer component
        components.push(Box::new(
            SignersComponent::<DB, KabuDataTypesEthereum>::new()
                .with_channels(&context.channels)
        ));
        
        // Add broadcaster
        if let Some(broadcaster_config) = context.topology_config.actors.broadcaster.as_ref() {
            components.push(Box::new(
                FlashbotsBroadcastComponent::new(
                    context.provider.clone(),
                    broadcaster_config.clone(),
                )
                .with_channels(&context.channels)
            ));
        }
        
        Ok(components)
    }
}

// Usage
let handle = KabuBuilder::new(kabu_context)
    .node(MinimalNode)
    .build()
    .launch(task_executor)
    .await?;

Working with State

Loading Pool State

When working with pools, ensure their state is loaded:

use kabu_types_market::{Pool, RequiredStateReader};
use kabu_defi_pools::UniswapV3Pool;

// Fetch pool data
let pool = UniswapV3Pool::fetch_pool_data(provider.clone(), pool_address).await?;

// Get required state
let state_required = pool.get_state_required()?;

// Fetch state from chain
let state_update = RequiredStateReader::<KabuDataTypesEthereum>::fetch_calls_and_slots(
    provider.clone(),
    state_required,
    Some(block_number),
)
.await?;

// Apply to state DB
market_state.write().await.state_db.apply_geth_update(state_update);

// Add pool to market
market.write().await.add_pool(pool.into())?;

Accessing Shared State

// Read market data
let market_guard = context.market.read().await;
let pool = market_guard.get_pool(&pool_address)?;
let tokens = market_guard.tokens();
drop(market_guard); // Release lock

// Update signer state
let mut signers = context.channels.signers.write().await;
signers.add_privkey(private_key);

Testing Components

#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;
    
    #[tokio::test]
    async fn test_component_startup() {
        // Create test channels
        let channels = MevComponentChannels::default();
        
        // Create component
        let component = MyComponent::new(MyConfig::default())
            .with_channels(&channels);
        
        // Create test executor
        let task_manager = TaskManager::new(tokio::runtime::Handle::current());
        let executor = task_manager.executor();
        
        // Spawn component
        assert!(component.spawn(executor).is_ok());
        
        // Send test event
        channels.market_events.send(MarketEvents::BlockHeaderUpdate {
            block_number: 100,
            block_hash: Default::default(),
            timestamp: 0,
            base_fee: U256::ZERO,
            next_base_fee: U256::ZERO,
        }).unwrap();
        
        // Allow processing
        tokio::time::sleep(Duration::from_millis(100)).await;
        
        // Shutdown
        task_manager.shutdown().await;
    }
}

Complete Example: Custom Arbitrage Bot

See the testing/backtest-runner for a complete example that:

  • Initializes providers and blockchain state
  • Loads pools and their state
  • Configures components
  • Processes historical blocks
  • Tracks arbitrage opportunities

Key patterns from the backtest runner:

  • Uses MevComponentChannels for pre-initialized channels
  • Loads pool state before processing
  • Manually triggers events for testing
  • Tracks performance metrics

Architecture

Kabu is built on a modern component-based architecture that emphasizes modularity, concurrency, and type safety. The system efficiently processes blockchain data and identifies arbitrage opportunities through a unified node design with pluggable components.

Core Design Principles

  1. Node-Based Architecture: Centralized node pattern with pluggable components
  2. Component-Based Concurrency: Each component runs independently with message-passing communication
  3. Type Safety: Extensive use of Rust's type system for correctness
  4. Modular Design: Clear separation between data types, business logic, and infrastructure
  5. Performance First: Optimized for low-latency arbitrage detection and execution

Node System

The new node system provides a unified way to build and launch Kabu instances:

KabuNode Trait

The KabuNode trait defines the interface for different node implementations:

pub trait KabuNode<P, DB>: Send + Sync + 'static {
    async fn build_components(
        &self,
        context: &KabuBuildContext<P, DB>,
    ) -> Result<Vec<BoxedComponent>>;
}

KabuBuilder

The KabuBuilder provides a fluent API for constructing and launching nodes:

let handle = KabuBuilder::new(kabu_context)
    .node(KabuEthereumNode::<P, DB>::default())
    .build()
    .launch(task_executor)
    .await?;

KabuBuildContext

The build context centralizes all configuration and shared resources:

let kabu_context = KabuBuildContext::builder(
    provider,
    blockchain,
    blockchain_state,
    topology_config,
    backrun_config,
    multicaller_address,
    db_pool,
    is_exex,
)
.with_pools_config(pools_config)
.with_swap_encoder(swap_encoder)
.with_channels(mev_channels)
.build();

Component System

Components are independent processing units that:

  • Implement the Component trait
  • Run as tokio tasks via TaskExecutor
  • Communicate through MevComponentChannels
  • Use builder pattern for configuration
  • Support graceful shutdown
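The properties listed above can be sketched without any Kabu or tokio dependency. The block below is an illustration only: the `Component` trait here is a simplified stand-in (it returns a `JoinHandle` and uses OS threads instead of a `TaskExecutor`), and `BlockCounter` and `run_block_counter` are hypothetical names. It shows builder-style wiring, a failed spawn when channels are missing, and shutdown triggered by closing the channel.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::{self, JoinHandle};

/// Simplified stand-in for Kabu's `Component` trait (assumption: threads
/// replace tokio tasks so the sketch needs no external crates).
pub trait Component: Send + 'static {
    fn name(&self) -> &'static str;
    fn spawn(self) -> Result<JoinHandle<()>, String>;
}

/// Hypothetical component: counts received block numbers and reports the
/// total when its input channel closes.
pub struct BlockCounter {
    blocks: Option<Receiver<u64>>,
    report: Option<Sender<usize>>,
}

impl BlockCounter {
    pub fn new() -> Self {
        Self { blocks: None, report: None }
    }

    /// Builder-style configuration, mirroring `with_channels`.
    pub fn with_channels(mut self, blocks: Receiver<u64>, report: Sender<usize>) -> Self {
        self.blocks = Some(blocks);
        self.report = Some(report);
        self
    }
}

impl Component for BlockCounter {
    fn name(&self) -> &'static str {
        "BlockCounter"
    }

    fn spawn(self) -> Result<JoinHandle<()>, String> {
        let blocks = self.blocks.ok_or("channels not set")?;
        let report = self.report.ok_or("channels not set")?;
        Ok(thread::spawn(move || {
            // `iter()` ends once every sender is dropped: a graceful shutdown.
            let seen = blocks.iter().count();
            let _ = report.send(seen);
        }))
    }
}

/// Wires one component, feeds it three blocks, and returns its report.
pub fn run_block_counter() -> usize {
    let (block_tx, block_rx) = channel();
    let (report_tx, report_rx) = channel();
    let handle = BlockCounter::new()
        .with_channels(block_rx, report_tx)
        .spawn()
        .expect("component is configured");
    for number in [100u64, 101, 102] {
        block_tx.send(number).unwrap();
    }
    drop(block_tx); // closing the channel signals shutdown
    handle.join().unwrap();
    report_rx.recv().unwrap()
}
```

Calling `run_block_counter()` returns the number of blocks the component saw before the channel closed. Spawning an unconfigured `BlockCounter::new()` returns an error instead of panicking, which matches the `ok_or_else` check used in the Kabu examples.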

Key Components

  • StateChangeArbSearcherComponent: Detects arbitrage in state changes
  • SwapRouterComponent: Routes and encodes swap transactions
  • SignersComponent: Manages transaction signing with nonce tracking
  • FlashbotsBroadcastComponent: Submits bundles to Flashbots
  • Market Components: Pool discovery and state management
  • PoolHealthMonitorComponent: Tracks pool reliability

Type System Organization

Kabu's type system is split into three crates for modularity:

types/entities

Core blockchain and configuration types:

  • Block, Transaction, Account
  • StrategyConfig trait and implementations
  • Pool loading configurations

types/market

Market structure and routing:

  • Token: ERC20 with decimals, price, categories
  • Pool trait: Unified pool interface
  • Market: Registry of tokens and pools
  • SwapDirection: Token pair direction in pool
  • SwapPath: Route through multiple pools

types/swap

Execution and profit tracking:

  • Swap: Final transaction ready for execution
  • SwapLine: Path with amounts and gas costs
  • SwapStep: Single pool interaction
  • Calculation utilities for profit estimation

Communication Architecture

MevComponentChannels

All component communication is centralized through MevComponentChannels:

pub struct MevComponentChannels<DB> {
    // Market events
    pub market_events: broadcast::Sender<MarketEvents>,
    pub mempool_events: broadcast::Sender<MempoolEvents>,
    
    // Swap processing
    pub swap_compose: broadcast::Sender<SwapComposeMessage>,
    pub estimated_swaps: broadcast::Sender<SwapComposeMessage>,
    
    // Health monitoring
    pub health_events: broadcast::Sender<HealthEvent>,
    
    // Shared state
    pub signers: Arc<RwLock<TxSigners>>,
    pub account_state: Arc<RwLock<AccountNonceAndBalanceState>>,
}

Message Flow

The typical arbitrage detection and execution flow:

Block/Mempool Event
    ↓
StateChangeArbSearcher
    ↓
SwapCompose (prepare)
    ↓
Merger Components
    ↓
EvmEstimator
    ↓
SwapRouter
    ↓
SignersComponent
    ↓
FlashbotsBroadcast
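To make the ordering of the flow above concrete, here is a small sketch that models the stages as an enum and walks a candidate through them. The `Stage` enum and the `trace` function are hypothetical and not Kabu types. The rule that an unprofitable candidate stops after estimation is an assumption made for illustration.

```rust
/// Stages from the message flow, in order (hypothetical illustration).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Stage {
    Searcher,
    Compose,
    Merger,
    Estimator,
    Router,
    Signer,
    Broadcast,
}

impl Stage {
    /// The stage that handles the message next, or `None` after broadcast.
    pub fn next(self) -> Option<Stage> {
        match self {
            Stage::Searcher => Some(Stage::Compose),
            Stage::Compose => Some(Stage::Merger),
            Stage::Merger => Some(Stage::Estimator),
            Stage::Estimator => Some(Stage::Router),
            Stage::Router => Some(Stage::Signer),
            Stage::Signer => Some(Stage::Broadcast),
            Stage::Broadcast => None,
        }
    }
}

/// Returns the stages a candidate passes through.
pub fn trace(estimated_profit: i64) -> Vec<Stage> {
    let mut visited = Vec::new();
    let mut current = Some(Stage::Searcher);
    while let Some(stage) = current {
        visited.push(stage);
        // Assumption: candidates with no profit are dropped after estimation.
        if stage == Stage::Estimator && estimated_profit <= 0 {
            break;
        }
        current = stage.next();
    }
    visited
}
```

A profitable candidate visits all seven stages and ends at `Stage::Broadcast`, while `trace(0)` stops at `Stage::Estimator`.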

Building a Kabu Instance

Using KabuEthereumNode

The standard way to create a Kabu instance:

// Create provider
let provider = ProviderBuilder::new()
    .on_http(node_url)
    .build()?;

// Initialize blockchain and state
let blockchain = Blockchain::new(chain_id);
let blockchain_state = BlockchainState::<KabuDB, KabuDataTypesEthereum>::new();

// Build context
let kabu_context = KabuBuildContext::builder(
    provider,
    blockchain,
    blockchain_state,
    topology_config,
    backrun_config,
    multicaller_address,
    db_pool,
    false, // is_exex
)
.build();

// Launch with KabuBuilder
let handle = KabuBuilder::new(kabu_context)
    .node(KabuEthereumNode::default())
    .build()
    .launch(task_executor)
    .await?;

// Wait for shutdown
handle.wait_for_shutdown().await?;

Custom Node Implementation

You can create custom nodes by implementing the KabuNode trait:

pub struct CustomNode;

impl<P, DB> KabuNode<P, DB> for CustomNode 
where
    P: Provider + Send + Sync + 'static,
    DB: Database + Send + Sync + 'static,
{
    async fn build_components(
        &self,
        context: &KabuBuildContext<P, DB>,
    ) -> Result<Vec<BoxedComponent>> {
        let mut components = vec![];
        
        // Add your custom components
        components.push(Box::new(
            MyCustomComponent::new(context.backrun_config.clone())
                .with_channels(&context.channels)
        ));
        
        Ok(components)
    }
}

This architecture provides:

  • Unified node construction
  • Centralized configuration
  • Easy component composition
  • Clean shutdown handling
  • Flexible customization

Node System

The Kabu node system provides a unified, extensible framework for building MEV bots with different configurations and capabilities.

Overview

The node system consists of three main parts:

  1. KabuNode trait - Defines how to build components
  2. KabuBuilder - Orchestrates node construction and launch
  3. KabuBuildContext - Centralizes configuration and resources

KabuNode Trait

The KabuNode trait is the core abstraction for creating different node types:

pub trait KabuNode<P, DB>: Send + Sync + 'static 
where
    P: Provider + Send + Sync + 'static,
    DB: Database + Send + Sync + 'static,
{
    async fn build_components(
        &self,
        context: &KabuBuildContext<P, DB>,
    ) -> Result<Vec<BoxedComponent>>;
}

KabuEthereumNode

The standard implementation that includes all MEV components:

pub struct KabuEthereumNode<P, DB> {
    _phantom: PhantomData<(P, DB)>,
}

impl<P, DB> KabuNode<P, DB> for KabuEthereumNode<P, DB> {
    async fn build_components(
        &self,
        context: &KabuBuildContext<P, DB>,
    ) -> Result<Vec<BoxedComponent>> {
        // Builds market, network, strategy, execution, 
        // monitoring, and broadcasting components
    }
}

KabuBuilder

The builder provides a fluent API for node construction:

pub struct KabuBuilder<P, DB> {
    context: KabuBuildContext<P, DB>,
    node: Option<Box<dyn KabuNode<P, DB>>>,
}

impl<P, DB> KabuBuilder<P, DB> {
    pub fn new(context: KabuBuildContext<P, DB>) -> Self {
        Self { context, node: None }
    }
    
    pub fn node(mut self, node: impl KabuNode<P, DB>) -> Self {
        self.node = Some(Box::new(node));
        self
    }
    
    pub fn build(self) -> BuiltKabu<P, DB> {
        BuiltKabu {
            context: self.context,
            node: self.node.unwrap_or_else(|| 
                Box::new(KabuEthereumNode::default())
            ),
        }
    }
}

Launching

The built node can be launched with a task executor:

impl<P, DB> BuiltKabu<P, DB> {
    pub async fn launch(self, executor: TaskExecutor) -> Result<KabuHandle> {
        // Build components from the node
        let components = self.node.build_components(&self.context).await?;
        
        // Spawn all components
        for component in components {
            component.spawn_boxed(executor.clone())?;
        }
        
        Ok(KabuHandle::new())
    }
}

KabuBuildContext

The build context contains all shared resources and configuration:

pub struct KabuBuildContext<P, DB> {
    // Provider and blockchain
    pub provider: P,
    pub blockchain: Blockchain,
    pub blockchain_state: BlockchainState<DB, KabuDataTypesEthereum>,
    
    // Communication channels
    pub channels: MevComponentChannels<DB>,
    
    // Configuration
    pub topology_config: TopologyConfig,
    pub backrun_config: BackrunConfig,
    pub pools_config: PoolsLoadingConfig,
    
    // Infrastructure
    pub multicaller_address: Address,
    pub swap_encoder: MulticallerSwapEncoder,
    pub db_pool: Option<DbPool>,
    
    // Shared state
    pub market: Arc<RwLock<Market>>,
    pub market_state: Arc<RwLock<MarketState<DB>>>,
    pub mempool: Arc<RwLock<Mempool>>,
    pub block_history: Arc<RwLock<BlockHistory>>,
    pub latest_block: Arc<RwLock<LatestBlock>>,
    
    // Flags
    pub is_exex: bool,
    pub enable_web_server: bool,
}

Builder Pattern

The context uses a builder for flexible initialization:

let context = KabuBuildContext::builder(
    provider,
    blockchain,
    blockchain_state,
    topology_config,
    backrun_config,
    multicaller_address,
    db_pool,
    is_exex,
)
.with_pools_config(pools_config)
.with_swap_encoder(swap_encoder)
.with_channels(channels)
.with_enable_web_server(false)
.build();

Component Selection

KabuEthereumNode builds components based on configuration:

Market Components

  • HistoryPoolLoaderComponent - Loads pools from database
  • ProtocolPoolLoaderComponent - Discovers new pools
  • MarketStatePreloadedComponent - Initializes market state

Network Components

  • WaitForNodeSyncComponent - Ensures node is synced
  • BlockProcessingComponent - Processes new blocks
  • MempoolProcessingComponent - Monitors mempool

Strategy Components

  • StateChangeArbSearcherComponent - Main arbitrage engine
  • Merger components - Optimize swap paths

Execution Components

  • EvmEstimatorComponent - Gas estimation
  • SwapRouterComponent - Transaction routing
  • SignersComponent - Transaction signing

Monitoring Components

  • PoolHealthMonitorComponent - Tracks pool reliability
  • MetricsRecorderComponent - Performance metrics
  • InfluxDbWriterComponent - Metrics export

Broadcasting Components

  • FlashbotsBroadcastComponent - Flashbots submission
  • PublicBroadcastComponent - Public mempool
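How a node might turn configuration into a component list can be sketched as follows. This is not Kabu's selection logic: `NodeOptions`, its fields, and `select_components` are hypothetical, and the mapping from each option to a component is an assumption based on the component descriptions above.

```rust
/// Hypothetical, trimmed-down view of what a node might check before
/// building components. These field names are not Kabu's.
#[derive(Debug, Default, Clone, Copy)]
pub struct NodeOptions {
    pub has_db_pool: bool,
    pub has_flashbots_signer: bool,
    pub has_influxdb: bool,
}

/// Returns the names of the components this sketch would build.
pub fn select_components(options: NodeOptions) -> Vec<&'static str> {
    // Assumed to be present in every configuration of this sketch.
    let mut selected = vec![
        "BlockProcessingComponent",
        "StateChangeArbSearcherComponent",
        "SignersComponent",
    ];
    // Loading pools from the database only makes sense with a database.
    if options.has_db_pool {
        selected.push("HistoryPoolLoaderComponent");
    }
    // Bundle submission needs a configured Flashbots signer.
    if options.has_flashbots_signer {
        selected.push("FlashbotsBroadcastComponent");
    }
    // Metrics export is optional.
    if options.has_influxdb {
        selected.push("InfluxDbWriterComponent");
    }
    selected
}
```

With `NodeOptions::default()` only the three always-on components are returned. Enabling an option adds exactly one component, which keeps the effect of each setting easy to test.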

Custom Nodes

Create specialized nodes for different use cases:

Research Node

pub struct ResearchNode;

impl<P, DB> KabuNode<P, DB> for ResearchNode {
    async fn build_components(
        &self,
        context: &KabuBuildContext<P, DB>,
    ) -> Result<Vec<BoxedComponent>> {
        let mut components = vec![];
        
        // Only market and monitoring components
        components.extend(build_market_components(context)?);
        components.extend(build_monitoring_components(context)?);
        
        // Custom research component
        components.push(Box::new(
            ResearchRecorderComponent::new()
                .with_channels(&context.channels)
        ));
        
        Ok(components)
    }
}

Minimal Production Node

pub struct MinimalProductionNode;

impl<P, DB> KabuNode<P, DB> for MinimalProductionNode {
    async fn build_components(
        &self,
        context: &KabuBuildContext<P, DB>,
    ) -> Result<Vec<BoxedComponent>> {
        let mut components = vec![];
        
        // Only essential components
        components.push(Box::new(
            StateChangeArbSearcherComponent::new(
                context.backrun_config.clone()
            )
            .with_channels(&context.channels)
            .with_market_state(context.market_state.clone())
        ));
        
        components.push(Box::new(
            SignersComponent::new()
                .with_channels(&context.channels)
        ));
        
        components.push(Box::new(
            FlashbotsBroadcastComponent::new(
                context.provider.clone(),
                context.topology_config.actors.broadcaster.clone().unwrap(),
            )
            .with_channels(&context.channels)
        ));
        
        Ok(components)
    }
}

Best Practices

  1. Resource Sharing

    • Use Arc<RwLock<_>> for shared state
    • Clone channels from MevComponentChannels
    • Pass context by reference
  2. Component Selection

    • Only include necessary components
    • Consider resource usage
    • Balance functionality vs performance
  3. Configuration

    • Use topology config for external services
    • Use backrun config for strategy parameters
    • Use pools config to control pool loading
  4. Error Handling

    • Components should handle their own errors
    • Use spawn_critical for essential components
    • Implement graceful shutdown
  5. Testing

    • Test nodes with mock components
    • Use controlled environments
    • Verify component interactions

Components

Components are the building blocks of Kabu's architecture. Each component is an independent processing unit that communicates with others through message passing.

Component Trait

All components implement the Component trait:

pub trait Component: Send + Sync + 'static {
    fn spawn(self, executor: TaskExecutor) -> Result<()>;
    fn spawn_boxed(self: Box<Self>, executor: TaskExecutor) -> Result<()>;
    fn name(&self) -> &'static str;
}

Component Lifecycle

stateDiagram-v2
    direction LR
    [*] --> New: new()
    New --> Configured: builder methods
    Configured --> Running: spawn(executor)
    Running --> [*]: shutdown/error
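The lifecycle in the diagram can also be expressed in the type system. The sketch below is an alternative encoding for illustration, not how Kabu implements components (Kabu's examples use `Option` fields checked inside `spawn`). The names `New`, `Configured`, `Running`, and `with_ticks` are hypothetical, and a thread stands in for a tokio task.

```rust
use std::thread::{self, JoinHandle};

/// New: constructed but not yet configured.
pub struct New {
    label: &'static str,
}

/// Configured: builder methods have supplied everything `spawn` needs.
pub struct Configured {
    label: &'static str,
    ticks: u32,
}

/// Running: work happens on a background thread until it finishes.
pub struct Running {
    handle: JoinHandle<u32>,
}

impl New {
    pub fn new(label: &'static str) -> Self {
        New { label }
    }

    /// Builder method: moves the component from New to Configured.
    pub fn with_ticks(self, ticks: u32) -> Configured {
        Configured { label: self.label, ticks }
    }
}

impl Configured {
    /// Moves the component from Configured to Running.
    pub fn spawn(self) -> Running {
        let Configured { label, ticks } = self;
        let handle = thread::Builder::new()
            .name(label.to_string())
            .spawn(move || {
                let mut handled = 0u32;
                for _ in 0..ticks {
                    handled += 1; // placeholder for real event handling
                }
                handled
            })
            .expect("thread spawns");
        Running { handle }
    }
}

impl Running {
    /// Waits for the component to stop and returns how many ticks it handled.
    pub fn shutdown(self) -> u32 {
        self.handle.join().expect("component panicked")
    }
}
```

Because `spawn` exists only on `Configured`, calling it on an unconfigured component is a compile error rather than a runtime "channels not set" error. The trade-off is more types per component.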

Builder Pattern

Components use the builder pattern for configuration. Instead of manual channel wiring, components now use centralized MevComponentChannels:

pub struct MyComponent<DB> {
    config: MyConfig,
    channels: Option<MevComponentChannels<DB>>,
}

impl<DB> MyComponent<DB> {
    pub fn new(config: MyConfig) -> Self {
        Self {
            config,
            channels: None,
        }
    }
    
    pub fn with_channels(mut self, channels: &MevComponentChannels<DB>) -> Self {
        self.channels = Some(channels.clone());
        self
    }
}

Core Components

StateChangeArbSearcherComponent

Monitors state changes and identifies arbitrage opportunities:

StateChangeArbSearcherComponent::new(backrun_config)
    .with_channels(&channels)
    .with_market_state(market_state)

SignersComponent

Manages transaction signing with nonce tracking:

SignersComponent::<DB, KabuDataTypesEthereum>::new()
    .with_channels(&channels)

FlashbotsBroadcastComponent

Submits transaction bundles to Flashbots:

FlashbotsBroadcastComponent::new(
    client.clone(),
    BroadcasterConfig::flashbots(signer),
)
.with_channels(&channels)

PoolHealthMonitorComponent

Tracks pool reliability and removes unhealthy pools:

PoolHealthMonitorComponent::new()
    .with_channels(&channels)
    .with_market(market)

Creating a Component

Here's a template for creating a new component:

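use std::sync::Arc;
use tokio::sync::RwLock;
use eyre::{Result, eyre};
use kabu_core_components::{Component, MevComponentChannels};
use kabu_types_events::MarketEvents;
use reth_tasks::TaskExecutor;
use tracing::info; // assumption: `info!` comes from the `tracing` crate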

pub struct MyComponent<DB: Clone + Send + Sync + 'static> {
    config: MyConfig,
    channels: Option<MevComponentChannels<DB>>,
}

impl<DB: Clone + Send + Sync + 'static> MyComponent<DB> {
    pub fn new(config: MyConfig) -> Self {
        Self {
            config,
            channels: None,
        }
    }
    
    pub fn with_channels(mut self, channels: &MevComponentChannels<DB>) -> Self {
        self.channels = Some(channels.clone());
        self
    }
}

impl<DB> Component for MyComponent<DB> 
where
    DB: DatabaseRef + Send + Sync + Clone + 'static,
{
    fn spawn(self, executor: TaskExecutor) -> Result<()> {
        let name = self.name();
        let channels = self.channels.ok_or_else(|| eyre!("channels not set"))?;
        
        // Subscribe to relevant channels
        let mut market_events = channels.market_events.subscribe();
        
        executor.spawn_critical(name, async move {
            info!("Starting {}", name);
            
            while let Ok(event) = market_events.recv().await {
                // Process events
                match event {
                    MarketEvents::BlockHeaderUpdate { .. } => {
                        // Handle block update
                    }
                    _ => {}
                }
            }
            
            info!("{} shutting down", name);
        });
        
        Ok(())
    }
    
    fn spawn_boxed(self: Box<Self>, executor: TaskExecutor) -> Result<()> {
        (*self).spawn(executor)
    }
    
    fn name(&self) -> &'static str {
        "MyComponent"
    }
}

Component Communication

Components communicate through typed channels in MevComponentChannels:

Event Types

  • MarketEvents: Block headers, state updates, pool updates
  • MempoolEvents: New transactions, transaction updates
  • SwapComposeMessage: Swap preparation, routing, execution
  • HealthEvent: Pool errors, component health

Example: Processing Swaps

// In your component
let mut swap_compose = channels.swap_compose.subscribe();

while let Ok(msg) = swap_compose.recv().await {
    match msg.inner {
        SwapComposeMessage::Prepare(data) => {
            // Prepare swap for execution
            let estimated = estimate_swap(data).await?;
            channels.estimated_swaps.send(estimated)?;
        }
        SwapComposeMessage::Ready(data) => {
            // Swap is ready for broadcast
        }
        _ => {}
    }
}

Best Practices

  1. Channel Management

    • Always use MevComponentChannels for communication
    • Handle RecvError::Closed gracefully for shutdown
    • Use bounded channels to prevent memory issues
  2. Error Handling

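    loop {
        match rx.recv().await {
            Ok(msg) => process(msg),
            // All senders were dropped: treat this as the shutdown signal
            Err(RecvError::Closed) => {
                debug!("{} channel closed, shutting down", name);
                break;
            }
            // The receiver fell behind and skipped n messages: keep running
            Err(RecvError::Lagged(n)) => {
                warn!("{} lagged by {} messages", name, n);
                continue;
            }
        }
    }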
  3. State Access

    • Minimize lock duration
    • Use read locks when possible
    • Clone data out of locks for processing
  4. Lifecycle

    • Components should be stateless when possible
    • Use spawn_critical for essential components
    • Log startup and shutdown
  5. Testing

    • Test component logic separately from messaging
    • Use mock channels for unit tests
    • Integration test with real channels
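The State Access advice above (minimize lock duration, prefer read locks, clone data out of locks) can be sketched with the standard library alone. This is a minimal illustration, not Kabu code: `SharedReserves`, `snapshot`, `total_reserves` and `update_reserve` are hypothetical names, and a real component would likely use whatever lock and state types Kabu exposes.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Hypothetical shared state (pool id -> reserve); not a Kabu type.
type SharedReserves = Arc<RwLock<HashMap<u64, u128>>>;

/// Holds the read lock only long enough to clone the data out.
fn snapshot(state: &SharedReserves) -> HashMap<u64, u128> {
    // The read guard is released when this function returns.
    state.read().expect("lock poisoned").clone()
}

/// Works on the snapshot, so no lock is held during the computation.
fn total_reserves(snapshot: &HashMap<u64, u128>) -> u128 {
    snapshot.values().sum()
}

/// Holds the write lock only for the single insert.
fn update_reserve(state: &SharedReserves, pool: u64, reserve: u128) {
    state.write().expect("lock poisoned").insert(pool, reserve);
}
```

Because the computation runs on a clone, a writer that updates the shared map in the meantime is never blocked by it, and the snapshot stays consistent for the duration of the calculation.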

State management

kabu provides multiple ways to fetch the state and keep it up to date. The following sections describe them.

Receiving new state

kabu can fetch the state using three different methods:

  • WS/IPC based: Subscribing to new events using WebSocket or IPC. For each new event a debug_trace_block call is made to get the state diff.
  • Direct DB: Subscribing to new events like before using WebSocket or IPC, but fetching the state diff directly from the DB.
  • ExEx: Subscribing to new ExEx events and reading the execution outcome from reth.

Receiving new state

Adding new state to the DB

kabu keeps all required state in memory and optionally fetches missing state from an external database provider. The KabuDB is split into three parts so that it can be cloned efficiently. The first part is mutable: every new or changed state is added there.

With each new block, a background task is spawned that merges all state into the inner read-only KabuDB. This inner KabuDB lives inside an Arc. The motivation is to avoid waiting for the merge and to avoid the cost of cloning the whole state every time.

The third part is a DatabaseRef to an external database provider. It is used to fetch missing state that was not prefetched. Both parts are optional, for example when testing whether the prefetched state works correctly.
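The three-part layout described above can be sketched as follows. This is a simplified model under stated assumptions, not the actual KabuDB: `LayeredDb`, `ExternalState` and `FixedExternal` are hypothetical names, keys and values are plain integers, and the merge runs synchronously here, whereas the text describes a background task.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical stand-in for the external provider (the DatabaseRef role).
trait ExternalState {
    fn fetch(&self, key: u64) -> Option<u64>;
}

/// Hypothetical three-part store; not the actual KabuDB.
#[derive(Clone, Default)]
struct LayeredDb {
    /// Part 1: new or changed state since the last merge.
    pending: HashMap<u64, u64>,
    /// Part 2: read-only state behind an Arc, so cloning copies a pointer.
    inner: Arc<HashMap<u64, u64>>,
    /// Part 3: optional fallback for state that was not prefetched.
    external: Option<Arc<dyn ExternalState>>,
}

impl LayeredDb {
    fn insert(&mut self, key: u64, value: u64) {
        self.pending.insert(key, value);
    }

    /// Lookup order: pending changes, then the read-only layer, then external.
    fn get(&self, key: u64) -> Option<u64> {
        self.pending
            .get(&key)
            .copied()
            .or_else(|| self.inner.get(&key).copied())
            .or_else(|| self.external.as_ref().and_then(|ext| ext.fetch(key)))
    }

    /// Folds pending changes into a fresh read-only layer (synchronous here).
    fn merge(&mut self) {
        let mut merged = (*self.inner).clone();
        merged.extend(self.pending.drain());
        self.inner = Arc::new(merged);
    }
}

/// Hypothetical external provider that knows exactly one key.
struct FixedExternal;

impl ExternalState for FixedExternal {
    fn fetch(&self, key: u64) -> Option<u64> {
        if key == 99 { Some(7) } else { None }
    }
}
```

Cloning a `LayeredDb` copies only the small pending map and bumps the reference count on the read-only layer, which is the property the split is meant to provide.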


Tips & Tricks

This section contains various tips and tricks for working with the Kabu framework effectively.

Component Development

When developing components, keep these tips in mind:

  1. Use the Builder Pattern: Components should provide builder methods for configuration
  2. Handle Errors Gracefully: Use proper error handling and logging
  3. Test in Isolation: Write unit tests for component logic separately from the messaging infrastructure
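As a sketch of the "Test in Isolation" tip, the decision logic of a component can live in a plain struct that takes an event and returns an action, with no channel or executor involved. The `Event`, `Action` and `BlockTracker` types below are hypothetical and only illustrate the shape; they are not Kabu types.

```rust
/// Hypothetical event type for illustration; not Kabu's MarketEvents.
#[derive(Debug, Clone, PartialEq)]
enum Event {
    BlockHeader { number: u64 },
    Other,
}

/// Hypothetical result of handling an event.
#[derive(Debug, PartialEq)]
enum Action {
    Recalculate { block: u64 },
    Ignore,
}

/// Component logic with no dependency on channels or the task executor.
#[derive(Default)]
struct BlockTracker {
    last_block: u64,
}

impl BlockTracker {
    /// Decides what to do for an event; stale or unrelated events are ignored.
    fn handle(&mut self, event: &Event) -> Action {
        match event {
            Event::BlockHeader { number } if *number > self.last_block => {
                self.last_block = *number;
                Action::Recalculate { block: *number }
            }
            _ => Action::Ignore,
        }
    }
}
```

The spawned task then only needs to receive from the channel and call `handle`, so unit tests can drive `BlockTracker` directly with hand-built events.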

Performance Optimization

  1. Channel Sizing: Size channels based on expected throughput
  2. Parallel Processing: Use tokio's concurrency features effectively
  3. State Management: Minimize lock contention with Arc<RwLock>
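To make the channel sizing tip concrete, the sketch below shows what happens when a burst of messages exceeds a channel's capacity. It uses the standard library's bounded `sync_channel` as a stand-in, and `fill_without_consumer` is a hypothetical helper. A tokio broadcast channel behaves differently when full: it overwrites the oldest message and slow receivers see a `Lagged` error, but the sizing question is the same.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Sends `count` messages into a bounded channel that nobody drains.
/// Returns how many were accepted and how many were rejected as "full".
fn fill_without_consumer(capacity: usize, count: u32) -> (u32, u32) {
    // Keep the receiver alive so the channel is not reported as disconnected.
    let (tx, _rx) = sync_channel::<u32>(capacity);
    let mut accepted = 0;
    let mut rejected = 0;
    for i in 0..count {
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            // Buffer is full: the burst was larger than the capacity.
            Err(TrySendError::Full(_)) => rejected += 1,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    (accepted, rejected)
}
```

For a burst of 10 messages, a capacity of 4 accepts 4 and rejects 6, while a capacity of 16 accepts all of them. Measuring the largest burst a slow consumer has to absorb gives a starting point for the capacity.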

Debugging

  1. Enable Tracing: Use RUST_LOG=debug to see detailed logs
  2. Monitor Channels: Track channel depths and message flow
  3. Component Lifecycle: Log component startup and shutdown

For more specific tips, see the following subsections.

Custom messages

If you need to add new messages without modifying kabu, you can add a custom struct, similar to Blockchain, that holds the references to your states and channels.

Custom Blockchain

pub struct CustomBlockchain {
    custom_channel: Broadcaster<CustomMessage>,
}

impl CustomBlockchain {
    pub fn new() -> Self {
        Self {
            custom_channel: Broadcaster::new(10),
        }
    }
    pub fn custom_channel(&self) -> Broadcaster<CustomMessage> {
        self.custom_channel.clone()
    }
}

Custom Component

Create a component that can use your custom blockchain:

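pub struct ExampleComponent {
    // Holds the broadcaster returned by CustomBlockchain::custom_channel();
    // the receiver is created from it in spawn() via subscribe()
    custom_channel_rx: Option<Broadcaster<CustomMessage>>,
}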

impl ExampleComponent {
    pub fn new() -> Self {
        Self {
            custom_channel_rx: None,
        }
    }
    
    pub fn on_custom_bc(self, custom_bc: &CustomBlockchain) -> Self {
        Self { 
            custom_channel_rx: Some(custom_bc.custom_channel()), 
            ..self 
        }
    }
}

impl Component for ExampleComponent {
    fn spawn(self, executor: TaskExecutor) -> Result<()> {
        let mut rx = self.custom_channel_rx
            .ok_or_else(|| eyre!("custom channel not set"))?
            .subscribe();
            
        executor.spawn_critical("ExampleComponent", async move {
            while let Ok(msg) = rx.recv().await {
                // Process custom messages
            }
        });
        
        Ok(())
    }
    
    fn name(&self) -> &'static str {
        "ExampleComponent"
    }
}

Starting Custom Components

When loading your custom component, you can set the custom blockchain:

let custom_bc = CustomBlockchain::new();
let executor = TaskExecutor::new();

// Create and configure the component
let component = ExampleComponent::new()
    .on_custom_bc(&custom_bc);

// Spawn the component
component.spawn(executor)?;

Integration with KabuComponentsBuilder

For more complex setups, you can extend the component builder pattern:

impl KabuComponentsBuilder {
    pub fn with_custom_components(self, custom_bc: &CustomBlockchain) -> Self {
        let component = ExampleComponent::new()
            .on_custom_bc(custom_bc);
            
        self.add_component(Box::new(component))
    }
}

Address book

The address book contains frequently used addresses and provides a convenient way to access them. Using named constants is less error-prone and easier to read than raw address literals.

Address types

Right now you will find TokenAddress, FactoryAddress, PeripheryAddress and other more specific address clusters for different protocols like UniswapV2PoolAddress.

Example

Just import it using the kabu crate or the dedicated defi-address-book crate.

use kabu::eth::address_book::TokenAddress;

let weth_address = TokenAddress::WETH;