Here is my example code to play with folly coroutines and a reactor, emulating an SQE/CQE submission/completion flow:
```cpp
#include <folly/experimental/coro/Task.h>
#include <folly/experimental/coro/BlockingWait.h>
#include <folly/experimental/coro/Collect.h>
#include <folly/io/async/EventBase.h>

#include <sys/types.h>  // off_t

#include <algorithm>
#include <atomic>
#include <chrono>
#include <coroutine>  // std::coroutine_handle
#include <cstdint>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>
// Forward declarations
class LatencyTrackingReactor;

// Latency tracking structure for detailed breakdown
struct LatencyMetrics {
  std::chrono::steady_clock::time_point request_created;
  std::chrono::steady_clock::time_point sqe_submitted;
  std::chrono::steady_clock::time_point processing_started;
  std::chrono::steady_clock::time_point processing_completed;
  std::chrono::steady_clock::time_point coroutine_resumed;

  // Calculated latencies in microseconds
  double submission_latency_us() const {
    if (request_created.time_since_epoch().count() == 0 ||
        sqe_submitted.time_since_epoch().count() == 0)
      return 0.0;
    return std::chrono::duration<double, std::micro>(sqe_submitted - request_created).count();
  }
  double queue_latency_us() const {
    if (sqe_submitted.time_since_epoch().count() == 0 ||
        processing_started.time_since_epoch().count() == 0)
      return 0.0;
    return std::chrono::duration<double, std::micro>(processing_started - sqe_submitted).count();
  }
  double processing_latency_us() const {
    if (processing_started.time_since_epoch().count() == 0 ||
        processing_completed.time_since_epoch().count() == 0)
      return 0.0;
    return std::chrono::duration<double, std::micro>(processing_completed - processing_started).count();
  }
  double resume_latency_us() const {
    if (processing_completed.time_since_epoch().count() == 0 ||
        coroutine_resumed.time_since_epoch().count() == 0)
      return 0.0;
    return std::chrono::duration<double, std::micro>(coroutine_resumed - processing_completed).count();
  }
  double end_to_end_latency_us() const {
    if (request_created.time_since_epoch().count() == 0 ||
        coroutine_resumed.time_since_epoch().count() == 0)
      return 0.0;
    return std::chrono::duration<double, std::micro>(coroutine_resumed - request_created).count();
  }
};

// Submission Queue Entry with latency tracking
struct SQE {
  uint64_t request_id;
  std::string path;
  size_t size;
  off_t offset;
  std::chrono::steady_clock::time_point submit_time;

  SQE(uint64_t id, const std::string& p, size_t s, off_t o)
      : request_id(id), path(p), size(s), offset(o),
        submit_time(std::chrono::steady_clock::now()) {}
};

// Completion Queue Entry
struct CQE {
  uint64_t request_id;
  int result;
  std::chrono::steady_clock::time_point completion_time;

  CQE(uint64_t id, int res)
      : request_id(id), result(res),
        completion_time(std::chrono::steady_clock::now()) {}
};

// External awaitable with latency tracking
class IoAwaitable {
 public:
  IoAwaitable(LatencyTrackingReactor* reactor, uint64_t request_id,
              const std::string& path, size_t size, off_t offset);

  bool await_ready() noexcept { return false; }
  void await_suspend(std::coroutine_handle<> handle) noexcept;
  int await_resume() noexcept;

  void set_result(int result) noexcept { result_ = result; }
  uint64_t get_request_id() const noexcept { return request_id_; }
  LatencyMetrics get_metrics() const noexcept { return metrics_; }

 private:
  LatencyTrackingReactor* reactor_;
  uint64_t request_id_;
  std::string path_;
  size_t size_;
  off_t offset_;
  int result_{0};
  std::coroutine_handle<> handle_;
  LatencyMetrics metrics_;
};
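
// Awaiter flow, for reference:
//   1. await_ready() returns false, so the awaiting coroutine always suspends.
//   2. await_suspend() publishes the coroutine handle plus the request
//      parameters to the reactor (the SQE side).
//   3. The reactor's poll loop calls handle.resume() on its EventBase thread
//      once the matching completion has been recorded. Note that a
//      folly::coro::Task wraps co_await'ed values with co_viaIfAsync, so the
//      task body may still be rescheduled onto the task's own executor rather
//      than continuing inline on the reactor thread.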
// High-performance NVMe Reactor with latency tracking
class LatencyTrackingReactor {
 public:
  explicit LatencyTrackingReactor(size_t queue_depth = 1024)
      : queue_depth_(queue_depth), next_request_id_(1), running_(false) {
    std::cout << "LatencyTrackingReactor: Initialized with queue_depth=" << queue_depth << std::endl;
  }

  ~LatencyTrackingReactor() {
    stop();
  }

  void run() {
    std::cout << "LatencyTrackingReactor: Starting reactor loop" << std::endl;
    running_ = true;
    // Start the polling task
    schedule_reactor_poll();
    // Run the reactor loop forever
    event_base_.loopForever();
    std::cout << "LatencyTrackingReactor: Reactor loop ended" << std::endl;
  }

  void stop() {
    running_ = false;
    event_base_.terminateLoopSoon();
  }

  folly::EventBase* getExecutor() const {
    return const_cast<folly::EventBase*>(&event_base_);
  }

  uint64_t generate_request_id() {
    return next_request_id_++;
  }

  // Submit I/O request to SQE
  void submit_io_request(uint64_t request_id, const std::string& path, size_t size,
                         off_t offset, std::coroutine_handle<> handle) {
    auto now = std::chrono::steady_clock::now();
    std::cout << "DEBUG: submit_io_request called for request #" << request_id << std::endl;
    {
      std::lock_guard<std::mutex> lock(sqe_mutex_);
      submission_queue_.emplace(request_id, path, size, offset);
      pending_handles_[request_id] = handle;
    }
    // Track submission latency and increment counter; capture the counter
    // under the lock so the debug print below doesn't read it unguarded
    size_t submitted_so_far = 0;
    {
      std::lock_guard<std::mutex> lock(cqe_mutex_);
      latency_metrics_[request_id].sqe_submitted = now;
      submitted_so_far = ++total_submitted_;
    }
    std::cout << "DEBUG: submitted request #" << request_id
              << ", total submitted: " << submitted_so_far << std::endl;
  }

  // Get completed I/O result and metrics
  std::pair<int, LatencyMetrics> get_io_result_with_metrics(uint64_t request_id) {
    std::lock_guard<std::mutex> lock(cqe_mutex_);
    auto result_it = completion_results_.find(request_id);
    auto metrics_it = latency_metrics_.find(request_id);
    if (result_it != completion_results_.end() && metrics_it != latency_metrics_.end()) {
      int result = result_it->second;
      LatencyMetrics metrics = metrics_it->second;
      completion_results_.erase(result_it);
      latency_metrics_.erase(metrics_it);
      return {result, metrics};
    }
    return {-1, {}};
  }

  void record_request_created(uint64_t request_id) {
    std::lock_guard<std::mutex> lock(cqe_mutex_);
    latency_metrics_[request_id].request_created = std::chrono::steady_clock::now();
  }

  void record_coroutine_resumed(uint64_t request_id) {
    std::lock_guard<std::mutex> lock(cqe_mutex_);
    auto it = latency_metrics_.find(request_id);
    if (it != latency_metrics_.end()) {
      it->second.coroutine_resumed = std::chrono::steady_clock::now();
      total_resumed_++;
    }
  }

  size_t get_pending_count() const {
    std::lock_guard<std::mutex> lock(sqe_mutex_);
    return submission_queue_.size();
  }

  size_t get_completed_count() const {
    std::lock_guard<std::mutex> lock(cqe_mutex_);
    return completion_results_.size();
  }

  // Debug counters
  size_t get_total_submitted() const {
    std::lock_guard<std::mutex> lock(cqe_mutex_);
    return total_submitted_;
  }

  size_t get_total_resumed() const {
    std::lock_guard<std::mutex> lock(cqe_mutex_);
    return total_resumed_;
  }

 private:
  void schedule_reactor_poll() {
    if (!running_) return;
    // Schedule the actual polling work on the reactor's EventBase
    event_base_.runInEventBaseThread([this]() {
      poll_io_completion();
      // Reschedule immediately for busy polling
      if (running_) {
        schedule_reactor_poll();
      }
    });
  }

  void poll_io_completion() {
    static size_t loop_count = 0;
    static size_t processed_requests = 0;
    loop_count++;

    // Process ALL pending I/O requests in one batch for maximum throughput
    constexpr size_t MAX_COMPLETIONS_PER_POLL = 4096;
    std::vector<SQE> completed_requests;
    {
      std::lock_guard<std::mutex> lock(sqe_mutex_);
      size_t completions = 0;
      size_t initial_queue_size = submission_queue_.size();
      if (initial_queue_size > 0) {
        std::cout << "DEBUG: poll_io_completion found " << initial_queue_size
                  << " requests in queue" << std::endl;
      }
      // Take up to MAX_COMPLETIONS_PER_POLL requests from SQE
      while (!submission_queue_.empty() && completions < MAX_COMPLETIONS_PER_POLL) {
        completed_requests.push_back(submission_queue_.front());
        submission_queue_.pop();
        completions++;
      }
      if (completions > 0) {
        std::cout << "DEBUG: took " << completions << " requests from SQE for processing" << std::endl;
      }
    }

    // Process all requests with latency tracking
    auto processing_start = std::chrono::steady_clock::now();
    for (const auto& sqe : completed_requests) {
      // Record processing start time
      {
        std::lock_guard<std::mutex> lock(cqe_mutex_);
        auto it = latency_metrics_.find(sqe.request_id);
        if (it != latency_metrics_.end()) {
          it->second.processing_started = processing_start;
        }
      }
      // Simulate I/O completion (immediate for latency testing)
      int result = static_cast<int>(sqe.size);
      auto processing_end = std::chrono::steady_clock::now();
      // Add to completion queue with latency tracking
      {
        std::lock_guard<std::mutex> lock(cqe_mutex_);
        completion_queue_.emplace(sqe.request_id, result);
        completion_results_[sqe.request_id] = result;
        // Record processing completion time
        auto it = latency_metrics_.find(sqe.request_id);
        if (it != latency_metrics_.end()) {
          it->second.processing_completed = processing_end;
        }
      }
      processed_requests++;
    }

    // Poll CQE and resume ALL coroutines at once
    std::vector<CQE> cqe_completions;
    {
      std::lock_guard<std::mutex> lock(cqe_mutex_);
      while (!completion_queue_.empty()) {
        cqe_completions.push_back(completion_queue_.front());
        completion_queue_.pop();
      }
    }
    // Resume all coroutines for completed requests
    for (const auto& cqe : cqe_completions) {
      std::coroutine_handle<> handle;
      {
        std::lock_guard<std::mutex> lock(sqe_mutex_);
        auto it = pending_handles_.find(cqe.request_id);
        if (it != pending_handles_.end()) {
          handle = it->second;
          pending_handles_.erase(it);
        }
      }
      if (handle && !handle.done()) {
        handle.resume();
      }
    }

    // More frequent status logging for debugging (every 1000 iterations for small test)
    if (loop_count % 1000 == 0) {
      std::cout << "Reactor poll #" << loop_count
                << " - pending: " << get_pending_count()
                << ", processed: " << processed_requests
                << ", completed_requests.size(): " << completed_requests.size() << std::endl;
    }
  }

  const size_t queue_depth_;
  std::atomic<uint64_t> next_request_id_;
  std::atomic<bool> running_;

  // Folly EventBase reactor
  folly::EventBase event_base_;

  // Submission Queue (SQE)
  std::queue<SQE> submission_queue_;
  std::unordered_map<uint64_t, std::coroutine_handle<>> pending_handles_;
  mutable std::mutex sqe_mutex_;

  // Completion Queue (CQE)
  std::queue<CQE> completion_queue_;
  std::unordered_map<uint64_t, int> completion_results_;
  mutable std::mutex cqe_mutex_;

  // Latency tracking
  std::unordered_map<uint64_t, LatencyMetrics> latency_metrics_;

  // Debug counters
  size_t total_submitted_ = 0;
  size_t total_resumed_ = 0;
};

// IoAwaitable implementation
IoAwaitable::IoAwaitable(LatencyTrackingReactor* reactor, uint64_t request_id,
                         const std::string& path, size_t size, off_t offset)
    : reactor_(reactor), request_id_(request_id), path_(path), size_(size), offset_(offset) {
  reactor_->record_request_created(request_id);
}

void IoAwaitable::await_suspend(std::coroutine_handle<> handle) noexcept {
  handle_ = handle;
  reactor_->submit_io_request(request_id_, path_, size_, offset_, handle);
}

int IoAwaitable::await_resume() noexcept {
  // Record coroutine resume time BEFORE retrieving metrics
  reactor_->record_coroutine_resumed(request_id_);
  auto [result, metrics] = reactor_->get_io_result_with_metrics(request_id_);
  metrics_ = metrics;
  return result;
}

// High-level async read function with latency tracking
folly::coro::Task<std::pair<std::string, LatencyMetrics>> async_read_with_latency(
    LatencyTrackingReactor& reactor, const std::string& path, size_t size, off_t offset = 0) {
  uint64_t request_id = reactor.generate_request_id();
  // Create awaitable and submit I/O
  IoAwaitable awaitable(&reactor, request_id, path, size, offset);
  int bytes_read = co_await awaitable;
  LatencyMetrics metrics = awaitable.get_metrics();
  if (bytes_read > 0) {
    std::string result(bytes_read, static_cast<char>('A' + (request_id % 26)));
    co_return std::make_pair(result, metrics);
  } else {
    co_return std::make_pair(std::string{}, metrics);
  }
}

// Latency statistics calculator
struct LatencyStats {
  double min_us, max_us, avg_us, p50_us, p95_us, p99_us;

  explicit LatencyStats(const std::vector<double>& latencies) {
    if (latencies.empty()) {
      min_us = max_us = avg_us = p50_us = p95_us = p99_us = 0.0;
      return;
    }
    std::vector<double> sorted = latencies;
    std::sort(sorted.begin(), sorted.end());
    min_us = sorted.front();
    max_us = sorted.back();
    double sum = 0.0;
    for (double val : sorted) sum += val;
    avg_us = sum / sorted.size();
    p50_us = sorted[sorted.size() * 50 / 100];
    p95_us = sorted[sorted.size() * 95 / 100];
    p99_us = sorted[sorted.size() * 99 / 100];
  }
};

void print_latency_breakdown(const std::vector<LatencyMetrics>& metrics) {
  std::cout << "\n=== LATENCY BREAKDOWN ANALYSIS ===" << std::endl;
  std::cout << "Total requests: " << metrics.size() << std::endl;

  // Extract latency components
  std::vector<double> submission_latencies, queue_latencies, processing_latencies,
      resume_latencies, e2e_latencies;
  for (const auto& m : metrics) {
    submission_latencies.push_back(m.submission_latency_us());
    queue_latencies.push_back(m.queue_latency_us());
    processing_latencies.push_back(m.processing_latency_us());
    resume_latencies.push_back(m.resume_latency_us());
    e2e_latencies.push_back(m.end_to_end_latency_us());
  }

  // Calculate statistics
  LatencyStats submission_stats(submission_latencies);
  LatencyStats queue_stats(queue_latencies);
  LatencyStats processing_stats(processing_latencies);
  LatencyStats resume_stats(resume_latencies);
  LatencyStats e2e_stats(e2e_latencies);

  // Print formatted results
  std::cout << std::fixed << std::setprecision(2);
  std::cout << "\nSubmission Latency (request -> SQE):" << std::endl;
  std::cout << "  Min: " << submission_stats.min_us << "μs, Max: " << submission_stats.max_us
            << "μs, Avg: " << submission_stats.avg_us << "μs" << std::endl;
  std::cout << "  P50: " << submission_stats.p50_us << "μs, P95: " << submission_stats.p95_us
            << "μs, P99: " << submission_stats.p99_us << "μs" << std::endl;

  std::cout << "\nQueue Latency (SQE -> processing):" << std::endl;
  std::cout << "  Min: " << queue_stats.min_us << "μs, Max: " << queue_stats.max_us
            << "μs, Avg: " << queue_stats.avg_us << "μs" << std::endl;
  std::cout << "  P50: " << queue_stats.p50_us << "μs, P95: " << queue_stats.p95_us
            << "μs, P99: " << queue_stats.p99_us << "μs" << std::endl;

  std::cout << "\nProcessing Latency (I/O simulation):" << std::endl;
  std::cout << "  Min: " << processing_stats.min_us << "μs, Max: " << processing_stats.max_us
            << "μs, Avg: " << processing_stats.avg_us << "μs" << std::endl;
  std::cout << "  P50: " << processing_stats.p50_us << "μs, P95: " << processing_stats.p95_us
            << "μs, P99: " << processing_stats.p99_us << "μs" << std::endl;

  std::cout << "\nResume Latency (completion -> coroutine resume):" << std::endl;
  std::cout << "  Min: " << resume_stats.min_us << "μs, Max: " << resume_stats.max_us
            << "μs, Avg: " << resume_stats.avg_us << "μs" << std::endl;
  std::cout << "  P50: " << resume_stats.p50_us << "μs, P95: " << resume_stats.p95_us
            << "μs, P99: " << resume_stats.p99_us << "μs" << std::endl;

  std::cout << "\nEnd-to-End Latency (total):" << std::endl;
  std::cout << "  Min: " << e2e_stats.min_us << "μs, Max: " << e2e_stats.max_us
            << "μs, Avg: " << e2e_stats.avg_us << "μs" << std::endl;
  std::cout << "  P50: " << e2e_stats.p50_us << "μs, P95: " << e2e_stats.p95_us
            << "μs, P99: " << e2e_stats.p99_us << "μs" << std::endl;

  std::cout << "\n=== PERFORMANCE METRICS ===" << std::endl;
  std::cout << "Average end-to-end latency: " << e2e_stats.avg_us << "μs" << std::endl;
  std::cout << "95th percentile latency: " << e2e_stats.p95_us << "μs" << std::endl;
  std::cout << "99th percentile latency: " << e2e_stats.p99_us << "μs" << std::endl;
}
int main() {
  try {
    std::cout << "=== FOLLY REACTOR LATENCY BREAKDOWN DEMO ===" << std::endl;

    // Number of concurrent I/O operations (set to 50000 for the second run below)
    constexpr size_t NUM_IOS = 10;
    std::cout << "Starting high-performance latency analysis with " << NUM_IOS
              << " coroutines" << std::endl;

    // Create reactor
    LatencyTrackingReactor reactor(2048);

    // Start reactor on a separate thread
    std::thread reactor_thread([&reactor]() {
      reactor.run();
    });

    // Give the reactor time to start
    std::this_thread::sleep_for(std::chrono::milliseconds(100));

    // Create many concurrent I/O operations
    std::cout << "Creating " << NUM_IOS << " concurrent I/O operations..." << std::endl;
    std::vector<folly::coro::Task<std::pair<std::string, LatencyMetrics>>> tasks;
    tasks.reserve(NUM_IOS);
    for (size_t i = 0; i < NUM_IOS; ++i) {
      std::string path = "/nvme/device" + std::to_string(i % 8);
      tasks.push_back(async_read_with_latency(reactor, path, 4096, i * 4096));
    }

    std::cout << "Starting concurrent execution..." << std::endl;
    // Execute all I/O operations concurrently
    auto start_time = std::chrono::steady_clock::now();
    auto results = folly::coro::blockingWait(folly::coro::collectAllRange(std::move(tasks)));
    auto end_time = std::chrono::steady_clock::now();
    auto duration_ms =
        std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count();
    std::cout << "All " << NUM_IOS << " I/O operations completed in " << duration_ms << "ms" << std::endl;

    // Extract latency metrics
    std::vector<LatencyMetrics> all_metrics;
    size_t total_bytes = 0;
    for (const auto& result : results) {
      total_bytes += result.first.size();
      all_metrics.push_back(result.second);
    }
    std::cout << "Total bytes processed: " << total_bytes << " bytes ("
              << (total_bytes / 1024 / 1024) << " MB)" << std::endl;

    // Avoid division by zero
    if (duration_ms > 0) {
      std::cout << "Average throughput: "
                << (total_bytes / 1024 / 1024) / (duration_ms / 1000.0) << " MB/s" << std::endl;
      std::cout << "Average IOPS: " << (NUM_IOS * 1000) / duration_ms << " IOPS" << std::endl;
    } else {
      std::cout << "Average throughput: Operations completed too quickly to measure" << std::endl;
      std::cout << "Average IOPS: Operations completed too quickly to measure" << std::endl;
    }

    // Print debug counters
    std::cout << "\n=== DEBUG COUNTERS ===" << std::endl;
    std::cout << "Total submitted: " << reactor.get_total_submitted() << std::endl;
    std::cout << "Total resumed: " << reactor.get_total_resumed() << std::endl;
    std::cout << "Missing resumes: "
              << (reactor.get_total_submitted() - reactor.get_total_resumed()) << std::endl;

    // Print detailed latency breakdown
    print_latency_breakdown(all_metrics);

    std::cout << "\n=== FINAL REACTOR STATE ===" << std::endl;
    std::cout << "Final pending count: " << reactor.get_pending_count() << std::endl;
    std::cout << "Final completed count: " << reactor.get_completed_count() << std::endl;

    std::cout << "\nStopping reactor..." << std::endl;
    reactor.stop();
    // Wait for reactor thread to finish
    reactor_thread.join();

    std::cout << "Latency analysis completed successfully!" << std::endl;
  } catch (const std::exception& e) {
    std::cerr << "Error: " << e.what() << std::endl;
    return 1;
  }
  return 0;
}
```
With only 10 requests, the latency is OK:
```
=== LATENCY BREAKDOWN ANALYSIS ===
Total requests: 10

Submission Latency (request -> SQE):
  Min: 0.21μs, Max: 1.47μs, Avg: 0.75μs
  P50: 0.57μs, P95: 1.47μs, P99: 1.47μs

Queue Latency (SQE -> processing):
  Min: 17.70μs, Max: 67.55μs, Avg: 49.21μs
  P50: 50.54μs, P95: 67.55μs, P99: 67.55μs

Processing Latency (I/O simulation):
  Min: 0.36μs, Max: 1.51μs, Avg: 0.68μs
  P50: 0.65μs, P95: 1.51μs, P99: 1.51μs

Resume Latency (completion -> coroutine resume):
  Min: 8.38μs, Max: 348.13μs, Avg: 176.02μs
  P50: 200.74μs, P95: 348.13μs, P99: 348.13μs

End-to-End Latency (total):
  Min: 58.84μs, Max: 397.17μs, Avg: 226.66μs
  P50: 234.84μs, P95: 397.17μs, P99: 397.17μs

=== PERFORMANCE METRICS ===
Average end-to-end latency: 226.66μs
95th percentile latency: 397.17μs
99th percentile latency: 397.17μs

=== FINAL REACTOR STATE ===
Final pending count: 0
Final completed count: 0

Stopping reactor...
LatencyTrackingReactor: Reactor loop ended
Latency analysis completed successfully!
```
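
Even in this small run, the resume step dominates the end-to-end time. To see where the resumes actually execute, one instrumentation worth trying (a hypothetical sketch, not part of the code above) is tagging each resume with the id of the thread it runs on:

```cpp
// Hypothetical diagnostic: log the resuming thread for each request.
// Building the line in a stringstream first keeps concurrent output from
// interleaving mid-line.
#include <sstream>
#include <thread>

int IoAwaitable::await_resume() noexcept {
  reactor_->record_coroutine_resumed(request_id_);
  std::ostringstream oss;
  oss << "request #" << request_id_ << " resumed on thread "
      << std::this_thread::get_id() << '\n';
  std::cout << oss.str();
  auto [result, metrics] = reactor_->get_io_result_with_metrics(request_id_);
  metrics_ = metrics;
  return result;
}
```

If every line reports the same thread id, the continuations are being serialized through a single thread, which would match the numbers below.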
If we run 50k requests, however, latency becomes very bad in the resume phase. Is this a scheduling problem? Is there any way to improve it? (One untested idea is sketched after the log below.)
```
(folly) root@salab-smcamd01:/mnt/nvme2n1d/wayne/follyplay/build# ./latency_demo
=== FOLLY REACTOR LATENCY BREAKDOWN DEMO ===
LatencyTrackingReactor: Initialized with queue_depth=2048
LatencyTrackingReactor: Starting reactor loop
Reactor poll #50000 - pending: 0, processed: 0, completed_requests.size(): 0
Reactor poll #100000 - pending: 0, processed: 0, completed_requests.size(): 0
Reactor poll #150000 - pending: 0, processed: 0, completed_requests.size(): 0
Reactor poll #200000 - pending: 0, processed: 0, completed_requests.size(): 0
Starting high-performance latency analysis with 50000 coroutines
Creating 50000 concurrent I/O operations...
Starting concurrent execution...
Reactor poll #250000 - pending: 0, processed: 0, completed_requests.size(): 0
Reactor poll #300000 - pending: 0, processed: 50000, completed_requests.size(): 0
Reactor poll #350000 - pending: 0, processed: 50000, completed_requests.size(): 0
Reactor poll #400000 - pending: 0, processed: 50000, completed_requests.size(): 0
All 50000 I/O operations completed in 274ms
Total bytes processed: 204800000 bytes (195 MB)
Average throughput: 711.679 MB/s
Average IOPS: 182481 IOPS

=== DEBUG COUNTERS ===
Total submitted: 50000
Total resumed: 50000
Missing resumes: 0

=== LATENCY BREAKDOWN ANALYSIS ===
Total requests: 50000

Submission Latency (request -> SQE):
  Min: 0.08μs, Max: 1835.43μs, Avg: 0.60μs
  P50: 0.10μs, P95: 3.04μs, P99: 4.64μs

Queue Latency (SQE -> processing):
  Min: 0.65μs, Max: 19892.17μs, Avg: 6798.51μs
  P50: 5212.95μs, P95: 17793.28μs, P99: 19489.11μs

Processing Latency (I/O simulation):
  Min: 0.03μs, Max: 7769.89μs, Avg: 1637.70μs
  P50: 1075.51μs, P95: 4925.02μs, P99: 7071.74μs

Resume Latency (completion -> coroutine resume):
  Min: 82898.25μs, Max: 164352.14μs, Avg: 123682.17μs
  P50: 123375.34μs, P95: 158832.19μs, P99: 162984.48μs

End-to-End Latency (total):
  Min: 82906.04μs, Max: 177379.23μs, Avg: 132118.98μs
  P50: 130851.37μs, P95: 173396.01μs, P99: 176834.68μs

=== PERFORMANCE METRICS ===
Average end-to-end latency: 132118.98μs
95th percentile latency: 173396.01μs
99th percentile latency: 176834.68μs

=== FINAL REACTOR STATE ===
Final pending count: 0
Final completed count: 0

Stopping reactor...
LatencyTrackingReactor: Reactor loop ended
Latency analysis completed successfully!
(folly) root@salab-smcamd01:/mnt/nvme2n1d/wayne/follyplay/build#
```
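
One idea for the resume bottleneck (an untested sketch, not a verified fix): stop calling handle.resume() inline in poll_io_completion() and hand the handles to a small folly::CPUThreadPoolExecutor, so the reactor thread keeps draining the SQE instead of running every continuation itself. Whether this helps depends on where folly actually reschedules the Task continuation (see the co_viaIfAsync note in the code above), so treat it as an experiment:

```cpp
// Untested sketch: offload coroutine resumption to a thread pool.
// `resume_pool_` is a new member introduced here for illustration only;
// the pool size of 4 is arbitrary.
#include <folly/executors/CPUThreadPoolExecutor.h>

// In LatencyTrackingReactor's members:
//   folly::CPUThreadPoolExecutor resume_pool_{4};

// Replacement for the resume loop at the end of poll_io_completion():
for (const auto& cqe : cqe_completions) {
  std::coroutine_handle<> handle;
  {
    std::lock_guard<std::mutex> lock(sqe_mutex_);
    auto it = pending_handles_.find(cqe.request_id);
    if (it != pending_handles_.end()) {
      handle = it->second;
      pending_handles_.erase(it);
    }
  }
  if (handle && !handle.done()) {
    // Resume off the reactor thread; the poll loop can immediately go back
    // to draining the submission queue.
    resume_pool_.add([handle]() { handle.resume(); });
  }
}
```

Other directions that might be worth trying: batching submissions so the submitting thread doesn't contend on sqe_mutex_ once per request, or sharding requests across several reactor EventBase threads.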