Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions include/tpch/async_io.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,26 @@ class AsyncIOContext {
*/
int queued_count() const;

/**
* Submit all queued SQEs and wait for at least min_complete CQEs in a single syscall.
* More efficient than separate submit_queued() + wait_completions() for the common
* "submit one chunk and wait for oldest" pattern.
*
* @param min_complete Minimum number of CQEs to wait for (default 1)
* @return Number of CQEs drained
*/
int submit_and_wait(int min_complete = 1);

/**
* Probe the block device backing `path` via sysfs to determine an optimal
* io_uring queue depth. Returns nr_requests/2 clamped to [8, 128].
* Falls back to 64 when sysfs is unavailable (e.g. tmpfs, WSL2 /tmp).
*
* @param path Any path on the target filesystem (file need not exist yet)
* @return Calibrated queue depth
*/
static uint32_t calibrate_queue_depth(const char* path);

/**
* Set callback for completion events.
*
Expand Down Expand Up @@ -211,6 +231,8 @@ class AsyncIOContext {
void queue_write(int fd, const void* buf, std::size_t count, off_t offset, std::uint64_t user_data);
int submit_queued() { return 0; }
int queued_count() const { return queued_; }
int submit_and_wait(int min_complete = 1) { (void)min_complete; return 0; }
static uint32_t calibrate_queue_depth(const char* path) { (void)path; return 64; }
void set_completion_callback(CompletionCallback cb) { (void)cb; }
int process_completions() { return 0; }
void register_buffers(const std::vector<iovec>& buffers) { (void)buffers; }
Expand Down
13 changes: 12 additions & 1 deletion include/tpch/orc_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,27 @@ class ORCWriter : public WriterInterface {
*/
void close() override;

/**
* Enable io_uring-backed output stream for ORC disk writes (Linux only).
* Replaces the default writeLocalFile() stream with a double-buffered io_uring writer.
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ORCWriter io_uring docstring says the stream is "double-buffered", but OrcIoUringStream uses an 8-buffer staging ring. Update the comment so it reflects the actual design.

Suggested change
* Replaces the default writeLocalFile() stream with a double-buffered io_uring writer.
* Replaces the default writeLocalFile() stream with an io_uring writer that uses an
* 8-buffer staging ring for multi-buffered asynchronous writes.

Copilot uses AI. Check for mistakes.
* Must be called before the first write_batch().
*
* @param enable true to enable io_uring write path
*/
void enable_io_uring(bool enable);

private:
std::string filepath_;
std::shared_ptr<arrow::RecordBatch> first_batch_;
bool schema_locked_ = false;
bool use_io_uring_ = false;

// Opaque ORC implementations (void* to avoid exposing ORC headers)
// In the implementation file, these are cast to their actual types
void* orc_writer_; // orc::Writer*
void* orc_output_stream_; // unique_ptr<OrcFile::OutStream>
void* orc_output_stream_; // unique_ptr<OrcFile::OutStream> (synchronous path)
void* orc_type_; // unique_ptr<orc::Type>
void* orc_io_uring_stream_; // OrcIoUringStream* (io_uring path)
};

} // namespace tpch
Expand Down
10 changes: 10 additions & 0 deletions include/tpch/parquet_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,15 @@ class ParquetWriter : public WriterInterface {
*/
void enable_streaming_write(bool use_threads = true);

/**
* Enable io_uring-backed output stream for streaming writes (Linux only).
* Only effective when streaming mode is also enabled via enable_streaming_write().
* Replaces the default FileOutputStream with an io_uring double-buffered writer.
*
* @param enable true to enable io_uring write path
*/
Comment on lines +96 to +99
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ParquetWriter io_uring docstring says the implementation is "double-buffered", but IoUringOutputStream is implemented with an 8-buffer staging ring. Update the comment to match the actual buffering strategy to avoid misleading future maintainers.

Suggested change
* Replaces the default FileOutputStream with an io_uring double-buffered writer.
*
* @param enable true to enable io_uring write path
*/
* Replaces the default FileOutputStream with an io_uring-backed writer that uses
* an 8-buffer staging ring for asynchronous writes.
*
* @param enable true to enable io_uring write path
*/

Copilot uses AI. Check for mistakes.
void enable_io_uring(bool enable);

private:
std::string filepath_;
std::shared_ptr<arrow::RecordBatch> first_batch_;
Expand All @@ -107,6 +116,7 @@ class ParquetWriter : public WriterInterface {
// Streaming write mode (Phase 14.2)
bool streaming_mode_ = false;
bool use_threads_ = true;
bool use_io_uring_ = false;
std::unique_ptr<parquet::arrow::FileWriter> parquet_file_writer_;

// Initialize the Parquet FileWriter for streaming mode
Expand Down
161 changes: 152 additions & 9 deletions src/async/io_uring_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,123 @@
#include <stdexcept>
#include <cstring>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/sysmacros.h>
#include <climits>
#include <cstdio>
#include <algorithm>
#include <mutex>
#include <string>

namespace tpch {

// ── Sysfs queue-depth calibration ────────────────────────────────────────────
//
// Mirror of the Rust sysfs_calibrate_qd() in io_uring_store.rs:
// 1. stat(path_or_ancestor) → st_dev → major:minor
// 2. realpath(/sys/dev/block/MAJOR:MINOR) → canonical sysfs dir
// 3. Walk up looking for queue/nr_requests
// 4. Return nr_requests/2 clamped to [8, 128]; fallback 64

static uint32_t sysfs_calibrate_qd(const char* path) {
struct stat st;

// Walk up to an existing ancestor (target file may not exist yet).
std::string p = path ? path : "/";
while (!p.empty() && stat(p.c_str(), &st) != 0) {
size_t pos = p.rfind('/');
if (pos == 0) { p = "/"; break; }
if (pos == std::string::npos) break;
p.resize(pos);
}
if (stat(p.c_str(), &st) != 0) return 64;

// Build /sys/dev/block/MAJOR:MINOR symlink path.
char link[256];
snprintf(link, sizeof(link), "/sys/dev/block/%u:%u",
major(st.st_dev), minor(st.st_dev));

char canon[PATH_MAX];
if (realpath(link, canon) == nullptr) return 64;
Comment on lines +45 to +46
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

io_uring_context.cpp calls realpath() but does not include the header that declares it (<stdlib.h> / ). This can fail to compile on stricter toolchains; add the appropriate include explicitly.

Copilot uses AI. Check for mistakes.

// Walk up from canonical sysfs path to find queue/nr_requests.
std::string cur = canon;
while (cur.size() > 1) {
std::string nr_req_path = cur + "/queue/nr_requests";
FILE* f = fopen(nr_req_path.c_str(), "r");
if (f) {
unsigned int n = 0;
fscanf(f, "%u", &n);
fclose(f);
if (n > 0) {
uint32_t qd = static_cast<uint32_t>(n) / 2;
if (qd < 8) qd = 8;
if (qd > 128) qd = 128;
return qd;
}
}
size_t slash = cur.rfind('/');
if (slash == std::string::npos || slash == 0) break;
cur.resize(slash);
}
return 64; // fallback: /tmp or other pseudo-fs
}

// ── Process-global anchor ring (IORING_SETUP_ATTACH_WQ) ─────────────────────
//
// All rings after the first one attach to this anchor's kernel async-worker
// thread pool, reducing scheduler pressure on WSL2/Hyper-V IOThread.
// The anchor io_uring is intentionally leaked (never freed) so its fd stays
// open for the process lifetime.

static io_uring* g_anchor_ring = nullptr;
static int g_anchor_fd = -1;
static std::once_flag g_anchor_once;

static void init_anchor(uint32_t qd) {
g_anchor_ring = new io_uring;
if (io_uring_queue_init(qd, g_anchor_ring, 0) == 0) {
g_anchor_fd = g_anchor_ring->ring_fd;
} else {
delete g_anchor_ring;
g_anchor_ring = nullptr;
g_anchor_fd = -1;
}
}

static int get_anchor_fd(uint32_t qd) {
std::call_once(g_anchor_once, init_anchor, qd);
return g_anchor_fd;
}

AsyncIOContext::AsyncIOContext(const AsyncIOConfig& config)
: queue_depth_(config.queue_depth), pending_(0) {
// Allocate io_uring ring structure
ring_ = new io_uring;

// Setup initialization parameters for SQPOLL if requested
struct io_uring_params params = {};
if (config.use_sqpoll) {
// IORING_SETUP_SQPOLL: kernel thread polls submission queue
// This reduces syscalls but requires CAP_SYS_NICE capability
params.flags |= IORING_SETUP_SQPOLL;
params.sq_thread_idle = 2000; // 2 second idle timeout
params.sq_thread_idle = 2000;
}

// Attach to shared kernel worker pool (mirrors Rust IORING_SETUP_ATTACH_WQ).
// Skip when SQPOLL is requested (incompatible combination).
int anchor = config.use_sqpoll ? -1 : get_anchor_fd(config.queue_depth);
if (anchor >= 0) {
params.flags |= IORING_SETUP_ATTACH_WQ;
params.wq_fd = static_cast<uint32_t>(anchor);
}

// Initialize the io_uring ring with params
int ret = io_uring_queue_init_params(config.queue_depth,
static_cast<io_uring*>(ring_),
&params);
if (ret < 0) {
// Retry without ATTACH_WQ on kernels that don't support it.
params.flags &= ~static_cast<unsigned>(IORING_SETUP_ATTACH_WQ);
ret = io_uring_queue_init_params(config.queue_depth,
static_cast<io_uring*>(ring_),
&params);
}
if (ret < 0) {
delete static_cast<io_uring*>(ring_);
throw std::runtime_error("Failed to initialize io_uring: " + std::string(strerror(-ret)));
Expand All @@ -36,11 +131,23 @@ AsyncIOContext::AsyncIOContext(const AsyncIOConfig& config)

AsyncIOContext::AsyncIOContext(uint32_t queue_depth)
: queue_depth_(queue_depth), pending_(0) {
// Allocate io_uring ring structure
ring_ = new io_uring;

// Initialize the io_uring ring with default params (no SQPOLL)
int ret = io_uring_queue_init(queue_depth, static_cast<io_uring*>(ring_), 0);
// Attach to shared kernel worker pool (mirrors Rust IORING_SETUP_ATTACH_WQ).
int anchor = get_anchor_fd(queue_depth);
int ret = -1;
if (anchor >= 0) {
struct io_uring_params params = {};
params.flags = IORING_SETUP_ATTACH_WQ;
params.wq_fd = static_cast<uint32_t>(anchor);
ret = io_uring_queue_init_params(queue_depth,
static_cast<io_uring*>(ring_),
&params);
}
if (ret < 0) {
// Fallback: plain ring without shared worker pool.
ret = io_uring_queue_init(queue_depth, static_cast<io_uring*>(ring_), 0);
}
if (ret < 0) {
delete static_cast<io_uring*>(ring_);
throw std::runtime_error("Failed to initialize io_uring: " + std::string(strerror(-ret)));
Expand Down Expand Up @@ -275,6 +382,42 @@ bool AsyncIOContext::has_registered_buffers() const {
return !registered_buffers_.empty();
}

int AsyncIOContext::submit_and_wait(int min_complete) {
if (queued_ == 0 && pending_ == 0) return 0;

auto ring = static_cast<io_uring*>(ring_);
int to_submit = queued_;
int to_wait = (min_complete < pending_ + queued_) ? min_complete : (pending_ + queued_);
if (to_wait < 1) to_wait = 1;

// Single syscall: submit all prepared SQEs + wait for to_wait CQEs.
int ret = io_uring_submit_and_wait(ring, static_cast<unsigned>(to_wait));
if (ret < 0) {
throw std::runtime_error("io_uring submit_and_wait failed: " + std::string(strerror(-ret)));
}
pending_ += to_submit;
queued_ = 0;

// Drain all available CQEs (may be more than min_complete).
struct io_uring_cqe* cqe = nullptr;
unsigned head;
int completed = 0;
io_uring_for_each_cqe(ring, head, cqe) {
if (cqe->res < 0) {
throw std::runtime_error("I/O error in submit_and_wait: " + std::string(strerror(-cqe->res)));
}
completed++;
}
io_uring_cq_advance(ring, completed);
pending_ -= completed;
return completed;
}

// static
uint32_t AsyncIOContext::calibrate_queue_depth(const char* path) {
return sysfs_calibrate_qd(path);
}

} // namespace tpch

#else
Expand Down
44 changes: 43 additions & 1 deletion src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ struct Options {
long lance_stream_queue = 16;
std::string lance_stats_level;
double lance_cardinality_sample_rate = 1.0; // Phase 3.1: Sampling-based cardinality
bool parquet_io_uring = false;
bool orc_io_uring = false;
};

constexpr int OPT_LANCE_ROWS_PER_FILE = 1000;
Expand All @@ -69,6 +71,8 @@ constexpr int OPT_LANCE_STREAM_QUEUE = 1004;
constexpr int OPT_LANCE_STATS_LEVEL = 1005;
constexpr int OPT_LANCE_CARDINALITY_SAMPLE_RATE = 1006; // Phase 3.1
constexpr int OPT_LANCE_IO_URING = 1007;
constexpr int OPT_PARQUET_IO_URING = 1008;
constexpr int OPT_ORC_IO_URING = 1009;

void print_usage(const char* prog) {
std::cout << "Usage: " << prog << " [options]\n"
Expand Down Expand Up @@ -110,6 +114,10 @@ void print_usage(const char* prog) {
<< " Controls HyperLogLog sampling: 1.0=100% (default),\n"
<< " 0.5=50%, 0.1=10%. Smaller rates = faster writes.\n"
<< " --lance-io-uring Use io_uring for Lance disk writes (Linux only)\n"
#endif
#ifdef TPCH_ENABLE_ASYNC_IO
<< " --parquet-io-uring Use io_uring for Parquet streaming writes (requires --streaming, Linux only)\n"
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The usage text for --parquet-io-uring says it "requires --streaming", but there is no --streaming CLI option in this program. This makes the help output misleading; update the message to reference the actual prerequisite(s) (e.g., the Parquet streaming mode/zero-copy mode that triggers enable_streaming_write).

Suggested change
<< " --parquet-io-uring Use io_uring for Parquet streaming writes (requires --streaming, Linux only)\n"
<< " --parquet-io-uring Use io_uring for Parquet streaming writes (Linux only)\n"

Copilot uses AI. Check for mistakes.
<< " --orc-io-uring Use io_uring for ORC disk writes (Linux only)\n"
#endif
<< " --verbose Verbose output\n"
<< " --help Show this help message\n";
Expand Down Expand Up @@ -139,7 +147,9 @@ Options parse_args(int argc, char* argv[]) {
{"lance-io-uring", no_argument, nullptr, OPT_LANCE_IO_URING},
#endif
#ifdef TPCH_ENABLE_ASYNC_IO
{"async-io", no_argument, nullptr, 'a'},
{"async-io", no_argument, nullptr, 'a'},
{"parquet-io-uring", no_argument, nullptr, OPT_PARQUET_IO_URING},
{"orc-io-uring", no_argument, nullptr, OPT_ORC_IO_URING},
#endif
{"verbose", no_argument, nullptr, 'v'},
{"help", no_argument, nullptr, 'h'},
Expand Down Expand Up @@ -210,6 +220,12 @@ Options parse_args(int argc, char* argv[]) {
case 'a':
opts.async_io = true;
break;
case OPT_PARQUET_IO_URING:
opts.parquet_io_uring = true;
break;
case OPT_ORC_IO_URING:
opts.orc_io_uring = true;
break;
#endif
case 'v':
opts.verbose = true;
Expand Down Expand Up @@ -1447,6 +1463,32 @@ int main(int argc, char* argv[]) {
}
#endif

// Wire up Parquet and ORC io_uring flags
#ifdef TPCH_ENABLE_ASYNC_IO
if (opts.parquet_io_uring) {
if (auto* pw = dynamic_cast<tpch::ParquetWriter*>(writer.get())) {
if (!opts.zero_copy) {
std::cerr << "Warning: --parquet-io-uring has no effect without --zero-copy (streaming mode)\n";
} else {
pw->enable_io_uring(true);
Comment on lines +1469 to +1473

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Ensure --parquet-io-uring activates the streaming writer path

The CLI flag only toggles ParquetWriter::enable_io_uring(true), but Parquet's io_uring stream is used only inside init_file_writer() during streaming mode. Since this call site never enables streaming for the normal --zero-copy flow, --parquet-io-uring is effectively a no-op in the documented usage path and users do not get the requested I/O backend.

Useful? React with 👍 / 👎.

if (opts.verbose) {
std::cout << "Parquet io_uring write path enabled\n";
}
Comment on lines +1470 to +1476
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

--parquet-io-uring currently only calls ParquetWriter::enable_io_uring(true) when --zero-copy is set, but ParquetWriter only switches to the io_uring OutputStream in init_file_writer() (streaming mode). In the current CLI flow, --zero-copy generation paths do not enable Parquet streaming mode (enable_streaming_write), so this flag can be a no-op. Either ensure streaming mode is enabled when --parquet-io-uring is requested, or change the gating/warning so it matches the actual conditions under which the io_uring stream is used.

Suggested change
if (!opts.zero_copy) {
std::cerr << "Warning: --parquet-io-uring has no effect without --zero-copy (streaming mode)\n";
} else {
pw->enable_io_uring(true);
if (opts.verbose) {
std::cout << "Parquet io_uring write path enabled\n";
}
// Ensure Parquet streaming mode is enabled so the io_uring OutputStream is used
pw->enable_streaming_write(true);
pw->enable_io_uring(true);
if (opts.verbose) {
std::cout << "Parquet io_uring write path enabled (streaming mode)\n";

Copilot uses AI. Check for mistakes.
}
}
}
#ifdef TPCH_ENABLE_ORC
if (opts.orc_io_uring) {
if (auto* ow = dynamic_cast<tpch::ORCWriter*>(writer.get())) {
ow->enable_io_uring(true);
if (opts.verbose) {
std::cout << "ORC io_uring write path enabled\n";
}
}
}
#endif
#endif

// Set async context if available
if (async_context) {
writer->set_async_context(async_context);
Expand Down
Loading