Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ name = "wreq_ruby"
magnus = { version = "0.8", features = ["bytes"] }
rb-sys = { version = "0.9.110", default-features = false }
tokio = { version = "1", features = ["full"] }
wreq = { version = "6.0.0-rc.24", features = [
"query",
"form",
tokio-util = { version = "0.7.17", default-features = false }
wreq = { version = "6.0.0-rc.25", features = [
"json",
"socks",
"stream",
Expand Down Expand Up @@ -52,6 +51,3 @@ incremental = false
lto = "fat"
opt-level = 3
strip = true

[patch.crates-io]
wreq = { git = "https://github.com/0x676e67/wreq" }
30 changes: 25 additions & 5 deletions examples/stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,33 @@

require_relative "../lib/wreq"


# Make a request
client = Wreq::Client.new
response = client.get("https://httpbin.io/stream/20")

# Get the streaming body receiver
puts "\n=== Streaming Response Body ==="
response.chunks do |chunk|
puts chunk
sleep 0.1 # Simulate processing time
# Create a cancellation token
token = Wreq::CancellationToken.new

count = 0

# Start a thread to process streaming chunks
stream_thread = Thread.new do
begin
response.chunks(token) do |chunk|
count += 1
puts chunk
sleep 0.1 # Simulate processing time
end
rescue => e
puts "[Streaming interrupted: #{e.class}: #{e.message}]"
end
end

# Main thread: cancel after 1 second
sleep 1
token.cancel
puts "\n[Cancelled by main thread!]"

stream_thread.join
puts "\nChunks received before cancellation: #{count} (should be less than 20)"
27 changes: 27 additions & 0 deletions lib/wreq.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,33 @@

unless defined?(Wreq)
module Wreq
# CancellationToken allows Ruby code to cooperatively cancel long-running Rust async tasks.
#
# This class is a binding to a Rust-side cancellation token, enabling Ruby to signal cancellation
# to HTTP requests, streaming, or other operations running in Rust. When `cancel` is called, all
# Rust tasks observing this token will be notified and can abort promptly.
#
# @example
# token = Wreq::CancellationToken.new
# # Pass token to a Wreq request or stream
# token.cancel # Signal cancellation from Ruby
#
# The actual implementation and state are managed by the Rust extension.
class CancellationToken
# Create a new cancellation token.
#
# @return [Wreq::CancellationToken]
def self.new
end

# Signal cancellation to all Rust tasks observing this token.
#
# @return [void]
def cancel
end
end

# This is a placeholder. The actual value is set by the Rust implementation.
VERSION = nil

# Send an HTTP request.
Expand Down
7 changes: 6 additions & 1 deletion lib/wreq_ruby/response.rb
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ def json
# by yielding each chunk of the body as it arrives, without loading
# the entire response into memory.
#
# @param token [Wreq::CancellationToken, nil] Optional cancellation token for cooperative cancellation.
# If provided, streaming will be interrupted if the token is cancelled from Ruby.
# @return An iterator over response body chunks (binary String)
# @yield [chunk] Each chunk of the response body as a binary String
# @example Save response to file
Expand All @@ -136,9 +138,12 @@ def json
# total = 0
# response.chunks { |chunk| total += chunk.bytesize }
# puts "Downloaded #{total} bytes"
# @example With cancellation
# token = Wreq::CancellationToken.new
# response.chunks(token) { |chunk| ... }
#
# Note: The returned Receiver is only for reading response bodies, not for uploads.
def chunks
def chunks(token = nil)
end

# Close the response and free associated resources.
Expand Down
54 changes: 36 additions & 18 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,26 @@ use crate::{
extractor::Extractor,
gvl,
http::Method,
rt::CancellationToken,
};

macro_rules! request {
($args:expr, $required:ty) => {{
let args = magnus::scan_args::scan_args::<
$required,
(Option<magnus::typed_data::Obj<CancellationToken>>,),
(),
(),
magnus::RHash,
(),
>($args)?;
let token = args.optional.0.as_deref().cloned();
let required = args.required;
let request = crate::client::req::Request::new(&ruby!(), args.keywords)?;
(token, required, request)
}};
}

/// A builder for `Client`.
#[derive(Default, Deserialize)]
struct Builder {
Expand Down Expand Up @@ -312,64 +330,64 @@ impl Client {
/// Send a HTTP request.
#[inline]
pub fn request(rb_self: &Self, args: &[Value]) -> Result<Response, magnus::Error> {
let ((method, url), request) = extract_request!(args, (Obj<Method>, String));
execute_request(rb_self.0.clone(), *method, url, request)
let (token, (method, url), request) = request!(args, (Obj<Method>, String));
execute_request(token, rb_self.0.clone(), *method, url, request)
}

/// Send a GET request.
#[inline]
pub fn get(rb_self: &Self, args: &[Value]) -> Result<Response, magnus::Error> {
let ((url,), request) = extract_request!(args, (String,));
execute_request(rb_self.0.clone(), Method::GET, url, request)
let (token, (url,), request) = request!(args, (String,));
execute_request(token, rb_self.0.clone(), Method::GET, url, request)
}

/// Send a POST request.
#[inline]
pub fn post(rb_self: &Self, args: &[Value]) -> Result<Response, magnus::Error> {
let ((url,), request) = extract_request!(args, (String,));
execute_request(rb_self.0.clone(), Method::POST, url, request)
let (token, (url,), request) = request!(args, (String,));
execute_request(token, rb_self.0.clone(), Method::POST, url, request)
}

/// Send a PUT request.
#[inline]
pub fn put(rb_self: &Self, args: &[Value]) -> Result<Response, magnus::Error> {
let ((url,), request) = extract_request!(args, (String,));
execute_request(rb_self.0.clone(), Method::PUT, url, request)
let (token, (url,), request) = request!(args, (String,));
execute_request(token, rb_self.0.clone(), Method::PUT, url, request)
}

/// Send a DELETE request.
#[inline]
pub fn delete(rb_self: &Self, args: &[Value]) -> Result<Response, magnus::Error> {
let ((url,), request) = extract_request!(args, (String,));
execute_request(rb_self.0.clone(), Method::DELETE, url, request)
let (token, (url,), request) = request!(args, (String,));
execute_request(token, rb_self.0.clone(), Method::DELETE, url, request)
}

/// Send a HEAD request.
#[inline]
pub fn head(rb_self: &Self, args: &[Value]) -> Result<Response, magnus::Error> {
let ((url,), request) = extract_request!(args, (String,));
execute_request(rb_self.0.clone(), Method::HEAD, url, request)
let (token, (url,), request) = request!(args, (String,));
execute_request(token, rb_self.0.clone(), Method::HEAD, url, request)
}

/// Send an OPTIONS request.
#[inline]
pub fn options(rb_self: &Self, args: &[Value]) -> Result<Response, magnus::Error> {
let ((url,), request) = extract_request!(args, (String,));
execute_request(rb_self.0.clone(), Method::OPTIONS, url, request)
let (token, (url,), request) = request!(args, (String,));
execute_request(token, rb_self.0.clone(), Method::OPTIONS, url, request)
}

/// Send a TRACE request.
#[inline]
pub fn trace(rb_self: &Self, args: &[Value]) -> Result<Response, magnus::Error> {
let ((url,), request) = extract_request!(args, (String,));
execute_request(rb_self.0.clone(), Method::TRACE, url, request)
let (token, (url,), request) = request!(args, (String,));
execute_request(token, rb_self.0.clone(), Method::TRACE, url, request)
}

/// Send a PATCH request.
#[inline]
pub fn patch(rb_self: &Self, args: &[Value]) -> Result<Response, magnus::Error> {
let ((url,), request) = extract_request!(args, (String,));
execute_request(rb_self.0.clone(), Method::PATCH, url, request)
let (token, (url,), request) = request!(args, (String,));
execute_request(token, rb_self.0.clone(), Method::PATCH, url, request)
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/client/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl From<Body> for wreq::Body {
pub fn include(ruby: &Ruby, gem_module: &RModule) -> Result<(), Error> {
let sender_class = gem_module.define_class("BodySender", ruby.class_object())?;
sender_class.define_singleton_method("new", function!(BodySender::new, -1))?;
sender_class.define_method("push", method!(BodySender::push, 1))?;
sender_class.define_method("push", method!(BodySender::push, -1))?;
sender_class.define_method("close", magnus::method!(BodySender::close, 0))?;
Ok(())
}
31 changes: 22 additions & 9 deletions src/client/body/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@ use tokio::sync::{

use crate::{
error::{memory_error, mpsc_send_error_to_magnus},
rt,
rt::{self, CancellationToken},
};

/// A receiver for streaming HTTP response bodies.
pub struct BodyReceiver(Mutex<Pin<Box<dyn Stream<Item = wreq::Result<Bytes>> + Send>>>);
pub struct BodyReceiver {
token: Option<CancellationToken>,
inner: Mutex<Pin<Box<dyn Stream<Item = wreq::Result<Bytes>> + Send>>>,
}

/// A sender for streaming HTTP request bodies.
#[magnus::wrap(class = "Wreq::BodySender", free_immediately, size)]
Expand All @@ -34,17 +37,23 @@ struct InnerBodySender {
impl BodyReceiver {
/// Create a new [`BodyReceiver`] instance.
#[inline]
pub fn new(stream: impl Stream<Item = wreq::Result<Bytes>> + Send + 'static) -> BodyReceiver {
BodyReceiver(Mutex::new(Box::pin(stream)))
pub fn new(
token: Option<CancellationToken>,
stream: impl Stream<Item = wreq::Result<Bytes>> + Send + 'static,
) -> BodyReceiver {
BodyReceiver {
token,
inner: Mutex::new(Box::pin(stream)),
}
}
}

impl Iterator for BodyReceiver {
type Item = Bytes;

fn next(&mut self) -> Option<Self::Item> {
rt::maybe_block_on(async {
self.0
rt::maybe_block_on(self.token.as_ref(), async {
self.inner
.lock()
.await
.as_mut()
Expand Down Expand Up @@ -74,11 +83,15 @@ impl BodySender {
}

/// Ruby: `push(data)` where data is String or bytes
pub fn push(rb_self: &Self, data: RString) -> Result<(), Error> {
let bytes = data.to_bytes();
pub fn push(rb_self: &Self, args: &[Value]) -> Result<(), Error> {
let (data, token) = cannel_token!(args, (RString,));
let bytes = data.0.to_bytes();
let inner = rb_self.0.read().unwrap();
if let Some(ref tx) = inner.tx {
rt::try_block_on(tx.send(bytes).map_err(mpsc_send_error_to_magnus))?;
rt::try_block_on(
token.as_ref(),
tx.send(bytes).map_err(mpsc_send_error_to_magnus),
)?;
}
Ok(())
}
Expand Down
9 changes: 5 additions & 4 deletions src/client/req.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{net::IpAddr, time::Duration};

use http::{HeaderValue, header};
use magnus::{RHash, TryConvert, typed_data::Obj, value::ReprValue};
use magnus::{RHash, Ruby, TryConvert, typed_data::Obj, value::ReprValue};
use serde::Deserialize;
use wreq::{
Client, Proxy, Version,
Expand All @@ -15,7 +15,7 @@ use crate::{
error::wreq_error_to_magnus,
extractor::Extractor,
http::Method,
rt,
rt::{self, CancellationToken},
};

/// The parameters for a request.
Expand Down Expand Up @@ -104,7 +104,7 @@ pub struct Request {

impl Request {
/// Create a new [`Request`] from Ruby keyword arguments.
pub fn new(ruby: &magnus::Ruby, hash: RHash) -> Result<Self, magnus::Error> {
pub fn new(ruby: &Ruby, hash: RHash) -> Result<Self, magnus::Error> {
let kwargs = hash.as_value();
let mut builder: Self = serde_magnus::deserialize(ruby, kwargs)?;

Expand Down Expand Up @@ -139,12 +139,13 @@ impl Request {
}

pub fn execute_request<U: AsRef<str>>(
token: Option<CancellationToken>,
client: Client,
method: Method,
url: U,
mut request: Request,
) -> Result<Response, magnus::Error> {
rt::try_block_on(async move {
rt::try_block_on(token.as_ref(), async move {
let mut builder = client.request(method.into_ffi(), url.as_ref());

// Emulation options.
Expand Down
Loading