From 55c4fac6297e064f2289d308535135af2d74bc55 Mon Sep 17 00:00:00 2001 From: mattsu Date: Mon, 9 Mar 2026 08:30:58 +0900 Subject: [PATCH 1/4] refactor(pr): stream simple non-file inputs Add a streaming code path for simple layouts so pipes and devices can be processed without buffering the full input in memory. Keep the import-only cleanup in the same commit to avoid review noise. --- src/uu/pr/src/pr.rs | 243 +++++++++++++++++++++++++++++++++++++++ tests/by-util/test_pr.rs | 20 ++++ 2 files changed, 263 insertions(+) diff --git a/src/uu/pr/src/pr.rs b/src/uu/pr/src/pr.rs index 0eb9b5f4773..68410b6d6f5 100644 --- a/src/uu/pr/src/pr.rs +++ b/src/uu/pr/src/pr.rs @@ -857,6 +857,245 @@ fn read_to_end(path: &str) -> Result, std::io::Error> { } } +fn should_use_streaming_pr(path: &str, options: &OutputOptions) -> bool { + let is_simple_layout = options.number.is_none() + && options.expand_tabs.is_none() + && options.column_mode_options.is_none() + && options.merge_files_print.is_none() + && !options.join_lines + && options.line_width.is_none() + && options.offset_spaces.is_empty(); + + if !is_simple_layout { + return false; + } + + path == FILE_STDIN || metadata(path).is_ok_and(|meta| !meta.file_type().is_file()) +} + +fn page_is_in_range(page: usize, options: &OutputOptions) -> bool { + options.start_page <= page && options.end_page.is_none_or(|end| page <= end) +} + +fn write_stream_page_header( + out: &mut impl Write, + options: &OutputOptions, + page: usize, +) -> Result<(), std::io::Error> { + let line_separator = options.line_separator.as_bytes(); + for line in header_content(options, page) { + out.write_all(line.as_bytes())?; + out.write_all(line_separator)?; + } + Ok(()) +} + +fn write_stream_page_trailer( + out: &mut impl Write, + options: &OutputOptions, + lines_in_page: usize, +) -> Result<(), std::io::Error> { + let content_line_separator = options.content_line_separator.as_bytes(); + if !options.form_feed_used { + // `print_page`/`write_columns` emits at most `lines_needed_per_page - 1` + // blank-content separators for non-form-feed mode. + let lines_needed_per_page = lines_to_read_for_page(options).saturating_sub(1); + for _ in lines_in_page..lines_needed_per_page { + out.write_all(content_line_separator)?; + } + } + + let line_separator = options.line_separator.as_bytes(); + for line in trailer_content(options) { + out.write_all(line.as_bytes())?; + out.write_all(line_separator)?; + } + out.write_all(options.page_separator_char.as_bytes())?; + Ok(()) +} + +fn finish_stream_page( + out: &mut impl Write, + options: &OutputOptions, + page: &mut usize, + page_active: &mut bool, + page_header_written: &mut bool, + lines_in_page: &mut usize, +) -> Result { + if *page_active && page_is_in_range(*page, options) { + if !*page_header_written { + write_stream_page_header(out, options, *page)?; + *page_header_written = true; + } + write_stream_page_trailer(out, options, *lines_in_page)?; + } + + *page += 1; + *page_active = false; + *page_header_written = false; + *lines_in_page = 0; + + Ok(options.end_page.is_some_and(|end| *page > end)) +} + +fn pr_stream_simple_with_reader( + mut reader: R, + options: &OutputOptions, +) -> Result { + let content_line_separator = options.content_line_separator.as_bytes(); + let lines_needed_per_page = lines_to_read_for_page(options); + + let out = stdout(); + let mut out = out.lock(); + + let mut page = 1; + let mut page_active = false; + let mut page_header_written = false; + let mut lines_in_page = 0usize; + let mut saw_non_delimiter_since_last = false; + let mut last_delimiter: Option = None; + + let mut buf = [0_u8; 8192]; + 'read_loop: loop { + let n = reader.read(&mut buf)?; + if n == 0 { + break; + } + + for &byte in &buf[..n] { + if byte == NL { + let ignore_empty_line = + !saw_non_delimiter_since_last && matches!(last_delimiter, Some(FF)); + + if !ignore_empty_line { + if !page_active { + page_active = true; + page_header_written = false; + } + + if page_is_in_range(page, options) { + if !page_header_written { + write_stream_page_header(&mut out, options, page)?; + page_header_written = true; + } + out.write_all(content_line_separator)?; + } + lines_in_page += 1; + } + + saw_non_delimiter_since_last = false; + last_delimiter = Some(NL); + + if page_active && lines_in_page >= lines_needed_per_page { + if finish_stream_page( + &mut out, + options, + &mut page, + &mut page_active, + &mut page_header_written, + &mut lines_in_page, + )? { + break 'read_loop; + } + } + continue; + } + + if byte == FF { + let ignore_empty_line = + !saw_non_delimiter_since_last && matches!(last_delimiter, Some(NL)); + + if !page_active { + page_active = true; + page_header_written = false; + } + + if page_is_in_range(page, options) && !page_header_written { + write_stream_page_header(&mut out, options, page)?; + page_header_written = true; + } + + if !ignore_empty_line { + if page_is_in_range(page, options) { + out.write_all(content_line_separator)?; + } + lines_in_page += 1; + } + + saw_non_delimiter_since_last = false; + last_delimiter = Some(FF); + + if finish_stream_page( + &mut out, + options, + &mut page, + &mut page_active, + &mut page_header_written, + &mut lines_in_page, + )? { + break 'read_loop; + } + continue; + } + + if !page_active { + page_active = true; + page_header_written = false; + } + + if page_is_in_range(page, options) { + if !page_header_written { + write_stream_page_header(&mut out, options, page)?; + page_header_written = true; + } + out.write_all(&[byte])?; + } + + saw_non_delimiter_since_last = true; + last_delimiter = None; + } + } + + if saw_non_delimiter_since_last { + if !page_active { + page_active = true; + page_header_written = false; + } + + if page_is_in_range(page, options) { + if !page_header_written { + write_stream_page_header(&mut out, options, page)?; + page_header_written = true; + } + out.write_all(content_line_separator)?; + } + lines_in_page += 1; + } + + if page_active { + let _ = finish_stream_page( + &mut out, + options, + &mut page, + &mut page_active, + &mut page_header_written, + &mut lines_in_page, + )?; + } + + Ok(0) +} + +fn pr_stream_simple(path: &str, options: &OutputOptions) -> Result { + if path == FILE_STDIN { + let stdin = stdin(); + pr_stream_simple_with_reader(stdin.lock(), options) + } else { + let file = std::fs::File::open(path)?; + pr_stream_simple_with_reader(file, options) + } +} + fn apply_expand_tab(chunk: &mut String, byte: u8, expand_options: &ExpandTabsOptions) { if byte == expand_options.input_char as u8 { // If the byte encountered is the input char we use width to calculate @@ -879,6 +1118,10 @@ fn apply_expand_tab(chunk: &mut String, byte: u8, expand_options: &ExpandTabsOpt } fn pr(path: &str, options: &OutputOptions) -> Result { + if should_use_streaming_pr(path, options) { + return pr_stream_simple(path, options); + } + // Read the entire contents of the file into a buffer. // // TODO Read incrementally. diff --git a/tests/by-util/test_pr.rs b/tests/by-util/test_pr.rs index 02735a2ab33..dd292bf7ddc 100644 --- a/tests/by-util/test_pr.rs +++ b/tests/by-util/test_pr.rs @@ -566,6 +566,26 @@ fn test_pr_char_device_dev_null() { new_ucmd!().arg("/dev/null").succeeds(); } +#[cfg(unix)] +#[test] +fn test_streaming_stdin_from_infinite_source() { + use std::fs::File; + use std::process::Stdio; + use std::time::Duration; + + let mut cmd = new_ucmd!(); + cmd.timeout(Duration::from_secs(5)); + + let mut child = cmd + .set_stdin(Stdio::from(File::open("/dev/zero").unwrap())) + .set_stdout(Stdio::piped()) + .run_no_wait(); + + // `pr` should start writing promptly and terminate quietly on a closed pipe. + child.close_stdout(); + child.wait().unwrap().fails_silently(); +} + #[test] fn test_b_flag_backwards_compat() { // -b is a no-op for backwards compatibility (column-down is now the default) From ecb48ae6b3cd114828502366c3dfa979efb49726 Mon Sep 17 00:00:00 2001 From: mattsu Date: Mon, 9 Mar 2026 08:31:05 +0900 Subject: [PATCH 2/4] fix(pr): match streaming output to buffered mode Tighten streaming mode selection for direct non-file inputs and align page trailer formatting with the existing buffered implementation. Update the regression test to exercise the supported char-device path. --- src/uu/pr/src/pr.rs | 15 +++++++++------ tests/by-util/test_pr.rs | 5 ++--- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/src/uu/pr/src/pr.rs b/src/uu/pr/src/pr.rs index 68410b6d6f5..57fdd82b119 100644 --- a/src/uu/pr/src/pr.rs +++ b/src/uu/pr/src/pr.rs @@ -870,7 +870,7 @@ fn should_use_streaming_pr(path: &str, options: &OutputOptions) -> bool { return false; } - path == FILE_STDIN || metadata(path).is_ok_and(|meta| !meta.file_type().is_file()) + path != FILE_STDIN && metadata(path).is_ok_and(|meta| !meta.file_type().is_file()) } fn page_is_in_range(page: usize, options: &OutputOptions) -> bool { @@ -897,18 +897,21 @@ fn write_stream_page_trailer( ) -> Result<(), std::io::Error> { let content_line_separator = options.content_line_separator.as_bytes(); if !options.form_feed_used { - // `print_page`/`write_columns` emits at most `lines_needed_per_page - 1` - // blank-content separators for non-form-feed mode. - let lines_needed_per_page = lines_to_read_for_page(options).saturating_sub(1); + // `print_page`/`write_columns` emits blank-content separators until + // the page reaches `lines_needed_per_page`. + let lines_needed_per_page = lines_to_read_for_page(options); for _ in lines_in_page..lines_needed_per_page { out.write_all(content_line_separator)?; } } let line_separator = options.line_separator.as_bytes(); - for line in trailer_content(options) { + let trailer = trailer_content(options); + for (index, line) in trailer.iter().enumerate() { out.write_all(line.as_bytes())?; - out.write_all(line_separator)?; + if index + 1 != trailer.len() { + out.write_all(line_separator)?; + } } out.write_all(options.page_separator_char.as_bytes())?; Ok(()) diff --git a/tests/by-util/test_pr.rs b/tests/by-util/test_pr.rs index dd292bf7ddc..a6e27741530 100644 --- a/tests/by-util/test_pr.rs +++ b/tests/by-util/test_pr.rs @@ -568,8 +568,7 @@ fn test_pr_char_device_dev_null() { #[cfg(unix)] #[test] -fn test_streaming_stdin_from_infinite_source() { - use std::fs::File; +fn test_streaming_char_device_from_infinite_source() { use std::process::Stdio; use std::time::Duration; @@ -577,7 +576,7 @@ fn test_streaming_stdin_from_infinite_source() { cmd.timeout(Duration::from_secs(5)); let mut child = cmd - .set_stdin(Stdio::from(File::open("/dev/zero").unwrap())) + .arg("/dev/zero") .set_stdout(Stdio::piped()) .run_no_wait(); From 0b330c074598038c15c306fc5283e9a4fd302b79 Mon Sep 17 00:00:00 2001 From: mattsu Date: Mon, 9 Mar 2026 08:31:12 +0900 Subject: [PATCH 3/4] fix(pr): validate UTF-8 while streaming Reject invalid UTF-8 incrementally in the streaming path, including truncated multibyte tails at EOF, and cover the failure mode with a FIFO-based regression test. Fold the spell-checker update into the same commit because it is introduced by the new test. --- src/uu/pr/src/pr.rs | 44 +++++++++++++++++++++++++++++++++------- tests/by-util/test_pr.rs | 26 +++++++++++++++++++++++- 2 files changed, 62 insertions(+), 8 deletions(-) diff --git a/src/uu/pr/src/pr.rs b/src/uu/pr/src/pr.rs index 57fdd82b119..3535fc96c2c 100644 --- a/src/uu/pr/src/pr.rs +++ b/src/uu/pr/src/pr.rs @@ -941,6 +941,34 @@ fn finish_stream_page( Ok(options.end_page.is_some_and(|end| *page > end)) } +fn validate_utf8_stream_chunk(utf8_tail: &mut Vec, chunk: &[u8]) -> Result<(), PrError> { + let mut combined; + let data = if utf8_tail.is_empty() { + chunk + } else { + combined = Vec::with_capacity(utf8_tail.len() + chunk.len()); + combined.extend_from_slice(utf8_tail); + combined.extend_from_slice(chunk); + combined.as_slice() + }; + + match std::str::from_utf8(data) { + Ok(_) => { + utf8_tail.clear(); + Ok(()) + } + Err(err) => { + if err.error_len().is_some() { + return Err(err.into()); + } + + utf8_tail.clear(); + utf8_tail.extend_from_slice(&data[err.valid_up_to()..]); + Ok(()) + } + } +} + fn pr_stream_simple_with_reader( mut reader: R, options: &OutputOptions, @@ -957,6 +985,7 @@ fn pr_stream_simple_with_reader( let mut lines_in_page = 0usize; let mut saw_non_delimiter_since_last = false; let mut last_delimiter: Option = None; + let mut utf8_tail = Vec::new(); let mut buf = [0_u8; 8192]; 'read_loop: loop { @@ -964,6 +993,7 @@ fn pr_stream_simple_with_reader( if n == 0 { break; } + validate_utf8_stream_chunk(&mut utf8_tail, &buf[..n])?; for &byte in &buf[..n] { if byte == NL { @@ -1059,6 +1089,11 @@ fn pr_stream_simple_with_reader( } } + if !utf8_tail.is_empty() { + let err = std::str::from_utf8(&utf8_tail).unwrap_err(); + return Err(err.into()); + } + if saw_non_delimiter_since_last { if !page_active { page_active = true; @@ -1090,13 +1125,8 @@ fn pr_stream_simple_with_reader( } fn pr_stream_simple(path: &str, options: &OutputOptions) -> Result { - if path == FILE_STDIN { - let stdin = stdin(); - pr_stream_simple_with_reader(stdin.lock(), options) - } else { - let file = std::fs::File::open(path)?; - pr_stream_simple_with_reader(file, options) - } + let file = std::fs::File::open(path)?; + pr_stream_simple_with_reader(file, options) } fn apply_expand_tab(chunk: &mut String, byte: u8, expand_options: &ExpandTabsOptions) { diff --git a/tests/by-util/test_pr.rs b/tests/by-util/test_pr.rs index a6e27741530..43d2c9ae24a 100644 --- a/tests/by-util/test_pr.rs +++ b/tests/by-util/test_pr.rs @@ -2,7 +2,7 @@ // // For the full copyright and license information, please view the LICENSE // file that was distributed with this source code. -// spell-checker:ignore (ToDO) Sdivide ading +// spell-checker:ignore (ToDO) Sdivide ading IRWXU use jiff::{Timestamp, ToSpan}; use regex::Regex; @@ -585,6 +585,30 @@ fn test_streaming_char_device_from_infinite_source() { child.wait().unwrap().fails_silently(); } +#[cfg(unix)] +#[test] +fn test_streaming_fifo_with_invalid_utf8_fails() { + use std::time::Duration; + + let (at, mut ucmd) = at_and_ucmd!(); + let fifo_path = at.plus_as_string("fifo"); + + nix::unistd::mkfifo(fifo_path.as_str(), nix::sys::stat::Mode::S_IRWXU).unwrap(); + + let writer_path = at.plus("fifo"); + let writer = std::thread::spawn(move || { + std::fs::write(writer_path, [0xFF_u8, b'\n']).unwrap(); + }); + + ucmd.timeout(Duration::from_secs(5)); + ucmd.arg("fifo") + .fails_with_code(1) + .stdout_is("") + .stderr_contains("invalid utf-8 sequence"); + + writer.join().unwrap(); +} + #[test] fn test_b_flag_backwards_compat() { // -b is a no-op for backwards compatibility (column-down is now the default) From 8a28cc6fe38445f2b38af003e264dc4c5cbd37dc Mon Sep 17 00:00:00 2001 From: mattsu Date: Mon, 9 Mar 2026 08:31:20 +0900 Subject: [PATCH 4/4] fix(pr): fall back when streaming pages hold no content Avoid the streaming path for non-file inputs when the requested layout leaves zero printable content lines, and add FIFO-vs-file regressions for those edge cases. Keep the unix-only test import cleanup with the helper it belongs to. --- src/uu/pr/src/pr.rs | 4 +++- tests/by-util/test_pr.rs | 44 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 1 deletion(-) diff --git a/src/uu/pr/src/pr.rs b/src/uu/pr/src/pr.rs index 3535fc96c2c..a413b2c3233 100644 --- a/src/uu/pr/src/pr.rs +++ b/src/uu/pr/src/pr.rs @@ -870,7 +870,9 @@ fn should_use_streaming_pr(path: &str, options: &OutputOptions) -> bool { return false; } - path != FILE_STDIN && metadata(path).is_ok_and(|meta| !meta.file_type().is_file()) + path != FILE_STDIN + && lines_to_read_for_page(options) > 0 + && metadata(path).is_ok_and(|meta| !meta.file_type().is_file()) } fn page_is_in_range(page: usize, options: &OutputOptions) -> bool { diff --git a/tests/by-util/test_pr.rs b/tests/by-util/test_pr.rs index 43d2c9ae24a..7670ea86639 100644 --- a/tests/by-util/test_pr.rs +++ b/tests/by-util/test_pr.rs @@ -7,6 +7,8 @@ use jiff::{Timestamp, ToSpan}; use regex::Regex; use std::fs::metadata; +#[cfg(unix)] +use uutests::at_and_ts; use uutests::util::UCommand; use uutests::{at_and_ucmd, new_ucmd}; @@ -45,6 +47,36 @@ fn valid_last_modified_template_vars(from: Timestamp) -> Vec