From abc49319252363ad66da27e5e4ab773c5d031df9 Mon Sep 17 00:00:00 2001 From: 0x676e67 Date: Tue, 23 Dec 2025 13:02:47 +0800 Subject: [PATCH 1/3] feat: Fiber interruptibility support --- Cargo.toml | 6 ++-- lib/wreq.rb | 27 ++++++++++++++++++ src/gvl.rs | 62 +++++++++------------------------------- src/lib.rs | 1 + src/rt.rs | 55 +++++++++++++++++++++++++++++++++--- test/interrupt_test.rb | 64 ++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 158 insertions(+), 57 deletions(-) create mode 100644 test/interrupt_test.rb diff --git a/Cargo.toml b/Cargo.toml index f4526ea..3f452ac 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,8 @@ name = "wreq_ruby" magnus = { version = "0.8", features = ["bytes"] } rb-sys = { version = "0.9.110", default-features = false } tokio = { version = "1", features = ["full"] } -wreq = { version = "6.0.0-rc.23", features = [ +tokio-util = { version = "0.7.17", default-features = false } +wreq = { version = "6.0.0-rc.24", features = [ "json", "socks", "stream", @@ -50,6 +51,3 @@ incremental = false lto = "fat" opt-level = 3 strip = true - -[patch.crates-io] -wreq = { git = "https://github.com/0x676e67/wreq" } diff --git a/lib/wreq.rb b/lib/wreq.rb index 25be38f..bf631ab 100644 --- a/lib/wreq.rb +++ b/lib/wreq.rb @@ -18,6 +18,33 @@ unless defined?(Wreq) module Wreq + # CancellationToken allows Ruby code to cooperatively cancel long-running Rust async tasks. + # + # This class is a binding to a Rust-side cancellation token, enabling Ruby to signal cancellation + # to HTTP requests, streaming, or other operations running in Rust. When `cancel` is called, all + # Rust tasks observing this token will be notified and can abort promptly. + # + # @example + # token = Wreq::CancellationToken.new + # # Pass token to a Wreq request or stream + # token.cancel # Signal cancellation from Ruby + # + # The actual implementation and state are managed by the Rust extension. + class CancellationToken + # Create a new cancellation token. + # + # @return [Wreq::CancellationToken] + def self.new + end + + # Signal cancellation to all Rust tasks observing this token. + # + # @return [void] + def cancel + end + end + + # This is a placeholder. The actual value is set by the Rust implementation. VERSION = nil # Send an HTTP request. diff --git a/src/gvl.rs b/src/gvl.rs index 1573618..5ef7ff1 100644 --- a/src/gvl.rs +++ b/src/gvl.rs @@ -4,7 +4,8 @@ use std::{ffi::c_void, mem::MaybeUninit, ptr::null_mut}; use rb_sys::rb_thread_call_without_gvl; -use tokio::sync::watch; + +use crate::rt::CancellationToken; /// Container for safely passing closure and result through C callback. struct Args { @@ -12,47 +13,8 @@ struct Args { result: MaybeUninit, } -/// Cancellation flag for thread interruption support. -#[derive(Clone)] -pub struct CancelFlag { - rx: watch::Receiver, -} - -struct CancelSender { - tx: watch::Sender, -} - -impl CancelSender { - fn new() -> (Self, CancelFlag) { - let (tx, rx) = watch::channel(false); - (Self { tx }, CancelFlag { rx }) - } - - fn cancel(&self) { - let _ = self.tx.send(true); - } -} - -impl CancelFlag { - /// Wait until cancellation is signaled (zero-latency, no polling). - pub async fn cancelled(&self) { - let mut rx = self.rx.clone(); - if *rx.borrow_and_update() { - return; - } - loop { - if rx.changed().await.is_err() { - return; - } - if *rx.borrow() { - return; - } - } - } -} - struct UnblockData { - sender: CancelSender, + token: CancellationToken, } unsafe extern "C" fn call_without_gvl(arg: *mut c_void) -> *mut c_void @@ -73,7 +35,7 @@ where unsafe extern "C" fn unblock_func(arg: *mut c_void) { if !arg.is_null() { let data = unsafe { &*(arg as *const UnblockData) }; - data.sender.cancel(); + data.token.cancel(); } } @@ -110,31 +72,33 @@ where /// This results in all Ruby threads being suspended indefinitely. pub fn nogvl_cancellable(func: F) -> R where - F: FnOnce(CancelFlag) -> R, + F: FnOnce(CancellationToken) -> R, R: Sized, { - let (sender, flag) = CancelSender::new(); - let unblock_data = UnblockData { sender }; + let token = CancellationToken::new(); + let unblock_data = UnblockData { + token: token.clone(), + }; struct Wrapper { func: Option, - flag: CancelFlag, + token: CancellationToken, result: MaybeUninit, } let mut wrapper = Wrapper { func: Some(func), - flag, + token, result: MaybeUninit::uninit(), }; unsafe extern "C" fn call_with_flag(arg: *mut c_void) -> *mut c_void where - F: FnOnce(CancelFlag) -> R, + F: FnOnce(CancellationToken) -> R, { let wrapper = unsafe { &mut *(arg as *mut Wrapper) }; if let Some(func) = wrapper.func.take() { - wrapper.result.write(func(wrapper.flag.clone())); + wrapper.result.write(func(wrapper.token.clone())); } null_mut() } diff --git a/src/lib.rs b/src/lib.rs index 2a8cc2b..4da409c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -93,5 +93,6 @@ fn init(ruby: &Ruby) -> Result<(), Error> { client::include(ruby, &gem_module)?; emulation::include(ruby, &gem_module)?; error::include(ruby); + rt::include(ruby, &gem_module)?; Ok(()) } diff --git a/src/rt.rs b/src/rt.rs index 1bd71f5..05becf2 100644 --- a/src/rt.rs +++ b/src/rt.rs @@ -1,5 +1,8 @@ use std::sync::LazyLock; +use magnus::{ + DataTypeFunctions, Error, Module, Object, RModule, Ruby, TypedData, function, method, +}; use tokio::runtime::{Builder, Runtime}; use crate::{error::interrupt_error, gvl}; @@ -17,11 +20,11 @@ pub fn try_block_on(future: F) -> F::Output where F: Future>, { - gvl::nogvl_cancellable(|flag| { + gvl::nogvl_cancellable(|token| { RUNTIME.block_on(async move { tokio::select! { biased; - _ = flag.cancelled() => Err(interrupt_error()), + _ = token.cancelled() => Err(interrupt_error()), result = future => result, } }) @@ -35,13 +38,57 @@ pub fn maybe_block_on(future: F) -> F::Output where F: Future>, { - gvl::nogvl_cancellable(|flag| { + gvl::nogvl_cancellable(|token| { RUNTIME.block_on(async move { tokio::select! { biased; - _ = flag.cancelled() => None, + _ = token.cancelled() => None, result = future => result, } }) }) } + +/// A cancellation token for cooperative cancellation of Rust async tasks from Ruby. +/// +/// This type wraps [`tokio_util::sync::CancellationToken`] and is exposed to Ruby as +/// `Wreq::CancellationToken`. It allows Ruby code to signal cancellation to long-running or +/// blocking Rust tasks, enabling graceful interruption. +/// +/// Typical usage: +/// - Ruby creates a `Wreq::CancellationToken` and passes it to a Rust-backed async operation. +/// - Rust code checks or awaits the token to detect cancellation and aborts work if cancelled. +/// - Calling `cancel` from Ruby triggers cancellation for all tasks observing this token or its +/// clones. +/// +/// This is especially useful for interrupting HTTP requests, streaming, or other operations that +/// may need to be stopped from Ruby. +#[derive(Clone, DataTypeFunctions, TypedData)] +#[magnus(class = "Wreq::CancellationToken", free_immediately, size)] +pub struct CancellationToken(tokio_util::sync::CancellationToken); + +impl CancellationToken { + /// Create a new [`CancellationToken`]. + #[inline] + pub fn new() -> Self { + Self(tokio_util::sync::CancellationToken::new()) + } + + /// Signal cancellation to all tasks observing this token. + #[inline] + pub fn cancel(&self) { + self.0.cancel() + } + + #[inline] + async fn cancelled(&self) { + self.0.cancelled().await + } +} + +pub fn include(ruby: &Ruby, gem_module: &RModule) -> Result<(), Error> { + let headers_class = gem_module.define_class("CancellationToken", ruby.class_object())?; + headers_class.define_singleton_method("new", function!(CancellationToken::new, 0))?; + headers_class.define_method("cancel", method!(CancellationToken::cancel, 0))?; + Ok(()) +} diff --git a/test/interrupt_test.rb b/test/interrupt_test.rb new file mode 100644 index 0000000..5c7eac6 --- /dev/null +++ b/test/interrupt_test.rb @@ -0,0 +1,64 @@ +require "test_helper" + +class ThreadInterruptTest < Minitest::Test + HANGING_URL = "http://10.255.255.1:12345/" # Non-routable, hangs forever + SLOW_BODY_URL = "https://httpbin.io/drip?duration=5&numbytes=5" # 5s slow body + + def test_connect_phase_interrupt + thread = Thread.new do + begin + Wreq.get(HANGING_URL) + rescue => e + e + end + end + sleep 2 + thread.kill + result = thread.join(5) + assert result, "Thread should be killed and joined" + end + + def test_connect_with_timeout_interrupt + thread = Thread.new do + begin + Wreq.get(HANGING_URL, timeout: 60) + rescue => e + e + end + end + sleep 2 + thread.kill + result = thread.join(5) + assert result, "Thread should be killed and joined" + end + + def test_body_reading_interrupt + thread = Thread.new do + resp = Wreq.get(SLOW_BODY_URL) + begin + resp.text + rescue => e + e + end + end + sleep 2 + thread.kill + result = thread.join(5) + assert result, "Thread should be killed and joined" + end + + def test_body_streaming_interrupt + thread = Thread.new do + resp = Wreq.get(SLOW_BODY_URL) + begin + resp.chunks { |chunk| chunk } + rescue => e + e + end + end + sleep 2 + thread.kill + result = thread.join(5) + assert result, "Thread should be killed and joined" + end +end From 2fb3a72b68640e689e8048edb1849165398de900 Mon Sep 17 00:00:00 2001 From: 0x676e67 Date: Sat, 27 Dec 2025 17:04:00 +0800 Subject: [PATCH 2/3] init --- examples/stream.rb | 30 ++++++++++++--- lib/wreq_ruby/response.rb | 7 +++- src/client.rs | 54 +++++++++++++++++--------- src/client/body.rs | 2 +- src/client/body/stream.rs | 31 ++++++++++----- src/client/req.rs | 9 +++-- src/client/resp.rs | 61 ++++++++++++++++++----------- src/lib.rs | 3 +- src/macros.rs | 9 ----- src/rt.rs | 80 ++++++++++++++++++++++++++++++++------- 10 files changed, 202 insertions(+), 84 deletions(-) diff --git a/examples/stream.rb b/examples/stream.rb index c10b9bf..6a48d57 100755 --- a/examples/stream.rb +++ b/examples/stream.rb @@ -2,13 +2,33 @@ require_relative "../lib/wreq" + # Make a request client = Wreq::Client.new response = client.get("https://httpbin.io/stream/20") -# Get the streaming body receiver -puts "\n=== Streaming Response Body ===" -response.chunks do |chunk| - puts chunk - sleep 0.1 # Simulate processing time +# Create a cancellation token +token = Wreq::CancellationToken.new + +count = 0 + +# Start a thread to process streaming chunks +stream_thread = Thread.new do + begin + response.chunks(token) do |chunk| + count += 1 + puts chunk + sleep 0.1 # Simulate processing time + end + rescue => e + puts "[Streaming interrupted: #{e.class}: #{e.message}]" + end end + +# Main thread: cancel after 1 second +sleep 1 +token.cancel +puts "\n[Cancelled by main thread!]" + +stream_thread.join +puts "\nChunks received before cancellation: #{count} (should be less than 20)" \ No newline at end of file diff --git a/lib/wreq_ruby/response.rb b/lib/wreq_ruby/response.rb index 9c8d4f9..6eeecaa 100644 --- a/lib/wreq_ruby/response.rb +++ b/lib/wreq_ruby/response.rb @@ -126,6 +126,8 @@ def json # by yielding each chunk of the body as it arrives, without loading # the entire response into memory. # + # @param token [Wreq::CancellationToken, nil] Optional cancellation token for cooperative cancellation. + # If provided, streaming will be interrupted if the token is cancelled from Ruby. # @return An iterator over response body chunks (binary String) # @yield [chunk] Each chunk of the response body as a binary String # @example Save response to file @@ -136,9 +138,12 @@ def json # total = 0 # response.chunks { |chunk| total += chunk.bytesize } # puts "Downloaded #{total} bytes" + # @example With cancellation + # token = Wreq::CancellationToken.new + # response.chunks(token) { |chunk| ... } # # Note: The returned Receiver is only for reading response bodies, not for uploads. - def chunks + def chunks(token = nil) end # Close the response and free associated resources. diff --git a/src/client.rs b/src/client.rs index b128874..9b4fec5 100644 --- a/src/client.rs +++ b/src/client.rs @@ -23,8 +23,26 @@ use crate::{ extractor::Extractor, gvl, http::Method, + rt::CancellationToken, }; +macro_rules! request { + ($args:expr, $required:ty) => {{ + let args = magnus::scan_args::scan_args::< + $required, + (Option>,), + (), + (), + magnus::RHash, + (), + >($args)?; + let token = args.optional.0.as_deref().cloned(); + let required = args.required; + let request = crate::client::req::Request::new(&ruby!(), args.keywords)?; + (token, required, request) + }}; +} + /// A builder for `Client`. #[derive(Default, Deserialize)] struct Builder { @@ -312,64 +330,64 @@ impl Client { /// Send a HTTP request. #[inline] pub fn request(rb_self: &Self, args: &[Value]) -> Result { - let ((method, url), request) = extract_request!(args, (Obj, String)); - execute_request(rb_self.0.clone(), *method, url, request) + let (token, (method, url), request) = request!(args, (Obj, String)); + execute_request(token, rb_self.0.clone(), *method, url, request) } /// Send a GET request. #[inline] pub fn get(rb_self: &Self, args: &[Value]) -> Result { - let ((url,), request) = extract_request!(args, (String,)); - execute_request(rb_self.0.clone(), Method::GET, url, request) + let (token, (url,), request) = request!(args, (String,)); + execute_request(token, rb_self.0.clone(), Method::GET, url, request) } /// Send a POST request. #[inline] pub fn post(rb_self: &Self, args: &[Value]) -> Result { - let ((url,), request) = extract_request!(args, (String,)); - execute_request(rb_self.0.clone(), Method::POST, url, request) + let (token, (url,), request) = request!(args, (String,)); + execute_request(token, rb_self.0.clone(), Method::POST, url, request) } /// Send a PUT request. #[inline] pub fn put(rb_self: &Self, args: &[Value]) -> Result { - let ((url,), request) = extract_request!(args, (String,)); - execute_request(rb_self.0.clone(), Method::PUT, url, request) + let (token, (url,), request) = request!(args, (String,)); + execute_request(token, rb_self.0.clone(), Method::PUT, url, request) } /// Send a DELETE request. #[inline] pub fn delete(rb_self: &Self, args: &[Value]) -> Result { - let ((url,), request) = extract_request!(args, (String,)); - execute_request(rb_self.0.clone(), Method::DELETE, url, request) + let (token, (url,), request) = request!(args, (String,)); + execute_request(token, rb_self.0.clone(), Method::DELETE, url, request) } /// Send a HEAD request. #[inline] pub fn head(rb_self: &Self, args: &[Value]) -> Result { - let ((url,), request) = extract_request!(args, (String,)); - execute_request(rb_self.0.clone(), Method::HEAD, url, request) + let (token, (url,), request) = request!(args, (String,)); + execute_request(token, rb_self.0.clone(), Method::HEAD, url, request) } /// Send an OPTIONS request. #[inline] pub fn options(rb_self: &Self, args: &[Value]) -> Result { - let ((url,), request) = extract_request!(args, (String,)); - execute_request(rb_self.0.clone(), Method::OPTIONS, url, request) + let (token, (url,), request) = request!(args, (String,)); + execute_request(token, rb_self.0.clone(), Method::OPTIONS, url, request) } /// Send a TRACE request. #[inline] pub fn trace(rb_self: &Self, args: &[Value]) -> Result { - let ((url,), request) = extract_request!(args, (String,)); - execute_request(rb_self.0.clone(), Method::TRACE, url, request) + let (token, (url,), request) = request!(args, (String,)); + execute_request(token, rb_self.0.clone(), Method::TRACE, url, request) } /// Send a PATCH request. #[inline] pub fn patch(rb_self: &Self, args: &[Value]) -> Result { - let ((url,), request) = extract_request!(args, (String,)); - execute_request(rb_self.0.clone(), Method::PATCH, url, request) + let (token, (url,), request) = request!(args, (String,)); + execute_request(token, rb_self.0.clone(), Method::PATCH, url, request) } } diff --git a/src/client/body.rs b/src/client/body.rs index e8bbb4a..0fe4378 100644 --- a/src/client/body.rs +++ b/src/client/body.rs @@ -51,7 +51,7 @@ impl From for wreq::Body { pub fn include(ruby: &Ruby, gem_module: &RModule) -> Result<(), Error> { let sender_class = gem_module.define_class("BodySender", ruby.class_object())?; sender_class.define_singleton_method("new", function!(BodySender::new, -1))?; - sender_class.define_method("push", method!(BodySender::push, 1))?; + sender_class.define_method("push", method!(BodySender::push, -1))?; sender_class.define_method("close", magnus::method!(BodySender::close, 0))?; Ok(()) } diff --git a/src/client/body/stream.rs b/src/client/body/stream.rs index dbf13b4..dc25191 100644 --- a/src/client/body/stream.rs +++ b/src/client/body/stream.rs @@ -14,11 +14,14 @@ use tokio::sync::{ use crate::{ error::{memory_error, mpsc_send_error_to_magnus}, - rt, + rt::{self, CancellationToken}, }; /// A receiver for streaming HTTP response bodies. -pub struct BodyReceiver(Mutex> + Send>>>); +pub struct BodyReceiver { + token: Option, + inner: Mutex> + Send>>>, +} /// A sender for streaming HTTP request bodies. #[magnus::wrap(class = "Wreq::BodySender", free_immediately, size)] @@ -34,8 +37,14 @@ struct InnerBodySender { impl BodyReceiver { /// Create a new [`BodyReceiver`] instance. #[inline] - pub fn new(stream: impl Stream> + Send + 'static) -> BodyReceiver { - BodyReceiver(Mutex::new(Box::pin(stream))) + pub fn new( + token: Option, + stream: impl Stream> + Send + 'static, + ) -> BodyReceiver { + BodyReceiver { + token, + inner: Mutex::new(Box::pin(stream)), + } } } @@ -43,8 +52,8 @@ impl Iterator for BodyReceiver { type Item = Bytes; fn next(&mut self) -> Option { - rt::maybe_block_on(async { - self.0 + rt::maybe_block_on(self.token.as_ref(), async { + self.inner .lock() .await .as_mut() @@ -74,11 +83,15 @@ impl BodySender { } /// Ruby: `push(data)` where data is String or bytes - pub fn push(rb_self: &Self, data: RString) -> Result<(), Error> { - let bytes = data.to_bytes(); + pub fn push(rb_self: &Self, args: &[Value]) -> Result<(), Error> { + let (data, token) = cannel_token!(args, (RString,)); + let bytes = data.0.to_bytes(); let inner = rb_self.0.read().unwrap(); if let Some(ref tx) = inner.tx { - rt::try_block_on(tx.send(bytes).map_err(mpsc_send_error_to_magnus))?; + rt::try_block_on( + token.as_ref(), + tx.send(bytes).map_err(mpsc_send_error_to_magnus), + )?; } Ok(()) } diff --git a/src/client/req.rs b/src/client/req.rs index 6de2abd..a1e9abd 100644 --- a/src/client/req.rs +++ b/src/client/req.rs @@ -1,7 +1,7 @@ use std::{net::IpAddr, time::Duration}; use http::{HeaderValue, header}; -use magnus::{RHash, TryConvert, typed_data::Obj, value::ReprValue}; +use magnus::{RHash, Ruby, TryConvert, typed_data::Obj, value::ReprValue}; use serde::Deserialize; use wreq::{ Client, Proxy, Version, @@ -15,7 +15,7 @@ use crate::{ error::wreq_error_to_magnus, extractor::Extractor, http::Method, - rt, + rt::{self, CancellationToken}, }; /// The parameters for a request. @@ -104,7 +104,7 @@ pub struct Request { impl Request { /// Create a new [`Request`] from Ruby keyword arguments. - pub fn new(ruby: &magnus::Ruby, hash: RHash) -> Result { + pub fn new(ruby: &Ruby, hash: RHash) -> Result { let kwargs = hash.as_value(); let mut builder: Self = serde_magnus::deserialize(ruby, kwargs)?; @@ -139,12 +139,13 @@ impl Request { } pub fn execute_request>( + token: Option, client: Client, method: Method, url: U, mut request: Request, ) -> Result { - rt::try_block_on(async move { + rt::try_block_on(token.as_ref(), async move { let mut builder = client.request(method.into_ffi(), url.as_ref()); // Emulation options. diff --git a/src/client/resp.rs b/src/client/resp.rs index 3e5b1e7..38f5376 100644 --- a/src/client/resp.rs +++ b/src/client/resp.rs @@ -15,7 +15,7 @@ use crate::{ gvl, header::Headers, http::{StatusCode, Version}, - rt, + rt::{self, CancellationToken}, }; /// A response from a request. @@ -64,7 +64,11 @@ impl Response { } /// Internal method to get the wreq::Response, optionally streaming the body. - fn response(&self, stream: bool) -> Result { + fn response( + &self, + token: Option<&CancellationToken>, + stream: bool, + ) -> Result { let build_response = |body: wreq::Body| -> wreq::Response { let mut response = HttpResponse::new(body); *response.version_mut() = self.version.into_ffi(); @@ -81,6 +85,7 @@ impl Response { Ok(build_response(body)) } else { let bytes = rt::try_block_on( + token, BodyExt::collect(body) .map_ok(|buf| buf.to_bytes()) .map_err(wreq_error_to_magnus), @@ -168,31 +173,42 @@ impl Response { } /// Get the response body as bytes. - pub fn bytes(&self) -> Result { - let response = self.response(false)?; - rt::try_block_on(response.bytes().map_err(wreq_error_to_magnus)) + pub fn bytes(&self, args: &[Value]) -> Result { + let token = cannel_token!(args); + let response = self.response(token.as_ref(), false)?; + rt::try_block_on( + token.as_ref(), + response.bytes().map_err(wreq_error_to_magnus), + ) } /// Get the response body as a UTF-8 string. - pub fn text(&self) -> Result { - let response = self.response(false)?; - rt::try_block_on(response.text().map_err(wreq_error_to_magnus)) + pub fn text(&self, args: &[Value]) -> Result { + let token = cannel_token!(args); + let response = self.response(token.as_ref(), false)?; + rt::try_block_on( + token.as_ref(), + response.text().map_err(wreq_error_to_magnus), + ) } /// Get the full response text given a specific encoding. - pub fn text_with_charset(&self, default_encoding: String) -> Result { - let response = self.response(false)?; + pub fn text_with_charset(&self, args: &[Value]) -> Result { + let (default_encoding, token) = cannel_token!(args, (String,)); + let response = self.response(token.as_ref(), false)?; rt::try_block_on( + token.as_ref(), response - .text_with_charset(default_encoding) + .text_with_charset(default_encoding.0) .map_err(wreq_error_to_magnus), ) } /// Get the response body as JSON. - pub fn json(ruby: &Ruby, rb_self: &Self) -> Result { - let response = rb_self.response(false)?; - rt::try_block_on(async move { + pub fn json(ruby: &Ruby, rb_self: &Self, args: &[Value]) -> Result { + let token = cannel_token!(args); + let response = rb_self.response(token.as_ref(), false)?; + rt::try_block_on(token.as_ref(), async move { let json = response .json::() .await @@ -202,10 +218,11 @@ impl Response { } /// Get a chunk iterator for the response body. - pub fn chunks(&self) -> Result, Error> { - self.response(true) + pub fn chunks(&self, args: &[Value]) -> Result, Error> { + let token = cannel_token!(args); + self.response(token.as_ref(), true) .map(wreq::Response::bytes_stream) - .map(BodyReceiver::new) + .map(|stream| BodyReceiver::new(token, stream)) .map(Yield::Iter) } @@ -237,14 +254,14 @@ pub fn include(ruby: &Ruby, gem_module: &RModule) -> Result<(), Error> { response_class.define_method("headers", magnus::method!(Response::headers, 0))?; response_class.define_method("local_addr", magnus::method!(Response::local_addr, 0))?; response_class.define_method("remote_addr", magnus::method!(Response::remote_addr, 0))?; - response_class.define_method("bytes", magnus::method!(Response::bytes, 0))?; - response_class.define_method("text", magnus::method!(Response::text, 0))?; + response_class.define_method("bytes", magnus::method!(Response::bytes, -1))?; + response_class.define_method("text", magnus::method!(Response::text, -1))?; response_class.define_method( "text_with_charset", - magnus::method!(Response::text_with_charset, 1), + magnus::method!(Response::text_with_charset, -1), )?; - response_class.define_method("json", magnus::method!(Response::json, 0))?; - response_class.define_method("chunks", magnus::method!(Response::chunks, 0))?; + response_class.define_method("json", magnus::method!(Response::json, -1))?; + response_class.define_method("chunks", magnus::method!(Response::chunks, -1))?; response_class.define_method("close", magnus::method!(Response::close, 0))?; Ok(()) } diff --git a/src/lib.rs b/src/lib.rs index 4da409c..c3cfeaa 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,8 @@ #[macro_use] mod macros; +#[macro_use] +mod rt; mod client; mod cookie; mod emulation; @@ -10,7 +12,6 @@ mod extractor; mod gvl; mod header; mod http; -mod rt; use magnus::{Error, Module, Ruby, Value}; diff --git a/src/macros.rs b/src/macros.rs index 3a8e414..705be1b 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -107,12 +107,3 @@ macro_rules! ruby { magnus::Ruby::get().expect("Failed to get Ruby VM instance") }; } - -macro_rules! extract_request { - ($args:expr, $required:ty) => {{ - let args = magnus::scan_args::scan_args::<$required, (), (), (), magnus::RHash, ()>($args)?; - let required = args.required; - let request = crate::client::req::Request::new(&ruby!(), args.keywords)?; - (required, request) - }}; -} diff --git a/src/rt.rs b/src/rt.rs index 05becf2..61d2399 100644 --- a/src/rt.rs +++ b/src/rt.rs @@ -14,36 +14,88 @@ static RUNTIME: LazyLock = LazyLock::new(|| { .expect("Failed to initialize Tokio runtime") }); +macro_rules! cannel_token { + ($args:ident) => { + magnus::scan_args::scan_args::< + (), + (Option>,), + (), + (), + (), + (), + >($args) + .and_then(|args| Ok(args.optional.0.as_deref().cloned()))? + }; + ($args:expr, $required:ty) => {{ + let args = magnus::scan_args::scan_args::< + $required, + (Option>,), + (), + (), + (), + (), + >($args)?; + let required = args.required; + let token = args.optional.0.as_deref().cloned(); + (required, token) + }}; +} + /// Block on a future to completion on the global Tokio runtime, -/// with support for cancellation via the provided `CancelFlag`. -pub fn try_block_on(future: F) -> F::Output +/// with support for cancellation via the provided [`CancellationToken`]. +pub fn try_block_on(token: Option<&CancellationToken>, future: F) -> F::Output where F: Future>, { - gvl::nogvl_cancellable(|token| { + gvl::nogvl_cancellable(|ct| { RUNTIME.block_on(async move { - tokio::select! { - biased; - _ = token.cancelled() => Err(interrupt_error()), - result = future => result, + match token { + Some(token) => { + tokio::select! { + biased; + _ = ct.cancelled() => Err(interrupt_error()), + _ = token.cancelled() => Err(interrupt_error()), + result = future => result, + } + } + None => { + tokio::select! { + biased; + _ = ct.cancelled() => Err(interrupt_error()), + result = future => result, + } + } } }) }) } /// Block on a future to completion on the global Tokio runtime, -/// returning `None` if cancelled via the provided `CancelFlag`. +/// returning `None` if cancelled via the provided [`CancellationToken`]. #[inline] -pub fn maybe_block_on(future: F) -> F::Output +pub fn maybe_block_on(token: Option<&CancellationToken>, future: F) -> F::Output where F: Future>, { - gvl::nogvl_cancellable(|token| { + gvl::nogvl_cancellable(|ct| { RUNTIME.block_on(async move { - tokio::select! { - biased; - _ = token.cancelled() => None, - result = future => result, + match token { + Some(token) => { + tokio::select! { + biased; + + _ = ct.cancelled() => None, + _ = token.cancelled() => None, + result = future => result, + } + } + None => { + tokio::select! { + biased; + _ = ct.cancelled() => None, + result = future => result, + } + } } }) }) From ab04d8cfc48a338a626f52baca9f4c5f6a078a35 Mon Sep 17 00:00:00 2001 From: 0x676e67 Date: Sun, 28 Dec 2025 22:23:11 +0800 Subject: [PATCH 3/3] fix build --- Cargo.toml | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 1fe55a9..2c61580 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,9 +18,7 @@ magnus = { version = "0.8", features = ["bytes"] } rb-sys = { version = "0.9.110", default-features = false } tokio = { version = "1", features = ["full"] } tokio-util = { version = "0.7.17", default-features = false } -wreq = { version = "6.0.0-rc.24", features = [ - "query", - "form", +wreq = { version = "6.0.0-rc.25", features = [ "json", "socks", "stream",