Kabu Documentation
Documentation for the Kabu MEV bot
What is Kabu?
Kabu is a backrunning bot, currently under heavy development. It continues the journey of loom. Since then many breaking changes have been made to revm, reth and alloy. The goal here is to make everything work again and modernize the codebase. Currently, Kabu is a work in progress and not yet ready for production use.
Who is Kabu for?
For everyone who does not want to reinvent the wheel every time. Kabu gives you a foundation to work with: extend it, rewrite it, or use it as a playground to learn about MEV and backrunning.
Kabu is opinionated
- Kabu will only support exex and json-rpc
- We reuse as much as possible from reth, alloy and revm
- We keep as close as possible to the architecture of reth
Why "Kabu"?
In Japanese, kabu (株) means "stock" — both in the financial sense and as a metaphor for growth.
High level architecture
The Kabu framework uses the alloy type system and integrates deeply with reth to receive events.
Getting Started
Prerequisites
- Rust (latest stable)
- Optional: PostgreSQL (for database)
- Optional: InfluxDB (for metrics)
- RPC node (e.g., your own node, Alchemy, Infura)
Building Kabu
1. Clone the repository
git clone https://github.com/cakevm/kabu.git
cd kabu
2. Build the project
# Development build
make
# Release build (optimized)
make release
# Maximum performance build
make maxperf
Configuration
1. Create configuration file
cp config.example.toml config.toml
2. Edit config.toml
[clients.local]
url = "ws://localhost:8545" # Your node endpoint
[database]
url = "postgresql://kabu:kabu@localhost/kabu"
[actors.signers]
# Add your signer configuration
[actors.broadcaster]
flashbots_signer = "0x..." # Your flashbots signer
3. Set up environment variables
Create a .env file:
# .env file
DATABASE_URL=postgresql://kabu:kabu@localhost/kabu
MAINNET_WS=wss://eth-mainnet.g.alchemy.com/v2/YOUR_API_KEY
DATA=your_encrypted_private_key # Optional: for signing
Database Setup (Optional)
1. Create the database and user
sudo -u postgres psql
CREATE DATABASE kabu;
CREATE USER kabu WITH PASSWORD 'kabu';
\c kabu;
CREATE SCHEMA kabu AUTHORIZATION kabu;
GRANT ALL PRIVILEGES ON DATABASE kabu TO kabu;
ALTER ROLE kabu SET search_path TO kabu, public;
\q
2. Set environment variables
# Create .env file
echo "DATABASE_URL=postgresql://kabu:kabu@localhost/kabu" >> .env
3. Run migrations
Migrations will run automatically on first startup, or manually:
diesel migration run --database-url postgresql://kabu:kabu@localhost/kabu
Running Kabu
As a standalone application
# Run with remote node
cargo run --bin kabu -- remote --kabu-config config.toml
# Run with local Reth node
cargo run --bin kabu -- node --kabu-config config.toml
As a Reth ExEx (Execution Extension)
# Start Reth with Kabu ExEx
reth node \
--chain mainnet \
--datadir ./reth-data \
--kabu-config config.toml
For testing with backtest runner
# Run specific test
cargo run --bin kabu-backtest-runner -- \
--config ./testing/backtest-runner/test_18567709.toml
# Run all tests
make swap-test-all
Development Tools
Code Quality
# Format code
make fmt
# Run linter
make clippy
# Run tests
make test
# Pre-release checks (fmt, clippy, taplo, udeps)
make pre-release
# Clean unused dependencies
make udeps
Documentation
# Build the book
make book
# Test book examples
make test-book
# Serve book locally (opens at http://localhost:3000)
make serve-book
# Build API documentation
make doc
Multicaller Contract
Kabu uses a custom multicaller contract for efficient swap execution. The contract needs to be deployed to your target network. Find the contract at kabu-contract.
Private Key Management
Generate encryption password
cargo run --bin keys generate-password
Replace the generated password in ./crates/defi-entities/private.rs:
pub const KEY_ENCRYPTION_PWD: [u8; 16] = [/* your generated password */];
Encrypt your private key
cargo run --bin keys encrypt --key 0xYOUR_PRIVATE_KEY
Use the encrypted key in the DATA environment variable when running Kabu.
Quick Start Example
- Clone and build:
git clone https://github.com/cakevm/kabu.git
cd kabu
make release
- Configure:
cp config.example.toml config.toml
# Edit config.toml with your settings
- Run:
cargo run --bin kabu -- remote --kabu-config config.toml
Troubleshooting
- Connection issues: Ensure your RPC endpoint is accessible and supports WebSocket
- Database errors: Check PostgreSQL is running and credentials are correct
- Build failures: Run make clean and rebuild
- Test failures: Ensure you have proper archive node access via environment variables
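The first troubleshooting item can be partly checked before startup. The following is a minimal sketch using only the standard library; `is_websocket_endpoint` is a hypothetical helper for illustration, not part of Kabu, and it only inspects the URL scheme. It does not verify that the endpoint is actually reachable.

```rust
/// Hypothetical helper (not part of Kabu): returns true when the URL uses a
/// WebSocket scheme (`ws://` or `wss://`). The comparison is case-insensitive.
pub fn is_websocket_endpoint(url: &str) -> bool {
    let lower = url.trim().to_ascii_lowercase();
    lower.starts_with("ws://") || lower.starts_with("wss://")
}
```

Calling it with the `url` value from config.toml before starting the bot would catch an `http://` endpoint configured where a WebSocket one is expected.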
Examples
Quick Start with KabuBuilder
The simplest way to start a Kabu MEV bot:
use kabu::core::blockchain::{Blockchain, BlockchainState};
use kabu::core::components::{KabuBuilder, MevComponentChannels};
use kabu::core::node::{KabuBuildContext, KabuEthereumNode};
use kabu::evm::db::KabuDB;
use kabu::execution::multicaller::MulticallerSwapEncoder;
use kabu::strategy::backrun::BackrunConfig;
use kabu::types::blockchain::KabuDataTypesEthereum;
use alloy::providers::ProviderBuilder;
use eyre::Result;
use reth_tasks::TaskManager;
#[tokio::main]
async fn main() -> Result<()> {
// Create provider
let provider = ProviderBuilder::new()
.on_http("http://localhost:8545")
.build()?;
// Initialize blockchain components
let blockchain = Blockchain::new(1); // Chain ID 1 for mainnet
let blockchain_state = BlockchainState::<KabuDB, KabuDataTypesEthereum>::new();
// Load configurations
let topology_config = TopologyConfig::load_from_file("config.toml")?;
let backrun_config = BackrunConfig::new_dumb(); // Simple config for testing
// Deploy or get multicaller address
let multicaller_address = "0x...".parse()?;
// Create task manager
let task_manager = TaskManager::new(tokio::runtime::Handle::current());
let task_executor = task_manager.executor();
// Build and launch
let kabu_context = KabuBuildContext::builder(
provider,
blockchain,
blockchain_state,
topology_config,
backrun_config,
multicaller_address,
None, // No database for simple example
false, // Not running as reth ExEx
)
.build();
let handle = KabuBuilder::new(kabu_context)
.node(KabuEthereumNode::default())
.build()
.launch(task_executor)
.await?;
// Wait for shutdown signal
tokio::signal::ctrl_c().await?;
handle.shutdown().await?;
Ok(())
}
Running with Reth Node
Kabu can run as a Reth Execution Extension (ExEx):
use reth::cli::Cli;
use reth::builder::NodeHandle;
fn main() -> eyre::Result<()> {
Cli::<EthereumChainSpecParser, KabuArgs>::parse().run(|builder, kabu_args| async move {
let blockchain = Blockchain::new(builder.config().chain.chain.id());
let NodeHandle { node, node_exit_future } = builder
.with_types_and_provider::<EthereumNode, BlockchainProvider<_>>()
.with_components(EthereumNode::components())
.with_add_ons(EthereumAddOns::default())
.install_exex("kabu-exex", |ctx| init_kabu_exex(ctx, blockchain))
.launch()
.await?;
// Start Kabu MEV components
let provider = node.provider.clone();
let handle = start_kabu_mev(provider, blockchain, true).await?;
// Wait for node exit
node_exit_future.await?;
Ok(())
})
}
Custom Component Example
Create a component that monitors specific pools:
use alloy::primitives::Address;
use eyre::{eyre, Result};
use kabu_core_components::{Component, MevComponentChannels};
use kabu_types_events::MarketEvents;
use reth_tasks::TaskExecutor;
use tracing::info;
pub struct PoolMonitorComponent<DB> {
pools_to_monitor: Vec<Address>,
channels: Option<MevComponentChannels<DB>>,
}
impl<DB: Clone + Send + Sync + 'static> PoolMonitorComponent<DB> {
pub fn new(pools: Vec<Address>) -> Self {
Self {
pools_to_monitor: pools,
channels: None,
}
}
pub fn with_channels(mut self, channels: &MevComponentChannels<DB>) -> Self {
self.channels = Some(channels.clone());
self
}
}
impl<DB: Clone + Send + Sync + 'static> Component for PoolMonitorComponent<DB> {
fn spawn(self, executor: TaskExecutor) -> Result<()> {
let channels = self.channels.ok_or_else(|| eyre!("channels not set"))?;
let pools = self.pools_to_monitor;
let mut market_events = channels.market_events.subscribe();
executor.spawn_critical("PoolMonitor", async move {
while let Ok(event) = market_events.recv().await {
match event {
MarketEvents::PoolUpdate { address, .. } => {
if pools.contains(&address) {
info!("Monitored pool updated: {:?}", address);
// Trigger specific actions for monitored pools
}
}
_ => {}
}
}
});
Ok(())
}
fn spawn_boxed(self: Box<Self>, executor: TaskExecutor) -> Result<()> {
(*self).spawn(executor)
}
fn name(&self) -> &'static str {
"PoolMonitorComponent"
}
}
Custom Node Implementation
Create a minimal node with only essential components:
use kabu_core_components::{KabuNode, KabuBuildContext, BoxedComponent};
pub struct MinimalNode;
#[async_trait]
impl<P, DB> KabuNode<P, DB> for MinimalNode
where
P: Provider + Send + Sync + 'static,
DB: Database + Send + Sync + 'static,
{
async fn build_components(
&self,
context: &KabuBuildContext<P, DB>,
) -> Result<Vec<BoxedComponent>> {
let mut components = vec![];
// Add only arbitrage searcher
components.push(Box::new(
StateChangeArbSearcherComponent::new(context.backrun_config.clone())
.with_channels(&context.channels)
.with_market_state(context.market_state.clone())
));
// Add signer component
components.push(Box::new(
SignersComponent::<DB, KabuDataTypesEthereum>::new()
.with_channels(&context.channels)
));
// Add broadcaster
if let Some(broadcaster_config) = context.topology_config.actors.broadcaster.as_ref() {
components.push(Box::new(
FlashbotsBroadcastComponent::new(
context.provider.clone(),
broadcaster_config.clone(),
)
.with_channels(&context.channels)
));
}
Ok(components)
}
}
// Usage
let handle = KabuBuilder::new(kabu_context)
.node(MinimalNode)
.build()
.launch(task_executor)
.await?;
Working with State
Loading Pool State
When working with pools, ensure their state is loaded:
use kabu_types_market::{Pool, RequiredStateReader};
use kabu_defi_pools::UniswapV3Pool;
// Fetch pool data
let pool = UniswapV3Pool::fetch_pool_data(provider.clone(), pool_address).await?;
// Get required state
let state_required = pool.get_state_required()?;
// Fetch state from chain
let state_update = RequiredStateReader::<KabuDataTypesEthereum>::fetch_calls_and_slots(
provider.clone(),
state_required,
Some(block_number),
)
.await?;
// Apply to state DB
market_state.write().await.state_db.apply_geth_update(state_update);
// Add pool to market
market.write().await.add_pool(pool.into())?;
Accessing Shared State
// Read market data
let market_guard = context.market.read().await;
let pool = market_guard.get_pool(&pool_address)?;
let tokens = market_guard.tokens();
drop(market_guard); // Release lock
// Update signer state
let mut signers = context.channels.signers.write().await;
signers.add_privkey(private_key);
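The pattern above, reading under a lock and releasing it before doing further work, can be sketched with the standard library alone. This is an illustration under assumptions: `SharedPrices`, `snapshot` and `sum_prices` are hypothetical names, and `std::sync::RwLock` stands in for the async lock used in the snippets above.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for shared market data: token symbol -> price.
pub type SharedPrices = Arc<RwLock<HashMap<String, u64>>>;

/// Copies the data out while holding the read lock only briefly.
pub fn snapshot(shared: &SharedPrices) -> HashMap<String, u64> {
    let guard = shared.read().expect("lock poisoned");
    let copy: HashMap<String, u64> = guard.clone();
    drop(guard); // release the read lock before any further work
    copy
}

/// Sums all prices from a snapshot, so no lock is held during the computation.
pub fn sum_prices(shared: &SharedPrices) -> u64 {
    snapshot(shared).values().sum()
}
```

Cloning trades memory for shorter lock hold times; for large maps it may be cheaper to copy only the entries that are needed.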
Testing Components
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;
    use tokio::sync::broadcast;

    #[tokio::test]
    async fn test_component_startup() {
        // Create test channels
        let channels = MevComponentChannels::default();

        // Create component
        let component = MyComponent::new(MyConfig::default())
            .with_channels(&channels);

        // Create test executor
        let task_manager = TaskManager::new(tokio::runtime::Handle::current());
        let executor = task_manager.executor();

        // Spawn component
        assert!(component.spawn(executor).is_ok());

        // Send test event
        channels.market_events.send(MarketEvents::BlockHeaderUpdate {
            block_number: 100,
            block_hash: Default::default(),
            timestamp: 0,
            base_fee: U256::ZERO,
            next_base_fee: U256::ZERO,
        }).unwrap();

        // Allow processing
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Shutdown
        task_manager.shutdown().await;
    }
}
Complete Example: Custom Arbitrage Bot
See the testing/backtest-runner for a complete example that:
- Initializes providers and blockchain state
- Loads pools and their state
- Configures components
- Processes historical blocks
- Tracks arbitrage opportunities
Key patterns from the backtest runner:
- Uses MevComponentChannels for pre-initialized channels
- Loads pool state before processing
- Manually triggers events for testing
- Tracks performance metrics
Architecture
Kabu is built on a modern component-based architecture that emphasizes modularity, concurrency, and type safety. The system efficiently processes blockchain data and identifies arbitrage opportunities through a unified node design with pluggable components.
Core Design Principles
- Node-Based Architecture: Centralized node pattern with pluggable components
- Component-Based Concurrency: Each component runs independently with message-passing communication
- Type Safety: Extensive use of Rust's type system for correctness
- Modular Design: Clear separation between data types, business logic, and infrastructure
- Performance First: Optimized for low-latency arbitrage detection and execution
Node System
The new node system provides a unified way to build and launch Kabu instances:
KabuNode Trait
The KabuNode trait defines the interface for different node implementations:
pub trait KabuNode<P, DB>: Send + Sync + 'static {
async fn build_components(
&self,
context: &KabuBuildContext<P, DB>,
) -> Result<Vec<BoxedComponent>>;
}
KabuBuilder
The KabuBuilder provides a fluent API for constructing and launching nodes:
let handle = KabuBuilder::new(kabu_context)
.node(KabuEthereumNode::<P, DB>::default())
.build()
.launch(task_executor)
.await?;
KabuBuildContext
The build context centralizes all configuration and shared resources:
let kabu_context = KabuBuildContext::builder(
provider,
blockchain,
blockchain_state,
topology_config,
backrun_config,
multicaller_address,
db_pool,
is_exex,
)
.with_pools_config(pools_config)
.with_swap_encoder(swap_encoder)
.with_channels(mev_channels)
.build();
Component System
Components are independent processing units that:
- Implement the Component trait
- Run as tokio tasks via TaskExecutor
- Communicate through MevComponentChannels
- Use builder pattern for configuration
- Support graceful shutdown
Key Components
- StateChangeArbSearcherComponent: Detects arbitrage in state changes
- SwapRouterComponent: Routes and encodes swap transactions
- SignersComponent: Manages transaction signing with nonce tracking
- FlashbotsBroadcastComponent: Submits bundles to Flashbots
- Market Components: Pool discovery and state management
- PoolHealthMonitorComponent: Tracks pool reliability
Type System Organization
Kabu's type system is split into three crates for modularity:
types/entities
Core blockchain and configuration types:
- Block, Transaction, Account
- StrategyConfig trait and implementations
- Pool loading configurations
types/market
Market structure and routing:
- Token: ERC20 with decimals, price, categories
- Pool trait: Unified pool interface
- Market: Registry of tokens and pools
- SwapDirection: Token pair direction in pool
- SwapPath: Route through multiple pools
types/swap
Execution and profit tracking:
- Swap: Final transaction ready for execution
- SwapLine: Path with amounts and gas costs
- SwapStep: Single pool interaction
- Calculation utilities for profit estimation
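To make "path with amounts and gas costs" concrete, here is a rough sketch of a net-profit calculation. It is not Kabu's implementation: `SwapLineSketch` and its fields are hypothetical, and it assumes the path starts and ends in the same token, with all amounts and the gas price denominated in wei.

```rust
/// Hypothetical, simplified stand-in for a swap line (not Kabu's `SwapLine`).
pub struct SwapLineSketch {
    pub amount_in: u128,  // wei put into the first pool
    pub amount_out: u128, // wei received from the last pool
    pub gas_used: u64,    // estimated gas for the whole path
}

impl SwapLineSketch {
    /// Net profit in wei: (amount_out - amount_in) - gas_used * gas_price.
    /// Returns None if an intermediate value does not fit its integer type.
    pub fn net_profit(&self, gas_price_wei: u128) -> Option<i128> {
        let gas_cost = gas_price_wei.checked_mul(self.gas_used as u128)?;
        let gross = i128::try_from(self.amount_out).ok()?
            - i128::try_from(self.amount_in).ok()?;
        gross.checked_sub(i128::try_from(gas_cost).ok()?)
    }
}
```

A negative result means the gas cost exceeds the gross gain, so the opportunity would be skipped.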
Communication Architecture
MevComponentChannels
All component communication is centralized through MevComponentChannels:
pub struct MevComponentChannels<DB> {
// Market events
pub market_events: broadcast::Sender<MarketEvents>,
pub mempool_events: broadcast::Sender<MempoolEvents>,
// Swap processing
pub swap_compose: broadcast::Sender<SwapComposeMessage>,
pub estimated_swaps: broadcast::Sender<SwapComposeMessage>,
// Health monitoring
pub health_events: broadcast::Sender<HealthEvent>,
// Shared state
pub signers: Arc<RwLock<TxSigners>>,
pub account_state: Arc<RwLock<AccountNonceAndBalanceState>>,
}
Message Flow
The typical arbitrage detection and execution flow:
Block/Mempool Event
↓
StateChangeArbSearcher
↓
SwapCompose (prepare)
↓
Merger Components
↓
EvmEstimator
↓
SwapRouter
↓
SignersComponent
↓
FlashbotsBroadcast
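The flow above can also be written down as data, which is handy for logging or for asserting in tests that a message passed through the expected stages. This is an illustrative sketch only: the `Stage` enum and `path_from` are hypothetical and merely mirror the diagram; Kabu itself connects these stages through channels rather than an enum.

```rust
/// Stages of the flow, in the order shown in the diagram (hypothetical type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Stage {
    Event,
    StateChangeArbSearcher,
    SwapCompose,
    Merger,
    EvmEstimator,
    SwapRouter,
    Signers,
    FlashbotsBroadcast,
}

impl Stage {
    /// Returns the stage that follows `self`, or None at the end of the flow.
    pub fn next(self) -> Option<Stage> {
        match self {
            Stage::Event => Some(Stage::StateChangeArbSearcher),
            Stage::StateChangeArbSearcher => Some(Stage::SwapCompose),
            Stage::SwapCompose => Some(Stage::Merger),
            Stage::Merger => Some(Stage::EvmEstimator),
            Stage::EvmEstimator => Some(Stage::SwapRouter),
            Stage::SwapRouter => Some(Stage::Signers),
            Stage::Signers => Some(Stage::FlashbotsBroadcast),
            Stage::FlashbotsBroadcast => None,
        }
    }
}

/// Collects `start` and every stage after it, in order.
pub fn path_from(start: Stage) -> Vec<Stage> {
    let mut path = vec![start];
    let mut current = start;
    while let Some(next) = current.next() {
        path.push(next);
        current = next;
    }
    path
}
```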
Building a Kabu Instance
Using KabuEthereumNode
The standard way to create a Kabu instance:
// Create provider
let provider = ProviderBuilder::new()
.on_http(node_url)
.build()?;
// Initialize blockchain and state
let blockchain = Blockchain::new(chain_id);
let blockchain_state = BlockchainState::<KabuDB, KabuDataTypesEthereum>::new();
// Build context
let kabu_context = KabuBuildContext::builder(
provider,
blockchain,
blockchain_state,
topology_config,
backrun_config,
multicaller_address,
db_pool,
false, // is_exex
)
.build();
// Launch with KabuBuilder
let handle = KabuBuilder::new(kabu_context)
.node(KabuEthereumNode::default())
.build()
.launch(task_executor)
.await?;
// Wait for shutdown
handle.wait_for_shutdown().await?;
Custom Node Implementation
You can create custom nodes by implementing the KabuNode trait:
pub struct CustomNode;
impl<P, DB> KabuNode<P, DB> for CustomNode
where
P: Provider + Send + Sync + 'static,
DB: Database + Send + Sync + 'static,
{
async fn build_components(
&self,
context: &KabuBuildContext<P, DB>,
) -> Result<Vec<BoxedComponent>> {
let mut components = vec![];
// Add your custom components
components.push(Box::new(
MyCustomComponent::new(context.backrun_config.clone())
.with_channels(&context.channels)
));
Ok(components)
}
}
This architecture provides:
- Unified node construction
- Centralized configuration
- Easy component composition
- Clean shutdown handling
- Flexible customization
Node System
The Kabu node system provides a unified, extensible framework for building MEV bots with different configurations and capabilities.
Overview
The node system consists of three main parts:
- KabuNode trait - Defines how to build components
- KabuBuilder - Orchestrates node construction and launch
- KabuBuildContext - Centralizes configuration and resources
KabuNode Trait
The KabuNode trait is the core abstraction for creating different node types:
pub trait KabuNode<P, DB>: Send + Sync + 'static
where
P: Provider + Send + Sync + 'static,
DB: Database + Send + Sync + 'static,
{
async fn build_components(
&self,
context: &KabuBuildContext<P, DB>,
) -> Result<Vec<BoxedComponent>>;
}
KabuEthereumNode
The standard implementation that includes all MEV components:
pub struct KabuEthereumNode<P, DB> {
_phantom: PhantomData<(P, DB)>,
}
impl<P, DB> KabuNode<P, DB> for KabuEthereumNode<P, DB> {
async fn build_components(
&self,
context: &KabuBuildContext<P, DB>,
) -> Result<Vec<BoxedComponent>> {
// Builds market, network, strategy, execution,
// monitoring, and broadcasting components
}
}
KabuBuilder
The builder provides a fluent API for node construction:
pub struct KabuBuilder<P, DB> {
context: KabuBuildContext<P, DB>,
node: Option<Box<dyn KabuNode<P, DB>>>,
}
impl<P, DB> KabuBuilder<P, DB> {
pub fn new(context: KabuBuildContext<P, DB>) -> Self {
Self { context, node: None }
}
pub fn node(mut self, node: impl KabuNode<P, DB>) -> Self {
self.node = Some(Box::new(node));
self
}
pub fn build(self) -> BuiltKabu<P, DB> {
BuiltKabu {
context: self.context,
node: self.node.unwrap_or_else(||
Box::new(KabuEthereumNode::default())
),
}
}
}
Launching
The built node can be launched with a task executor:
impl<P, DB> BuiltKabu<P, DB> {
pub async fn launch(self, executor: TaskExecutor) -> Result<KabuHandle> {
// Build components from the node
let components = self.node.build_components(&self.context).await?;
// Spawn all components
for component in components {
component.spawn_boxed(executor.clone())?;
}
Ok(KabuHandle::new())
}
}
KabuBuildContext
The build context contains all shared resources and configuration:
pub struct KabuBuildContext<P, DB> {
// Provider and blockchain
pub provider: P,
pub blockchain: Blockchain,
pub blockchain_state: BlockchainState<DB, KabuDataTypesEthereum>,
// Communication channels
pub channels: MevComponentChannels<DB>,
// Configuration
pub topology_config: TopologyConfig,
pub backrun_config: BackrunConfig,
pub pools_config: PoolsLoadingConfig,
// Infrastructure
pub multicaller_address: Address,
pub swap_encoder: MulticallerSwapEncoder,
pub db_pool: Option<DbPool>,
// Shared state
pub market: Arc<RwLock<Market>>,
pub market_state: Arc<RwLock<MarketState<DB>>>,
pub mempool: Arc<RwLock<Mempool>>,
pub block_history: Arc<RwLock<BlockHistory>>,
pub latest_block: Arc<RwLock<LatestBlock>>,
// Flags
pub is_exex: bool,
pub enable_web_server: bool,
}
Builder Pattern
The context uses a builder for flexible initialization:
let context = KabuBuildContext::builder(
provider,
blockchain,
blockchain_state,
topology_config,
backrun_config,
multicaller_address,
db_pool,
is_exex,
)
.with_pools_config(pools_config)
.with_swap_encoder(swap_encoder)
.with_channels(channels)
.with_enable_web_server(false)
.build();
Component Selection
KabuEthereumNode builds components based on configuration:
Market Components
- HistoryPoolLoaderComponent - Loads pools from database
- ProtocolPoolLoaderComponent - Discovers new pools
- MarketStatePreloadedComponent - Initializes market state
Network Components
- WaitForNodeSyncComponent - Ensures node is synced
- BlockProcessingComponent - Processes new blocks
- MempoolProcessingComponent - Monitors mempool
Strategy Components
- StateChangeArbSearcherComponent - Main arbitrage engine
- Merger components - Optimize swap paths
Execution Components
- EvmEstimatorComponent - Gas estimation
- SwapRouterComponent - Transaction routing
- SignersComponent - Transaction signing
Monitoring Components
- PoolHealthMonitorComponent - Tracks pool reliability
- MetricsRecorderComponent - Performance metrics
- InfluxDbWriterComponent - Metrics export
Broadcasting Components
- FlashbotsBroadcastComponent - Flashbots submission
- PublicBroadcastComponent - Public mempool
Custom Nodes
Create specialized nodes for different use cases:
Research Node
pub struct ResearchNode;
impl<P, DB> KabuNode<P, DB> for ResearchNode {
async fn build_components(
&self,
context: &KabuBuildContext<P, DB>,
) -> Result<Vec<BoxedComponent>> {
let mut components = vec![];
// Only market and monitoring components
components.extend(build_market_components(context)?);
components.extend(build_monitoring_components(context)?);
// Custom research component
components.push(Box::new(
ResearchRecorderComponent::new()
.with_channels(&context.channels)
));
Ok(components)
}
}
Minimal Production Node
pub struct MinimalProductionNode;
impl<P, DB> KabuNode<P, DB> for MinimalProductionNode {
async fn build_components(
&self,
context: &KabuBuildContext<P, DB>,
) -> Result<Vec<BoxedComponent>> {
let mut components = vec![];
// Only essential components
components.push(Box::new(
StateChangeArbSearcherComponent::new(
context.backrun_config.clone()
)
.with_channels(&context.channels)
.with_market_state(context.market_state.clone())
));
components.push(Box::new(
SignersComponent::new()
.with_channels(&context.channels)
));
components.push(Box::new(
FlashbotsBroadcastComponent::new(
context.provider.clone(),
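// Return an error instead of panicking when no broadcaster is configured
context.topology_config.actors.broadcaster.clone().ok_or_else(|| eyre!("broadcaster config not set"))?,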
)
.with_channels(&context.channels)
));
Ok(components)
}
}
Best Practices
- Resource Sharing
  - Use Arc<RwLock<_>> for shared state
  - Clone channels from MevComponentChannels
  - Pass context by reference
- Component Selection
  - Only include necessary components
  - Consider resource usage
  - Balance functionality vs performance
- Configuration
  - Use topology config for external services
  - Use backrun config for strategy parameters
  - Use pools config to control pool loading
- Error Handling
  - Components should handle their own errors
  - Use spawn_critical for essential components
  - Implement graceful shutdown
- Testing
  - Test nodes with mock components
  - Use controlled environments
  - Verify component interactions
Components
Components are the building blocks of Kabu's architecture. Each component is an independent processing unit that communicates with others through message passing.
Component Trait
All components implement the Component trait:
pub trait Component: Send + Sync + 'static {
fn spawn(self, executor: TaskExecutor) -> Result<()>;
fn spawn_boxed(self: Box<Self>, executor: TaskExecutor) -> Result<()>;
fn name(&self) -> &'static str;
}
Component Lifecycle
stateDiagram-v2
direction LR
[*] --> New: new()
New --> Configured: builder methods
Configured --> Running: spawn(executor)
Running --> [*]: shutdown/error
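The lifecycle in the diagram can be expressed as a small transition function. This is a sketch for illustration, not code from Kabu: the `Lifecycle` and `Action` enums and `step` are hypothetical names. It shows why spawning an unconfigured component should fail, which matches the "channels not set" error returned by the component examples in this documentation.

```rust
/// Hypothetical lifecycle states mirroring the diagram.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Lifecycle {
    New,
    Configured,
    Running,
    Stopped,
}

/// Hypothetical actions that drive the lifecycle.
#[derive(Debug, Clone, Copy)]
pub enum Action {
    Configure, // any builder method, e.g. with_channels
    Spawn,     // spawn(executor)
    Stop,      // shutdown or error
}

/// Applies one action; invalid combinations are rejected with an error message.
pub fn step(state: Lifecycle, action: Action) -> Result<Lifecycle, String> {
    match (state, action) {
        (Lifecycle::New, Action::Configure) => Ok(Lifecycle::Configured),
        // Builder methods can be chained, so configuring twice is allowed.
        (Lifecycle::Configured, Action::Configure) => Ok(Lifecycle::Configured),
        (Lifecycle::Configured, Action::Spawn) => Ok(Lifecycle::Running),
        (Lifecycle::Running, Action::Stop) => Ok(Lifecycle::Stopped),
        (s, a) => Err(format!("invalid transition: {:?} in state {:?}", a, s)),
    }
}
```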
Builder Pattern
Components use the builder pattern for configuration. Instead of manual channel wiring, components now use centralized MevComponentChannels:
pub struct MyComponent<DB> {
config: MyConfig,
channels: Option<MevComponentChannels<DB>>,
}
impl<DB> MyComponent<DB> {
pub fn new(config: MyConfig) -> Self {
Self {
config,
channels: None,
}
}
pub fn with_channels(mut self, channels: &MevComponentChannels<DB>) -> Self {
self.channels = Some(channels.clone());
self
}
}
Core Components
StateChangeArbSearcherComponent
Monitors state changes and identifies arbitrage opportunities:
StateChangeArbSearcherComponent::new(backrun_config)
.with_channels(&channels)
.with_market_state(market_state)
SignersComponent
Manages transaction signing with nonce tracking:
SignersComponent::<DB, KabuDataTypesEthereum>::new()
.with_channels(&channels)
FlashbotsBroadcastComponent
Submits transaction bundles to Flashbots:
FlashbotsBroadcastComponent::new(
client.clone(),
BroadcasterConfig::flashbots(signer),
)
.with_channels(&channels)
PoolHealthMonitorComponent
Tracks pool reliability and removes unhealthy pools:
PoolHealthMonitorComponent::new()
.with_channels(&channels)
.with_market(market)
Creating a Component
Here's a template for creating a new component:
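use std::sync::Arc;
use tokio::sync::RwLock;
use eyre::{Result, eyre};
use kabu_core_components::{Component, MevComponentChannels};
use kabu_types_events::MarketEvents;
use reth_tasks::TaskExecutor;
use revm::DatabaseRef;
use tracing::info;
pub struct MyComponent<DB: Clone + Send + Sync + 'static> {
config: MyConfig,
channels: Option<MevComponentChannels<DB>>,
}
// The impl must repeat the bounds declared on the struct
impl<DB: Clone + Send + Sync + 'static> MyComponent<DB> {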
pub fn new(config: MyConfig) -> Self {
Self {
config,
channels: None,
}
}
pub fn with_channels(mut self, channels: &MevComponentChannels<DB>) -> Self {
self.channels = Some(channels.clone());
self
}
}
impl<DB> Component for MyComponent<DB>
where
DB: DatabaseRef + Send + Sync + Clone + 'static,
{
fn spawn(self, executor: TaskExecutor) -> Result<()> {
let name = self.name();
let channels = self.channels.ok_or_else(|| eyre!("channels not set"))?;
// Subscribe to relevant channels
let mut market_events = channels.market_events.subscribe();
executor.spawn_critical(name, async move {
info!("Starting {}", name);
while let Ok(event) = market_events.recv().await {
// Process events
match event {
MarketEvents::BlockHeaderUpdate { .. } => {
// Handle block update
}
_ => {}
}
}
info!("{} shutting down", name);
});
Ok(())
}
fn spawn_boxed(self: Box<Self>, executor: TaskExecutor) -> Result<()> {
(*self).spawn(executor)
}
fn name(&self) -> &'static str {
"MyComponent"
}
}
Component Communication
Components communicate through typed channels in MevComponentChannels:
Event Types
- MarketEvents: Block headers, state updates, pool updates
- MempoolEvents: New transactions, transaction updates
- SwapComposeMessage: Swap preparation, routing, execution
- HealthEvent: Pool errors, component health
Example: Processing Swaps
// In your component
let mut swap_compose = channels.swap_compose.subscribe();
while let Ok(msg) = swap_compose.recv().await {
match msg.inner {
SwapComposeMessage::Prepare(data) => {
// Prepare swap for execution
let estimated = estimate_swap(data).await?;
channels.estimated_swaps.send(estimated)?;
}
SwapComposeMessage::Ready(data) => {
// Swap is ready for broadcast
}
_ => {}
}
}
Best Practices
- Channel Management
  - Always use MevComponentChannels for communication
  - Handle RecvError::Closed gracefully for shutdown
  - Use bounded channels to prevent memory issues
- Error Handling
match rx.recv().await {
    Ok(msg) => process(msg),
    Err(RecvError::Closed) => {
        debug!("{} channel closed, shutting down", name);
        break;
    }
    Err(RecvError::Lagged(n)) => {
        warn!("{} lagged by {} messages", name, n);
        continue;
    }
}
- State Access
  - Minimize lock duration
  - Use read locks when possible
  - Clone data out of locks for processing
- Lifecycle
  - Components should be stateless when possible
  - Use spawn_critical for essential components
  - Log startup and shutdown
- Testing
  - Test component logic separately from messaging
  - Use mock channels for unit tests
  - Integration test with real channels
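The advice to use bounded channels can be illustrated with the standard library. The sketch below is an assumption-laden example: it uses `std::sync::mpsc::sync_channel` rather than the tokio broadcast channels used by Kabu, and `bounded` and `send_all_or_count_dropped` are hypothetical helpers. A tokio broadcast channel behaves differently when full: it overwrites the oldest messages and slow receivers observe `RecvError::Lagged`, as handled in the error-handling snippet above.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Creates a bounded channel that buffers at most `capacity` messages.
pub fn bounded(capacity: usize) -> (SyncSender<u32>, Receiver<u32>) {
    sync_channel(capacity)
}

/// Tries to enqueue every message without blocking and returns how many
/// were dropped because the buffer was full.
pub fn send_all_or_count_dropped(tx: &SyncSender<u32>, msgs: &[u32]) -> usize {
    let mut dropped = 0;
    for msg in msgs {
        match tx.try_send(*msg) {
            Ok(()) => {}
            // Buffer is full: memory use stays bounded, the message is dropped.
            Err(TrySendError::Full(_)) => dropped += 1,
            // Receiver is gone: nothing more can be delivered.
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    dropped
}
```

Counting dropped messages like this is one way to notice that a channel is sized too small for the observed throughput.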
State management
kabu provides multiple ways to fetch the state and keep it up to date. The state can be fetched using different methods as described in the following section.
Receiving new state
kabu can fetch the state using three different methods:
- WS/IPC based: Subscribing to new events using WebSocket or IPC. For each new event a debug_trace_block call is made to get the state diff.
- Direct DB: Subscribing to new events as before using WebSocket or IPC, but fetching the state diff directly from the DB.
- ExEx: Subscribing to new ExEx events and reading the execution outcome from reth.
Adding new state to the DB
kabu keeps all required state in-memory and optionally fetches missing state from an external database provider. The KabuDB is split into three parts so that it can be cloned efficiently. The first part is mutable; every new or changed state is added there.
With each new block, a background task is spawned that merges all state into the inner read-only KabuDB. This inner KabuDB lives inside an Arc. The motivation is to avoid waiting for the merge and to avoid the cost of cloning the whole state all the time.
The third part is a DatabaseRef to an external database provider. It is used to fetch missing state that was not prefetched. Both parts are optional, e.g. for testing whether the prefetched state is working correctly.
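The three-part layout can be sketched with plain maps. This is a simplified illustration, not the real KabuDB: `LayeredDb`, `Key`, `Value` and `Fallback` are hypothetical names, the state is reduced to integer key/value pairs, and the merge runs synchronously here rather than in a background task.

```rust
use std::collections::HashMap;
use std::sync::Arc;

pub type Key = u64;
pub type Value = u64;
/// Hypothetical stand-in for the external `DatabaseRef` provider.
pub type Fallback = Arc<dyn Fn(Key) -> Option<Value> + Send + Sync>;

#[derive(Clone)]
pub struct LayeredDb {
    overlay: HashMap<Key, Value>,    // part 1: mutable, receives new state
    inner: Arc<HashMap<Key, Value>>, // part 2: read-only, cheap to clone
    fallback: Option<Fallback>,      // part 3: optional external lookup
}

impl LayeredDb {
    pub fn new(fallback: Option<Fallback>) -> Self {
        Self { overlay: HashMap::new(), inner: Arc::new(HashMap::new()), fallback }
    }

    /// New or changed state always goes into the mutable overlay.
    pub fn insert(&mut self, key: Key, value: Value) {
        self.overlay.insert(key, value);
    }

    /// Looks up the overlay first, then the read-only layer, then the fallback.
    pub fn get(&self, key: Key) -> Option<Value> {
        self.overlay.get(&key).copied()
            .or_else(|| self.inner.get(&key).copied())
            .or_else(|| self.fallback.as_ref().and_then(|f| f(key)))
    }

    /// Builds a new instance whose read-only layer contains the merged state.
    pub fn merged(&self) -> Self {
        let mut base = (*self.inner).clone();
        base.extend(self.overlay.iter().map(|(k, v)| (*k, *v)));
        Self { overlay: HashMap::new(), inner: Arc::new(base), fallback: self.fallback.clone() }
    }

    pub fn overlay_len(&self) -> usize {
        self.overlay.len()
    }
}
```

Cloning a `LayeredDb` copies only the small overlay and bumps the reference count on the read-only layer, which is the cost saving the text describes.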
Tips & Tricks
This section contains various tips and tricks for working with the Kabu framework effectively.
Component Development
When developing components, keep these tips in mind:
- Use the Builder Pattern: Components should provide builder methods for configuration
- Handle Errors Gracefully: Use proper error handling and logging
- Test in Isolation: Write unit tests for component logic separately from the messaging infrastructure
Performance Optimization
- Channel Sizing: Size channels based on expected throughput
- Parallel Processing: Use tokio's concurrency features effectively
- State Management: Minimize lock contention with Arc<RwLock<_>>
Debugging
- Enable Tracing: Use RUST_LOG=debug to see detailed logs
- Monitor Channels: Track channel depths and message flow
- Component Lifecycle: Log component startup and shutdown
For more specific tips, see the subsections:
- Custom Messages - How to extend the messaging system
- Address Book - Managing addresses and configurations
Custom messages
If you need to add new messages without modifying kabu, you can easily add a custom struct like Blockchain to keep the references for states and channels.
Custom Blockchain
pub struct CustomBlockchain {
custom_channel: Broadcaster<CustomMessage>,
}
impl CustomBlockchain {
pub fn new() -> Self {
Self {
custom_channel: Broadcaster::new(10),
}
}
pub fn custom_channel(&self) -> Broadcaster<CustomMessage> {
self.custom_channel.clone()
}
}
Custom Component
Create a component that can use your custom blockchain:
pub struct ExampleComponent {
// Same type that CustomBlockchain::custom_channel() returns
custom_channel_rx: Option<Broadcaster<CustomMessage>>,
}
impl ExampleComponent {
pub fn new() -> Self {
Self {
custom_channel_rx: None,
}
}
pub fn on_custom_bc(self, custom_bc: &CustomBlockchain) -> Self {
Self {
custom_channel_rx: Some(custom_bc.custom_channel()),
..self
}
}
}
impl Component for ExampleComponent {
fn spawn(self, executor: TaskExecutor) -> Result<()> {
let mut rx = self.custom_channel_rx
.ok_or_else(|| eyre!("custom channel not set"))?
.subscribe();
executor.spawn_critical("ExampleComponent", async move {
while let Ok(msg) = rx.recv().await {
// Process custom messages
}
});
Ok(())
}
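// Required by the Component trait so boxed components can be spawned
fn spawn_boxed(self: Box<Self>, executor: TaskExecutor) -> Result<()> {
(*self).spawn(executor)
}
fn name(&self) -> &'static str {
"ExampleComponent"
}
}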
Starting Custom Components
When loading your custom component, you can set the custom blockchain:
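let custom_bc = CustomBlockchain::new();
// Obtain the executor from a TaskManager, as in the earlier examples
let task_manager = TaskManager::new(tokio::runtime::Handle::current());
let executor = task_manager.executor();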
// Create and configure the component
let component = ExampleComponent::new()
.on_custom_bc(&custom_bc);
// Spawn the component
component.spawn(executor)?;
Integration with KabuComponentsBuilder
For more complex setups, you can extend the component builder pattern:
impl KabuComponentsBuilder {
pub fn with_custom_components(self, custom_bc: &CustomBlockchain) -> Self {
let component = ExampleComponent::new()
.on_custom_bc(custom_bc);
self.add_component(Box::new(component))
}
}
Address book
The address book contains often-used addresses so that they are convenient to access. Using it is less error-prone and easier to read than hard-coding addresses.
Address types
Right now you will find TokenAddress, FactoryAddress, PeripheryAddress and other more specific address clusters for different protocols like UniswapV2PoolAddress.
Example
Just import it using the kabu crate or the dedicated defi-address-book crate.
use kabu::eth::address_book::TokenAddress;
let weth_address = TokenAddress::WETH;
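To show why named constants are less error-prone than hex strings scattered through the code, here is a minimal standalone sketch of the idea. It is not the real address book: the `Address` alias is a plain byte array standing in for alloy's address type, and `to_hex` is a hypothetical helper. The WETH value is the Ethereum mainnet WETH contract address.

```rust
/// Stand-in for alloy's address type (assumption for this sketch).
pub type Address = [u8; 20];

/// Namespace for token addresses, similar in spirit to the address book.
pub struct TokenAddress;

impl TokenAddress {
    /// WETH on Ethereum mainnet: 0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2
    pub const WETH: Address = [
        0xc0, 0x2a, 0xaa, 0x39, 0xb2, 0x23, 0xfe, 0x8d, 0x0a, 0x0e,
        0x5c, 0x4f, 0x27, 0xea, 0xd9, 0x08, 0x3c, 0x75, 0x6c, 0xc2,
    ];
}

/// Formats an address as a lowercase 0x-prefixed hex string.
pub fn to_hex(addr: &Address) -> String {
    let mut out = String::from("0x");
    for byte in addr {
        out.push_str(&format!("{:02x}", byte));
    }
    out
}
```

Because the constant is defined once, a typo in the address can only occur in a single place, and every call site reads as `TokenAddress::WETH`.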