From f028ee0619c56d5db2e9314f12d847d4ed6e2de3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Th=C3=A9riault?= Date: Wed, 11 Feb 2026 16:57:45 -0500 Subject: [PATCH 1/2] make log record type generic --- lambda-extension/src/telemetry.rs | 46 +++++++++++++++++++++++++------ 1 file changed, 37 insertions(+), 9 deletions(-) diff --git a/lambda-extension/src/telemetry.rs b/lambda-extension/src/telemetry.rs index a7760892..dbf7ed8e 100644 --- a/lambda-extension/src/telemetry.rs +++ b/lambda-extension/src/telemetry.rs @@ -11,23 +11,23 @@ use tracing::{error, trace}; /// Payload received from the Telemetry API #[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] -pub struct LambdaTelemetry { +pub struct LambdaTelemetry { /// Time when the telemetry was generated pub time: DateTime, /// Telemetry record entry #[serde(flatten)] - pub record: LambdaTelemetryRecord, + pub record: LambdaTelemetryRecord, } /// Record in a LambdaTelemetry entry #[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] #[serde(tag = "type", content = "record", rename_all = "lowercase")] -pub enum LambdaTelemetryRecord { +pub enum LambdaTelemetryRecord { /// Function log records - Function(String), + Function(L), /// Extension log records - Extension(String), + Extension(L), /// Platform init start record #[serde(rename = "platform.initStart", rename_all = "camelCase")] @@ -319,12 +319,12 @@ mod deserialization_tests { use chrono::{TimeDelta, TimeZone}; macro_rules! deserialize_tests { - ($($name:ident: $value:expr,)*) => { + ($($name:ident$(<$log:ty>)?: $value:expr,)*) => { $( #[test] fn $name() { let (input, expected) = $value; - let actual = serde_json::from_str::(&input).expect("unable to deserialize"); + let actual = serde_json::from_str::)?>(&input).expect("unable to deserialize"); assert!(actual.record == expected); } @@ -339,12 +339,24 @@ mod deserialization_tests { LambdaTelemetryRecord::Function("hello world".to_string()), ), + // function (json) + function_json: ( + r#"{"time": "2020-08-20T12:31:32.123Z","type": "function", "record": true}"#, + LambdaTelemetryRecord::Function(true), + ), + // extension extension: ( r#"{"time": "2020-08-20T12:31:32.123Z","type": "extension", "record": "hello world"}"#, LambdaTelemetryRecord::Extension("hello world".to_string()), ), + // extension (json) + extension_json: ( + r#"{"time": "2020-08-20T12:31:32.123Z","type": "extension", "record": true}"#, + LambdaTelemetryRecord::Extension(true), + ), + // platform.start platform_start: ( r#"{"time":"2022-10-21T14:05:03.165Z","type":"platform.start","record":{"requestId":"459921b5-681c-4a96-beb0-81e0aa586026","version":"$LATEST","tracing":{"spanId":"24cd7d670fa455f0","type":"X-Amzn-Trace-Id","value":"Root=1-6352a70e-1e2c502e358361800241fd45;Parent=35465b3a9e2f7c6a;Sampled=1"}}}"#, @@ -477,11 +489,11 @@ mod serialization_tests { use super::*; macro_rules! serialize_tests { - ($($name:ident: $value:expr,)*) => { + ($($name:ident$(<$log:ty>)?: $value:expr,)*) => { $( #[test] fn $name() { - let (input, expected) = $value; + let (input, expected): (LambdaTelemetry$(<$log>)?, &str) = $value; let actual = serde_json::to_string(&input).expect("unable to serialize"); println!("Input: {:?}\n", input); println!("Expected:\n {:?}\n", expected); @@ -502,6 +514,14 @@ mod serialization_tests { }, r#"{"time":"2023-11-28T12:00:09Z","type":"function","record":"hello world"}"#, ), + // function (json) + function_json: ( + LambdaTelemetry { + time: Utc.with_ymd_and_hms(2023, 11, 28, 12, 0, 9).unwrap(), + record: LambdaTelemetryRecord::Function(true), + }, + r#"{"time":"2023-11-28T12:00:09Z","type":"function","record":true}"#, + ), // extension extension: ( LambdaTelemetry { @@ -510,6 +530,14 @@ mod serialization_tests { }, r#"{"time":"2023-11-28T12:00:09Z","type":"extension","record":"hello world"}"#, ), + // extension (json) + extension_json: ( + LambdaTelemetry { + time: Utc.with_ymd_and_hms(2023, 11, 28, 12, 0, 9).unwrap(), + record: LambdaTelemetryRecord::Extension(true), + }, + r#"{"time":"2023-11-28T12:00:09Z","type":"extension","record":true}"#, + ), //platform.Start platform_start: ( LambdaTelemetry{ From 6b36b4526b3eda07a2a78fb749967a1d0e16129f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Th=C3=A9riault?= Date: Wed, 11 Feb 2026 18:04:53 -0500 Subject: [PATCH 2/2] propagate log record type generic --- lambda-extension/src/extension.rs | 34 +++++++++++++++++++++---------- lambda-extension/src/telemetry.rs | 9 ++++---- 2 files changed, 28 insertions(+), 15 deletions(-) diff --git a/lambda-extension/src/extension.rs b/lambda-extension/src/extension.rs index e7d83847..cac1e604 100644 --- a/lambda-extension/src/extension.rs +++ b/lambda-extension/src/extension.rs @@ -4,7 +4,7 @@ use hyper::{body::Incoming, server::conn::http1, service::service_fn}; use hyper_util::rt::tokio::TokioIo; use lambda_runtime_api_client::Client; -use serde::Deserialize; +use serde::{de::DeserializeOwned, Deserialize}; use std::{ convert::Infallible, fmt, @@ -83,14 +83,6 @@ where L::Error: Into + fmt::Debug, L::MakeError: Into + fmt::Debug, L::Future: Send, - - // Fixme: 'static bound might be too restrictive - T: MakeService<(), Vec, Response = ()> + Send + Sync + 'static, - T::Service: Service, Response = ()> + Send + Sync, - >>::Future: Send + 'a, - T::Error: Into + fmt::Debug, - T::MakeError: Into + fmt::Debug, - T::Future: Send, { /// Create a new [`Extension`] with a given extension name pub fn with_extension_name(self, extension_name: &'a str) -> Self { @@ -232,7 +224,17 @@ where /// Lambda lifecycle operations to register the extension. When implementing an internal Lambda /// extension, it is safe to call `lambda_runtime::run` once the future returned by this /// function resolves. - pub async fn register(self) -> Result, Error> { + pub async fn register(self) -> Result, Error> + where + // Fixme: 'static bound might be too restrictive + T: MakeService<(), Vec>, Response = ()> + Send + Sync + 'static, + T::Service: Service>, Response = ()> + Send + Sync, + >>>::Future: Send + 'a, + T::Error: Into + fmt::Debug, + T::MakeError: Into + fmt::Debug, + T::Future: Send, + TL: DeserializeOwned + Send + 'static, + { let client = &Client::builder().build()?; let register_res = register(client, self.extension_name, self.events).await?; @@ -340,7 +342,17 @@ where } /// Execute the given extension. - pub async fn run(self) -> Result<(), Error> { + pub async fn run(self) -> Result<(), Error> + where + // Fixme: 'static bound might be too restrictive + T: MakeService<(), Vec>, Response = ()> + Send + Sync + 'static, + T::Service: Service>, Response = ()> + Send + Sync, + >>>::Future: Send + 'a, + T::Error: Into + fmt::Debug, + T::MakeError: Into + fmt::Debug, + T::Future: Send, + TL: DeserializeOwned + Send + 'static, + { self.register().await?.run().await } } diff --git a/lambda-extension/src/telemetry.rs b/lambda-extension/src/telemetry.rs index dbf7ed8e..47a00fc5 100644 --- a/lambda-extension/src/telemetry.rs +++ b/lambda-extension/src/telemetry.rs @@ -3,7 +3,7 @@ use http::{Request, Response}; use http_body_util::BodyExt; use hyper::body::Incoming; use lambda_runtime_api_client::body::Body; -use serde::{Deserialize, Serialize}; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; use std::{boxed::Box, fmt, sync::Arc}; use tokio::sync::Mutex; use tower::Service; @@ -269,14 +269,15 @@ pub struct RuntimeDoneMetrics { /// /// This takes an `hyper::Request` and transforms it into `Vec` for the /// underlying `Service` to process. -pub(crate) async fn telemetry_wrapper( +pub(crate) async fn telemetry_wrapper( service: Arc>, req: Request, ) -> Result, Box> where - S: Service, Response = ()>, + S: Service>, Response = ()>, S::Error: Into> + fmt::Debug, S::Future: Send, + L: DeserializeOwned, { trace!("Received telemetry request"); // Parse the request body as a Vec @@ -291,7 +292,7 @@ where } }; - let telemetry: Vec = match serde_json::from_slice(&body.to_bytes()) { + let telemetry: Vec> = match serde_json::from_slice(&body.to_bytes()) { Ok(telemetry) => telemetry, Err(e) => { error!("Error parsing telemetry: {}", e);