diff --git a/include/tpch/async_io.hpp b/include/tpch/async_io.hpp index ba88daf..07a3bc0 100644 --- a/include/tpch/async_io.hpp +++ b/include/tpch/async_io.hpp @@ -116,6 +116,26 @@ class AsyncIOContext { */ int queued_count() const; + /** + * Submit all queued SQEs and wait for at least min_complete CQEs in a single syscall. + * More efficient than separate submit_queued() + wait_completions() for the common + * "submit one chunk and wait for oldest" pattern. + * + * @param min_complete Minimum number of CQEs to wait for (default 1) + * @return Number of CQEs drained + */ + int submit_and_wait(int min_complete = 1); + + /** + * Probe the block device backing `path` via sysfs to determine an optimal + * io_uring queue depth. Returns nr_requests/2 clamped to [8, 128]. + * Falls back to 64 when sysfs is unavailable (e.g. tmpfs, WSL2 /tmp). + * + * @param path Any path on the target filesystem (file need not exist yet) + * @return Calibrated queue depth + */ + static uint32_t calibrate_queue_depth(const char* path); + /** * Set callback for completion events. * @@ -211,6 +231,8 @@ class AsyncIOContext { void queue_write(int fd, const void* buf, std::size_t count, off_t offset, std::uint64_t user_data); int submit_queued() { return 0; } int queued_count() const { return queued_; } + int submit_and_wait(int min_complete = 1) { (void)min_complete; return 0; } + static uint32_t calibrate_queue_depth(const char* path) { (void)path; return 64; } void set_completion_callback(CompletionCallback cb) { (void)cb; } int process_completions() { return 0; } void register_buffers(const std::vector& buffers) { (void)buffers; } diff --git a/include/tpch/orc_writer.hpp b/include/tpch/orc_writer.hpp index a0c341e..1a98a51 100644 --- a/include/tpch/orc_writer.hpp +++ b/include/tpch/orc_writer.hpp @@ -40,16 +40,27 @@ class ORCWriter : public WriterInterface { */ void close() override; + /** + * Enable io_uring-backed output stream for ORC disk writes (Linux only). + * Replaces the default writeLocalFile() stream with a double-buffered io_uring writer. + * Must be called before the first write_batch(). + * + * @param enable true to enable io_uring write path + */ + void enable_io_uring(bool enable); + private: std::string filepath_; std::shared_ptr first_batch_; bool schema_locked_ = false; + bool use_io_uring_ = false; // Opaque ORC implementations (void* to avoid exposing ORC headers) // In the implementation file, these are cast to their actual types void* orc_writer_; // orc::Writer* - void* orc_output_stream_; // unique_ptr + void* orc_output_stream_; // unique_ptr (synchronous path) void* orc_type_; // unique_ptr + void* orc_io_uring_stream_; // OrcIoUringStream* (io_uring path) }; } // namespace tpch diff --git a/include/tpch/parquet_writer.hpp b/include/tpch/parquet_writer.hpp index cda5a1e..a6369ac 100644 --- a/include/tpch/parquet_writer.hpp +++ b/include/tpch/parquet_writer.hpp @@ -90,6 +90,15 @@ class ParquetWriter : public WriterInterface { */ void enable_streaming_write(bool use_threads = true); + /** + * Enable io_uring-backed output stream for streaming writes (Linux only). + * Only effective when streaming mode is also enabled via enable_streaming_write(). + * Replaces the default FileOutputStream with an io_uring double-buffered writer. + * + * @param enable true to enable io_uring write path + */ + void enable_io_uring(bool enable); + private: std::string filepath_; std::shared_ptr first_batch_; @@ -107,6 +116,7 @@ class ParquetWriter : public WriterInterface { // Streaming write mode (Phase 14.2) bool streaming_mode_ = false; bool use_threads_ = true; + bool use_io_uring_ = false; std::unique_ptr parquet_file_writer_; // Initialize the Parquet FileWriter for streaming mode diff --git a/src/async/io_uring_context.cpp b/src/async/io_uring_context.cpp index 93c9559..0743c81 100644 --- a/src/async/io_uring_context.cpp +++ b/src/async/io_uring_context.cpp @@ -6,28 +6,123 @@ #include #include #include +#include +#include +#include +#include #include +#include +#include namespace tpch { +// ── Sysfs queue-depth calibration ──────────────────────────────────────────── +// +// Mirror of the Rust sysfs_calibrate_qd() in io_uring_store.rs: +// 1. stat(path_or_ancestor) → st_dev → major:minor +// 2. realpath(/sys/dev/block/MAJOR:MINOR) → canonical sysfs dir +// 3. Walk up looking for queue/nr_requests +// 4. Return nr_requests/2 clamped to [8, 128]; fallback 64 + +static uint32_t sysfs_calibrate_qd(const char* path) { + struct stat st; + + // Walk up to an existing ancestor (target file may not exist yet). + std::string p = path ? path : "/"; + while (!p.empty() && stat(p.c_str(), &st) != 0) { + size_t pos = p.rfind('/'); + if (pos == 0) { p = "/"; break; } + if (pos == std::string::npos) break; + p.resize(pos); + } + if (stat(p.c_str(), &st) != 0) return 64; + + // Build /sys/dev/block/MAJOR:MINOR symlink path. + char link[256]; + snprintf(link, sizeof(link), "/sys/dev/block/%u:%u", + major(st.st_dev), minor(st.st_dev)); + + char canon[PATH_MAX]; + if (realpath(link, canon) == nullptr) return 64; + + // Walk up from canonical sysfs path to find queue/nr_requests. + std::string cur = canon; + while (cur.size() > 1) { + std::string nr_req_path = cur + "/queue/nr_requests"; + FILE* f = fopen(nr_req_path.c_str(), "r"); + if (f) { + unsigned int n = 0; + fscanf(f, "%u", &n); + fclose(f); + if (n > 0) { + uint32_t qd = static_cast(n) / 2; + if (qd < 8) qd = 8; + if (qd > 128) qd = 128; + return qd; + } + } + size_t slash = cur.rfind('/'); + if (slash == std::string::npos || slash == 0) break; + cur.resize(slash); + } + return 64; // fallback: /tmp or other pseudo-fs +} + +// ── Process-global anchor ring (IORING_SETUP_ATTACH_WQ) ───────────────────── +// +// All rings after the first one attach to this anchor's kernel async-worker +// thread pool, reducing scheduler pressure on WSL2/Hyper-V IOThread. +// The anchor io_uring is intentionally leaked (never freed) so its fd stays +// open for the process lifetime. + +static io_uring* g_anchor_ring = nullptr; +static int g_anchor_fd = -1; +static std::once_flag g_anchor_once; + +static void init_anchor(uint32_t qd) { + g_anchor_ring = new io_uring; + if (io_uring_queue_init(qd, g_anchor_ring, 0) == 0) { + g_anchor_fd = g_anchor_ring->ring_fd; + } else { + delete g_anchor_ring; + g_anchor_ring = nullptr; + g_anchor_fd = -1; + } +} + +static int get_anchor_fd(uint32_t qd) { + std::call_once(g_anchor_once, init_anchor, qd); + return g_anchor_fd; +} + AsyncIOContext::AsyncIOContext(const AsyncIOConfig& config) : queue_depth_(config.queue_depth), pending_(0) { - // Allocate io_uring ring structure ring_ = new io_uring; - // Setup initialization parameters for SQPOLL if requested struct io_uring_params params = {}; if (config.use_sqpoll) { - // IORING_SETUP_SQPOLL: kernel thread polls submission queue - // This reduces syscalls but requires CAP_SYS_NICE capability params.flags |= IORING_SETUP_SQPOLL; - params.sq_thread_idle = 2000; // 2 second idle timeout + params.sq_thread_idle = 2000; + } + + // Attach to shared kernel worker pool (mirrors Rust IORING_SETUP_ATTACH_WQ). + // Skip when SQPOLL is requested (incompatible combination). + int anchor = config.use_sqpoll ? -1 : get_anchor_fd(config.queue_depth); + if (anchor >= 0) { + params.flags |= IORING_SETUP_ATTACH_WQ; + params.wq_fd = static_cast(anchor); } - // Initialize the io_uring ring with params int ret = io_uring_queue_init_params(config.queue_depth, static_cast(ring_), ¶ms); + if (ret < 0) { + // Retry without ATTACH_WQ on kernels that don't support it. + params.flags &= ~static_cast(IORING_SETUP_ATTACH_WQ); + ret = io_uring_queue_init_params(config.queue_depth, + static_cast(ring_), + ¶ms); + } if (ret < 0) { delete static_cast(ring_); throw std::runtime_error("Failed to initialize io_uring: " + std::string(strerror(-ret))); @@ -36,11 +131,23 @@ AsyncIOContext::AsyncIOContext(const AsyncIOConfig& config) AsyncIOContext::AsyncIOContext(uint32_t queue_depth) : queue_depth_(queue_depth), pending_(0) { - // Allocate io_uring ring structure ring_ = new io_uring; - // Initialize the io_uring ring with default params (no SQPOLL) - int ret = io_uring_queue_init(queue_depth, static_cast(ring_), 0); + // Attach to shared kernel worker pool (mirrors Rust IORING_SETUP_ATTACH_WQ). + int anchor = get_anchor_fd(queue_depth); + int ret = -1; + if (anchor >= 0) { + struct io_uring_params params = {}; + params.flags = IORING_SETUP_ATTACH_WQ; + params.wq_fd = static_cast(anchor); + ret = io_uring_queue_init_params(queue_depth, + static_cast(ring_), + ¶ms); + } + if (ret < 0) { + // Fallback: plain ring without shared worker pool. + ret = io_uring_queue_init(queue_depth, static_cast(ring_), 0); + } if (ret < 0) { delete static_cast(ring_); throw std::runtime_error("Failed to initialize io_uring: " + std::string(strerror(-ret))); @@ -275,6 +382,42 @@ bool AsyncIOContext::has_registered_buffers() const { return !registered_buffers_.empty(); } +int AsyncIOContext::submit_and_wait(int min_complete) { + if (queued_ == 0 && pending_ == 0) return 0; + + auto ring = static_cast(ring_); + int to_submit = queued_; + int to_wait = (min_complete < pending_ + queued_) ? min_complete : (pending_ + queued_); + if (to_wait < 1) to_wait = 1; + + // Single syscall: submit all prepared SQEs + wait for to_wait CQEs. + int ret = io_uring_submit_and_wait(ring, static_cast(to_wait)); + if (ret < 0) { + throw std::runtime_error("io_uring submit_and_wait failed: " + std::string(strerror(-ret))); + } + pending_ += to_submit; + queued_ = 0; + + // Drain all available CQEs (may be more than min_complete). + struct io_uring_cqe* cqe = nullptr; + unsigned head; + int completed = 0; + io_uring_for_each_cqe(ring, head, cqe) { + if (cqe->res < 0) { + throw std::runtime_error("I/O error in submit_and_wait: " + std::string(strerror(-cqe->res))); + } + completed++; + } + io_uring_cq_advance(ring, completed); + pending_ -= completed; + return completed; +} + +// static +uint32_t AsyncIOContext::calibrate_queue_depth(const char* path) { + return sysfs_calibrate_qd(path); +} + } // namespace tpch #else diff --git a/src/main.cpp b/src/main.cpp index ea2edc3..618427a 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -59,6 +59,8 @@ struct Options { long lance_stream_queue = 16; std::string lance_stats_level; double lance_cardinality_sample_rate = 1.0; // Phase 3.1: Sampling-based cardinality + bool parquet_io_uring = false; + bool orc_io_uring = false; }; constexpr int OPT_LANCE_ROWS_PER_FILE = 1000; @@ -69,6 +71,8 @@ constexpr int OPT_LANCE_STREAM_QUEUE = 1004; constexpr int OPT_LANCE_STATS_LEVEL = 1005; constexpr int OPT_LANCE_CARDINALITY_SAMPLE_RATE = 1006; // Phase 3.1 constexpr int OPT_LANCE_IO_URING = 1007; +constexpr int OPT_PARQUET_IO_URING = 1008; +constexpr int OPT_ORC_IO_URING = 1009; void print_usage(const char* prog) { std::cout << "Usage: " << prog << " [options]\n" @@ -110,6 +114,10 @@ void print_usage(const char* prog) { << " Controls HyperLogLog sampling: 1.0=100% (default),\n" << " 0.5=50%, 0.1=10%. Smaller rates = faster writes.\n" << " --lance-io-uring Use io_uring for Lance disk writes (Linux only)\n" +#endif +#ifdef TPCH_ENABLE_ASYNC_IO + << " --parquet-io-uring Use io_uring for Parquet streaming writes (requires --streaming, Linux only)\n" + << " --orc-io-uring Use io_uring for ORC disk writes (Linux only)\n" #endif << " --verbose Verbose output\n" << " --help Show this help message\n"; @@ -139,7 +147,9 @@ Options parse_args(int argc, char* argv[]) { {"lance-io-uring", no_argument, nullptr, OPT_LANCE_IO_URING}, #endif #ifdef TPCH_ENABLE_ASYNC_IO - {"async-io", no_argument, nullptr, 'a'}, + {"async-io", no_argument, nullptr, 'a'}, + {"parquet-io-uring", no_argument, nullptr, OPT_PARQUET_IO_URING}, + {"orc-io-uring", no_argument, nullptr, OPT_ORC_IO_URING}, #endif {"verbose", no_argument, nullptr, 'v'}, {"help", no_argument, nullptr, 'h'}, @@ -210,6 +220,12 @@ Options parse_args(int argc, char* argv[]) { case 'a': opts.async_io = true; break; + case OPT_PARQUET_IO_URING: + opts.parquet_io_uring = true; + break; + case OPT_ORC_IO_URING: + opts.orc_io_uring = true; + break; #endif case 'v': opts.verbose = true; @@ -1447,6 +1463,32 @@ int main(int argc, char* argv[]) { } #endif + // Wire up Parquet and ORC io_uring flags +#ifdef TPCH_ENABLE_ASYNC_IO + if (opts.parquet_io_uring) { + if (auto* pw = dynamic_cast(writer.get())) { + if (!opts.zero_copy) { + std::cerr << "Warning: --parquet-io-uring has no effect without --zero-copy (streaming mode)\n"; + } else { + pw->enable_io_uring(true); + if (opts.verbose) { + std::cout << "Parquet io_uring write path enabled\n"; + } + } + } + } +#ifdef TPCH_ENABLE_ORC + if (opts.orc_io_uring) { + if (auto* ow = dynamic_cast(writer.get())) { + ow->enable_io_uring(true); + if (opts.verbose) { + std::cout << "ORC io_uring write path enabled\n"; + } + } + } +#endif +#endif + // Set async context if available if (async_context) { writer->set_async_context(async_context); diff --git a/src/writers/orc_writer.cpp b/src/writers/orc_writer.cpp index 9701490..0ec82b6 100644 --- a/src/writers/orc_writer.cpp +++ b/src/writers/orc_writer.cpp @@ -2,6 +2,10 @@ #include #include #include +#include +#include +#include +#include // Minimal Arrow includes to avoid protobuf symbol pollution // These specific headers should NOT pull in protobuf infrastructure @@ -12,6 +16,132 @@ #include #include "tpch/orc_writer.hpp" +#include "tpch/async_io.hpp" + +#ifdef TPCH_ENABLE_ASYNC_IO + +namespace tpch { + +/** + * ORC OutputStream backed by io_uring for async disk writes. + * + * Matches the Rust io_uring_store design: + * - 512 KB chunks (same as Rust CHUNK_SIZE) + * - 8-buffer circular ring: up to 7 SQEs in-flight while filling the 8th + * - submit_and_wait(1) in the drain path: single syscall instead of two + * - sysfs-calibrated queue depth (nr_requests/2, clamped [8,128]) + * - IORING_SETUP_ATTACH_WQ shared worker pool (via AsyncIOContext ctor) + */ +class OrcIoUringStream : public orc::OutputStream { + // 512 KB per SQE — matches Rust CHUNK_SIZE + static constexpr size_t kChunkSize = 512 * 1024; + // Circular pool of staging buffers. + // At steady state kNumBufs-1 = 7 SQEs are in-flight while we fill the 8th. + static constexpr int kNumBufs = 8; + + std::string filepath_; + int fd_ = -1; + uint64_t file_offset_ = 0; + + std::array, kNumBufs> pool_; + int write_idx_ = 0; // slot currently being filled + int read_idx_ = 0; // oldest submitted slot (FIFO order) + + std::unique_ptr context_; + +public: + explicit OrcIoUringStream(const std::string& filepath) + : filepath_(filepath) { + fd_ = ::open(filepath.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644); + if (fd_ < 0) { + throw std::runtime_error("OrcIoUringStream: open failed: " + std::string(strerror(errno))); + } + // Calibrate queue depth from the target block device via sysfs. + uint32_t qd = AsyncIOContext::calibrate_queue_depth(filepath.c_str()); + context_ = std::make_unique(qd); + for (auto& b : pool_) b.reserve(kChunkSize); + } + + ~OrcIoUringStream() override { + try { close(); } catch (...) {} + } + + void write(const void* buf, size_t length) override { + if (fd_ < 0) { + return; // Silently ignore writes after close (double-close guard) + } + const uint8_t* ptr = static_cast(buf); + while (length > 0) { + auto& cur = pool_[write_idx_]; + size_t space = kChunkSize - cur.size(); + size_t to_copy = std::min(length, space); + cur.insert(cur.end(), ptr, ptr + to_copy); + ptr += to_copy; + length -= to_copy; + if (cur.size() >= kChunkSize) { + submit_head(); + } + } + } + + void close() override { + if (fd_ < 0) return; + + // Submit any partial buffer. + if (!pool_[write_idx_].empty()) { + submit_head(); + } + // Drain all remaining in-flight SQEs. + if (context_->queued_count() > 0) { + context_->submit_queued(); + } + while (context_->pending_count() > 0) { + context_->wait_completions(1); + pool_[read_idx_].clear(); + read_idx_ = (read_idx_ + 1) % kNumBufs; + } + ::close(fd_); + fd_ = -1; + } + + uint64_t getLength() const override { + return file_offset_ + static_cast(pool_[write_idx_].size()); + } + + uint64_t getNaturalWriteSize() const override { + return kChunkSize; + } + + const std::string& getName() const override { + return filepath_; + } + +private: + // Submit pool_[write_idx_] to io_uring, then advance write_idx_. + // When the pool is full, drain the oldest slot with submit_and_wait(1). + void submit_head() { + auto& cur = pool_[write_idx_]; + + context_->queue_write(fd_, cur.data(), cur.size(), + static_cast(file_offset_), 0); + file_offset_ += cur.size(); + + if (context_->pending_count() + 1 >= kNumBufs) { + // Pool full: submit new SQE + wait for oldest in 1 syscall. + context_->submit_and_wait(1); + pool_[read_idx_].clear(); + read_idx_ = (read_idx_ + 1) % kNumBufs; + } else { + context_->submit_queued(); + } + + write_idx_ = (write_idx_ + 1) % kNumBufs; + } +}; + +} // namespace tpch + +#endif // TPCH_ENABLE_ASYNC_IO namespace tpch { @@ -169,7 +299,8 @@ void copy_array_to_orc_column( } // anonymous namespace ORCWriter::ORCWriter(const std::string& filepath) - : filepath_(filepath), orc_writer_(nullptr), orc_output_stream_(nullptr), orc_type_(nullptr) { + : filepath_(filepath), orc_writer_(nullptr), orc_output_stream_(nullptr), + orc_type_(nullptr), orc_io_uring_stream_(nullptr) { // Constructor doesn't create writer yet - we wait for first batch to get schema } @@ -198,6 +329,14 @@ ORCWriter::~ORCWriter() { delete reinterpret_cast*>(orc_type_); orc_type_ = nullptr; } + + // Delete io_uring stream (if used) +#ifdef TPCH_ENABLE_ASYNC_IO + if (orc_io_uring_stream_) { + delete reinterpret_cast(orc_io_uring_stream_); + orc_io_uring_stream_ = nullptr; + } +#endif } void ORCWriter::write_batch(const std::shared_ptr& batch) { @@ -218,20 +357,36 @@ void ORCWriter::write_batch(const std::shared_ptr& batch) { auto orc_type_local = orc::Type::buildTypeFromString(orc_schema_str); orc_type_ = new std::unique_ptr(std::move(orc_type_local)); - // Create output file stream using ORC factory function - must be stored as member to stay alive - auto out_stream_local = orc::writeLocalFile(filepath_); - orc_output_stream_ = new std::unique_ptr(std::move(out_stream_local)); - // Create writer options orc::WriterOptions writer_options; writer_options.setStripeSize(64 * 1024 * 1024); // 64MB stripes writer_options.setRowIndexStride(10000); + // Always attempt dictionary encoding for strings (threshold = 1.0 = always). + // TPC-H string columns have 2-40 unique values, so dictionary is always beneficial. + writer_options.setDictionaryKeySizeThreshold(1.0); + + auto* orc_type_ptr = reinterpret_cast*>(orc_type_); + +#ifdef TPCH_ENABLE_ASYNC_IO + if (use_io_uring_) { + // io_uring path: OrcIoUringStream owns the fd and the ring + auto* uring_stream = new OrcIoUringStream(filepath_); + orc_io_uring_stream_ = uring_stream; + auto writer = orc::createWriter(**orc_type_ptr, uring_stream, writer_options); + orc_writer_ = writer.release(); + } else { +#endif + // Create output file stream using ORC factory function - must be stored as member to stay alive + auto out_stream_local = orc::writeLocalFile(filepath_); + orc_output_stream_ = new std::unique_ptr(std::move(out_stream_local)); // Create ORC writer using factory function auto* out_stream_ptr = reinterpret_cast*>(orc_output_stream_); - auto* orc_type_ptr = reinterpret_cast*>(orc_type_); auto writer = orc::createWriter(**orc_type_ptr, out_stream_ptr->get(), writer_options); orc_writer_ = writer.release(); +#ifdef TPCH_ENABLE_ASYNC_IO + } +#endif } catch (const std::exception& e) { schema_locked_ = false; @@ -292,6 +447,10 @@ void ORCWriter::write_batch(const std::shared_ptr& batch) { } } +void ORCWriter::enable_io_uring(bool enable) { + use_io_uring_ = enable; +} + void ORCWriter::close() { if (orc_writer_) { try { diff --git a/src/writers/parquet_writer.cpp b/src/writers/parquet_writer.cpp index 0e9b5ab..6b4a03e 100644 --- a/src/writers/parquet_writer.cpp +++ b/src/writers/parquet_writer.cpp @@ -14,6 +14,135 @@ #include #include +#ifdef TPCH_ENABLE_ASYNC_IO + +namespace tpch { + +/** + * Arrow OutputStream backed by io_uring for async disk writes. + * + * Matches the Rust io_uring_store design: + * - 512 KB chunks (same as Rust CHUNK_SIZE) + * - 8-buffer circular ring: up to 7 SQEs in-flight while filling the 8th + * - submit_and_wait(1) in the drain path: single syscall instead of two + * - sysfs-calibrated queue depth (nr_requests/2, clamped [8,128]) + * - IORING_SETUP_ATTACH_WQ shared worker pool (via AsyncIOContext ctor) + */ +class IoUringOutputStream : public arrow::io::OutputStream { + // 512 KB per SQE — matches Rust CHUNK_SIZE + static constexpr size_t kChunkSize = 512 * 1024; + // Circular pool of staging buffers. + // At steady state kNumBufs-1 = 7 SQEs are in-flight while we fill the 8th. + static constexpr int kNumBufs = 8; + + std::string filepath_; + int fd_ = -1; + int64_t file_offset_ = 0; + bool closed_ = false; + + std::array, kNumBufs> pool_; + int write_idx_ = 0; // slot currently being filled + int read_idx_ = 0; // oldest submitted slot (FIFO order) + + std::unique_ptr context_; + +public: + explicit IoUringOutputStream(const std::string& filepath) + : filepath_(filepath) { + fd_ = ::open(filepath.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644); + if (fd_ < 0) { + throw std::runtime_error("IoUringOutputStream: open failed: " + std::string(strerror(errno))); + } + // Calibrate queue depth from the target block device via sysfs. + uint32_t qd = AsyncIOContext::calibrate_queue_depth(filepath.c_str()); + context_ = std::make_unique(qd); + for (auto& b : pool_) b.reserve(kChunkSize); + } + + ~IoUringOutputStream() override { + if (!closed_) { + try { Close(); } catch (...) {} + } + } + + arrow::Status Write(const void* data, int64_t nbytes) override { + const uint8_t* ptr = static_cast(data); + while (nbytes > 0) { + auto& cur = pool_[write_idx_]; + size_t space = kChunkSize - cur.size(); + size_t to_copy = std::min(static_cast(nbytes), space); + cur.insert(cur.end(), ptr, ptr + to_copy); + ptr += to_copy; + nbytes -= static_cast(to_copy); + if (cur.size() >= kChunkSize) { + ARROW_RETURN_NOT_OK(submit_head()); + } + } + return arrow::Status::OK(); + } + + arrow::Status Flush() override { + // Submit any partial buffer. + if (!pool_[write_idx_].empty()) { + ARROW_RETURN_NOT_OK(submit_head()); + } + // Drain all remaining in-flight SQEs. + if (context_->queued_count() > 0) { + context_->submit_queued(); + } + while (context_->pending_count() > 0) { + context_->wait_completions(1); + pool_[read_idx_].clear(); + read_idx_ = (read_idx_ + 1) % kNumBufs; + } + return arrow::Status::OK(); + } + + arrow::Status Close() override { + if (closed_) return arrow::Status::OK(); + ARROW_RETURN_NOT_OK(Flush()); + ::close(fd_); + fd_ = -1; + closed_ = true; + return arrow::Status::OK(); + } + + bool closed() const override { return closed_; } + + arrow::Result Tell() const override { + return file_offset_ + static_cast(pool_[write_idx_].size()); + } + +private: + // Submit pool_[write_idx_] to io_uring, then advance write_idx_. + // + // Invariant: (write_idx_ - read_idx_ + kNumBufs) % kNumBufs == pending_count() + // When the pool is full (pending == kNumBufs-1), drain the oldest slot first + // using submit_and_wait(1) — one syscall for both submit and wait. + arrow::Status submit_head() { + auto& cur = pool_[write_idx_]; + + context_->queue_write(fd_, cur.data(), cur.size(), file_offset_, 0); + file_offset_ += static_cast(cur.size()); + + if (context_->pending_count() + 1 >= kNumBufs) { + // Pool full: submit new SQE + wait for oldest completion in 1 syscall. + context_->submit_and_wait(1); + pool_[read_idx_].clear(); + read_idx_ = (read_idx_ + 1) % kNumBufs; + } else { + context_->submit_queued(); + } + + write_idx_ = (write_idx_ + 1) % kNumBufs; + return arrow::Status::OK(); + } +}; + +} // namespace tpch + +#endif // TPCH_ENABLE_ASYNC_IO + namespace tpch { ParquetWriter::ParquetWriter( @@ -115,6 +244,10 @@ void ParquetWriter::enable_streaming_write(bool use_threads) { use_threads_ = use_threads; } +void ParquetWriter::enable_io_uring(bool enable) { + use_io_uring_ = enable; +} + void ParquetWriter::write_managed_batch(const ManagedRecordBatch& managed_batch) { if (closed_) { throw std::runtime_error("Cannot write to a closed Parquet writer"); @@ -176,11 +309,20 @@ void ParquetWriter::init_file_writer() { ->build(); // Create output stream - auto outfile_result = arrow::io::FileOutputStream::Open(filepath_); - if (!outfile_result.ok()) { - throw std::runtime_error("Failed to open file: " + outfile_result.status().message()); + std::shared_ptr outfile; +#ifdef TPCH_ENABLE_ASYNC_IO + if (use_io_uring_) { + outfile = std::make_shared(filepath_); + } else { +#endif + auto outfile_result = arrow::io::FileOutputStream::Open(filepath_); + if (!outfile_result.ok()) { + throw std::runtime_error("Failed to open file: " + outfile_result.status().message()); + } + outfile = outfile_result.ValueOrDie(); +#ifdef TPCH_ENABLE_ASYNC_IO } - auto outfile = outfile_result.ValueOrDie(); +#endif // Create FileWriter for streaming RecordBatches auto writer_result = parquet::arrow::FileWriter::Open(