A high-performance, Kafka-compatible message broker written in Rust with 100% Java client compatibility and 608k+ msg/sec throughput.
- 100% Java Kafka Compatible: Complete compatibility with Java Kafka clients (apache-kafka-java 4.1+)
- Ultra High Performance: 608,272+ messages/second throughput with Arena Memory optimizations
- 20 Kafka APIs Supported: Full wire protocol compatibility with metadata, produce, consume, and admin operations
- Distributed Architecture: Leader-follower replication with Raft-like consensus
- Consumer Groups: Load balancing across multiple consumers with partition assignment
- Persistent Storage: Hybrid memory-disk storage with crash recovery
- Multi-Partition Topics: Hash-based and round-robin partition assignment strategies (sketched after this list)
- Async Architecture: Built on Tokio for high-concurrency message processing
- MegaBatch Performance: 608,272 messages/second (1MB batch size, 16 threads)
- Java Client Compatibility: Fully compatible with all major Java Kafka client libraries
- Sequential I/O: 20-40x HDD, 5-14x SSD performance improvement
- Lock-Free Metrics: 99.9% performance retention via optimized atomic operations
- Zero-Copy Design: Memory-mapped I/O with `bytes::Bytes` for maximum efficiency
- Sub-Millisecond Latency: 0.019-0.030 ms per-message processing time
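To make the two partitioning strategies concrete, here is a minimal sketch of a producer-side partitioner, assuming keyed records hash to a stable partition and keyless records rotate round-robin; the `Partitioner` type is illustrative, not FluxMQ's internals:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::atomic::{AtomicU64, Ordering};

/// Illustrative partitioner: keyed records hash to a stable partition,
/// keyless records rotate round-robin across partitions.
struct Partitioner {
    num_partitions: u32,
    round_robin: AtomicU64,
}

impl Partitioner {
    fn partition(&self, key: Option<&[u8]>) -> u32 {
        match key {
            Some(k) => {
                // Hash-based: the same key always lands on the same
                // partition, preserving per-key ordering.
                let mut hasher = DefaultHasher::new();
                k.hash(&mut hasher);
                (hasher.finish() % self.num_partitions as u64) as u32
            }
            None => {
                // Round-robin: spread keyless records evenly.
                let n = self.round_robin.fetch_add(1, Ordering::Relaxed);
                (n % self.num_partitions as u64) as u32
            }
        }
    }
}

fn main() {
    let p = Partitioner { num_partitions: 4, round_robin: AtomicU64::new(0) };
    // Same key, same partition; keyless records cycle.
    assert_eq!(p.partition(Some(b"user-123".as_slice())),
               p.partition(Some(b"user-123".as_slice())));
    println!("{} {}", p.partition(None), p.partition(None));
}
```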
- ✅ Java: `org.apache.kafka:kafka-clients` v4.1+ (100% compatible)
- ✅ Python: `kafka-python` library support
- ✅ Scala: Native Kafka Scala clients
- ✅ Admin Operations: Topic creation, deletion, metadata queries
- Broker: TCP server handling client connections
- Storage Engine: Hybrid memory-disk persistence layer
- Topic Manager: Multi-partition topic management
- Replication Coordinator: Leader-follower data replication
- Consumer Group Coordinator: Load balancing and partition assignment
- Network Protocol: Binary protocol with length-prefixed frames
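As a rough illustration of length-prefixed framing (a 4-byte big-endian length header followed by the payload), the decoder below is a sketch built on the `bytes` crate, not FluxMQ's actual codec:

```rust
use bytes::{Buf, BytesMut};

/// Illustrative decoder: each frame is a 4-byte big-endian length
/// followed by exactly that many payload bytes.
fn try_decode_frame(buf: &mut BytesMut) -> Option<BytesMut> {
    if buf.len() < 4 {
        return None; // not enough bytes for the length header yet
    }
    let len = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]) as usize;
    if buf.len() < 4 + len {
        return None; // partial frame: wait for more bytes
    }
    buf.advance(4);         // consume the length header
    Some(buf.split_to(len)) // hand back exactly one frame
}

fn main() {
    let mut buf = BytesMut::new();
    buf.extend_from_slice(&5u32.to_be_bytes());
    buf.extend_from_slice(b"hello");
    let frame = try_decode_frame(&mut buf).expect("complete frame");
    assert_eq!(&frame[..], b"hello");
}
```

Returning `None` on a partial frame lets an async read loop keep accumulating bytes until a full frame arrives.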
- In-memory operations: Primary read/write for maximum performance
- Async disk persistence: Background writes for durability
- Memory-mapped I/O: Efficient file operations for large datasets
- Append-only logs: Sequential writes with CRC integrity checks
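For a sense of how CRC-checked append-only records can work, here is a minimal sketch; the `[len][crc32][payload]` layout and the `crc32fast` dependency are assumptions for illustration, not FluxMQ's on-disk format:

```rust
use crc32fast::Hasher; // assumed dependency for this sketch

/// Illustrative record layout: [len: u32][crc32: u32][payload],
/// appended sequentially so writes stay strictly append-only.
fn encode_record(payload: &[u8]) -> Vec<u8> {
    let mut hasher = Hasher::new();
    hasher.update(payload);
    let crc = hasher.finalize();

    let mut record = Vec::with_capacity(8 + payload.len());
    record.extend_from_slice(&(payload.len() as u32).to_be_bytes());
    record.extend_from_slice(&crc.to_be_bytes());
    record.extend_from_slice(payload);
    record
}

/// During crash recovery, a record whose CRC does not match is treated
/// as a torn write, and the log is truncated at that point.
fn verify_record(record: &[u8]) -> bool {
    if record.len() < 8 {
        return false;
    }
    let len = u32::from_be_bytes(record[0..4].try_into().unwrap()) as usize;
    if record.len() < 8 + len {
        return false;
    }
    let stored_crc = u32::from_be_bytes(record[4..8].try_into().unwrap());
    let mut hasher = Hasher::new();
    hasher.update(&record[8..8 + len]);
    hasher.finalize() == stored_crc
}

fn main() {
    let rec = encode_record(b"hello fluxmq");
    assert!(verify_record(&rec));
}
```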
- Rust 1.70+ (latest stable recommended)
- Cargo package manager
```bash
git clone https://github.com/gosuda/fluxmq.git
cd fluxmq
cargo build --release
```

```bash
cargo run -- --host 0.0.0.0 --port 9092
```

```bash
# For core development
cd core
cargo run --release -- --port 9092 --enable-consumer-groups --log-level info

# Or with full features
RUSTFLAGS="-C target-cpu=native" cargo run --release -- \
    --port 9092 \
    --enable-consumer-groups \
    --data-dir ./data
```

```bash
# Terminal 1: Broker 1
cargo run -- --port 9092 --broker-id 1 --enable-replication --data-dir ./broker1

# Terminal 2: Broker 2
cargo run -- --port 9093 --broker-id 2 --enable-replication --data-dir ./broker2

# Terminal 3: Broker 3
cargo run -- --port 9094 --broker-id 3 --enable-replication --data-dir ./broker3
```
```java
// Producer Example
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class FluxMQProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        // High performance settings (MegaBatch configuration)
        props.put("batch.size", "1048576"); // 1MB batch
        props.put("linger.ms", "15");
        props.put("compression.type", "lz4");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            ProducerRecord<String, String> record =
                new ProducerRecord<>("my-topic", "key", "Hello FluxMQ!");
            producer.send(record).get();
            System.out.println("Message sent successfully!");
        } finally {
            producer.close();
        }
    }
}
```
```python
from kafka import KafkaProducer, KafkaConsumer

# Producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: v.encode('utf-8')
)
producer.send('my-topic', 'Hello FluxMQ!')
producer.flush()

# Consumer
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: m.decode('utf-8')
)
for message in consumer:
    print(f"Received: {message.value}")
    break
```
```rust
use fluxmq_client::*;

#[tokio::main]
async fn main() -> Result<()> {
    let producer = ProducerBuilder::new()
        .brokers(vec!["localhost:9092"])
        .build()
        .await?;

    let record = ProduceRecord::builder()
        .topic("my-topic")
        .key("user-123")
        .value("Hello FluxMQ!")
        .build();

    let metadata = producer.send(record).await?;
    println!("Message sent to partition {} at offset {}",
        metadata.partition, metadata.offset);

    Ok(())
}
```
```rust
use fluxmq_client::*;
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<()> {
    let consumer = ConsumerBuilder::new()
        .brokers(vec!["localhost:9092"])
        .group_id("my-consumer-group")
        .topics(vec!["my-topic"])
        .build()
        .await?;

    let mut stream = consumer.stream();
    while let Some(record) = stream.next().await {
        match record {
            Ok(record) => {
                println!("Received: key={:?}, value={}",
                    record.key, String::from_utf8_lossy(&record.value));
                consumer.commit_sync().await?;
            }
            Err(e) => eprintln!("Error receiving message: {}", e),
        }
    }

    Ok(())
}
```
```bash
# Terminal 1: Start FluxMQ broker
cd core
RUSTFLAGS="-C target-cpu=native" cargo run --release -- --port 9092 --enable-consumer-groups

# Terminal 2: Run Java benchmark (601k+ msg/sec)
cd fluxmq-java-tests
mvn exec:java -Dexec.mainClass="com.fluxmq.tests.MegaBatchBenchmark"

# Terminal 3: Run simple Java test
mvn exec:java -Dexec.mainClass="com.fluxmq.tests.MinimalProducerTest"

# Or try Rust examples
cd fluxmq-client
cargo run --example simple_producer
cargo run --example simple_consumer
```
```text
USAGE:
    fluxmq [OPTIONS]

OPTIONS:
        --host <HOST>               Bind address [default: 0.0.0.0]
    -p, --port <PORT>               Port to listen on [default: 9092]
    -l, --log-level <LOG_LEVEL>     Log level [default: info]
        --broker-id <BROKER_ID>     Unique broker identifier [default: 0]
        --enable-replication        Enable replication features
        --enable-consumer-groups    Enable consumer group coordination
        --recovery-mode             Load existing data from disk on startup
        --data-dir <DATA_DIR>       Data storage directory [default: ./data]
```

```bash
RUST_LOG=debug                   # Enable debug logging
FLUXMQ_DATA_DIR=/var/lib/fluxmq  # Override data directory
```

```bash
cargo test
```

```bash
cargo test storage      # Storage layer tests
cargo test consumer     # Consumer group tests
cargo test replication  # Replication tests
cargo test protocol     # Protocol tests
```
```bash
# One-click benchmark execution with file logging
./run_benchmark.sh

# Monitor benchmark progress
./monitor_logs.sh

# Clean up processes and logs
./cleanup.sh
```

```bash
# 1. Start FluxMQ server
env RUSTFLAGS="-C target-cpu=native" ./target/release/fluxmq --port 9092 --enable-consumer-groups --log-level info > fluxmq_server.log 2>&1 &

# 2. Run Java benchmark with file logging
cd fluxmq-java-tests
java -cp "target/classes:target/dependency/*" com.fluxmq.tests.MegaBatchBenchmark localhost:9092 > java_benchmark_$(date +%Y%m%d_%H%M%S).log 2>&1 &

# 3. Monitor results
tail -f java_benchmark_*.log
```

```bash
cargo test --release -- --ignored benchmark
```

📖 For a detailed benchmark execution guide, see BENCHMARK_EXECUTION_GUIDE.md.
```text
src/
├── main.rs              # Application entry point
├── lib.rs               # Library root
├── broker/              # Broker implementation
│   ├── handler.rs       # Request handlers
│   └── server.rs        # TCP server
├── storage/             # Storage layer
│   ├── log.rs           # Append-only log files
│   ├── segment.rs       # Log segment management
│   └── index.rs         # Offset indexing
├── protocol/            # Network protocol
│   ├── messages.rs      # Protocol messages
│   ├── codec.rs         # Server-side codec
│   └── client_codec.rs  # Client-side codec
├── replication/         # Replication system
│   ├── leader.rs        # Leader state management
│   └── follower.rs      # Follower synchronization
├── consumer/            # Consumer groups
│   └── coordinator.rs   # Group coordinator
└── topic_manager.rs     # Topic management
```
```bash
# Install Rust
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh

# Install development tooling (clippy and rustfmt ship as rustup components)
rustup component add clippy rustfmt
cargo install cargo-audit cargo-watch
```
```bash
# Format code
cargo fmt

# Check for issues
cargo clippy

# Security audit
cargo audit

# Watch for changes
cargo watch -x check -x test
```

- 100% Java Kafka Client Compatibility (apache-kafka-java 4.1+)
- 601k+ msg/sec Performance with MegaBatch optimization
- 20 Kafka APIs Implemented (Metadata, Produce, Fetch, Consumer Groups, Admin)
- Sequential I/O Optimization (20-40x HDD, 5-14x SSD improvement)
- Lock-Free Metrics System with atomic operations (see the sketch after this list)
- Ultra-Performance Storage (Memory-mapped I/O, SIMD processing)
- Enterprise Security (TLS/SSL, ACL, SASL authentication)
- Leader-Follower Replication with Raft-like consensus
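As a hint of how a lock-free metrics path avoids blocking the hot path, here is a minimal sketch using atomic counters; the `Metrics` struct and its fields are illustrative assumptions, not FluxMQ's actual metrics system:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Illustrative lock-free metrics: hot-path counters are plain atomics,
/// so producer threads never contend on a metrics mutex.
#[derive(Default)]
struct Metrics {
    messages_in: AtomicU64,
    bytes_in: AtomicU64,
}

impl Metrics {
    fn record_produce(&self, bytes: u64) {
        // Relaxed ordering suffices for monotonic counters read elsewhere.
        self.messages_in.fetch_add(1, Ordering::Relaxed);
        self.bytes_in.fetch_add(bytes, Ordering::Relaxed);
    }

    fn snapshot(&self) -> (u64, u64) {
        (
            self.messages_in.load(Ordering::Relaxed),
            self.bytes_in.load(Ordering::Relaxed),
        )
    }
}

fn main() {
    let metrics = Metrics::default();
    metrics.record_produce(1024);
    let (msgs, bytes) = metrics.snapshot();
    println!("messages={}, bytes={}", msgs, bytes);
}
```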
- Advanced monitoring dashboard
- Kubernetes operator
- Schema registry integration
- Additional client SDK support
- Log compaction
- Web-based management UI
We welcome contributions! Please see CONTRIBUTING.md for guidelines.
- Fork the repository
- Create a feature branch
- Make your changes
- Add tests
- Run the test suite
- Submit a pull request
This project is licensed under the MIT License - see the LICENSE file for details.
- Inspired by Apache Kafka's architecture
- Built with the amazing Rust ecosystem
- Special thanks to the Tokio team for async runtime
- 🐛 Issues: GitHub Issues
- 💬 Discussions: GitHub Discussions
- 📧 Email: hsng95@gmail.com
FluxMQ - High-performance message streaming, built with Rust ⚡️