diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java index 6904340ac74..5578af75b4a 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java @@ -101,7 +101,7 @@ private GrpcOpenTelemetry(Builder builder) { this.optionalLabels = ImmutableList.copyOf(builder.optionalLabels); this.openTelemetryMetricsModule = new OpenTelemetryMetricsModule( STOPWATCH_SUPPLIER, resource, optionalLabels, builder.plugins, - builder.targetFilter); + builder.targetFilter, openTelemetrySdk.getPropagators()); this.openTelemetryTracingModule = new OpenTelemetryTracingModule(openTelemetrySdk); this.sink = new OpenTelemetryMetricSink(meter, enableMetrics, disableDefault, optionalLabels); } diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java index b05884305dc..34f23c90fef 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java @@ -49,6 +49,7 @@ import io.opentelemetry.api.baggage.Baggage; import io.opentelemetry.api.common.AttributesBuilder; import io.opentelemetry.context.Context; +import io.opentelemetry.context.propagation.ContextPropagators; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -103,22 +104,27 @@ final class OpenTelemetryMetricsModule { @Nullable private final TargetFilter targetAttributeFilter; + private final ContextPropagators contextPropagators; + OpenTelemetryMetricsModule(Supplier stopwatchSupplier, OpenTelemetryMetricsResource resource, - Collection optionalLabels, List plugins) { - this(stopwatchSupplier, resource, optionalLabels, plugins, null); + Collection optionalLabels, List plugins, + ContextPropagators contextPropagators) { + this(stopwatchSupplier, resource, optionalLabels, plugins, null, contextPropagators); } OpenTelemetryMetricsModule(Supplier stopwatchSupplier, OpenTelemetryMetricsResource resource, Collection optionalLabels, List plugins, - @Nullable TargetFilter targetAttributeFilter) { + @Nullable TargetFilter targetAttributeFilter, + ContextPropagators contextPropagators) { this.resource = checkNotNull(resource, "resource"); this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier"); this.localityEnabled = optionalLabels.contains(LOCALITY_KEY.getKey()); this.backendServiceEnabled = optionalLabels.contains(BACKEND_SERVICE_KEY.getKey()); this.plugins = ImmutableList.copyOf(plugins); this.targetAttributeFilter = targetAttributeFilter; + this.contextPropagators = checkNotNull(contextPropagators, "contextPropagators"); } @VisibleForTesting @@ -159,8 +165,7 @@ static String recordMethodName(String fullMethodName, boolean isGeneratedMethod) return isGeneratedMethod ? fullMethodName : "other"; } - private static Context otelContextWithBaggage() { - Baggage baggage = BAGGAGE_KEY.get(); + private static Context otelContextWithBaggage(@Nullable Baggage baggage) { if (baggage == null) { return Context.current(); } @@ -206,16 +211,19 @@ private static final class ClientTracer extends ClientStreamTracer { volatile String backendService; long attemptNanos; Code statusCode; + final Baggage baggage; ClientTracer(CallAttemptsTracerFactory attemptsState, OpenTelemetryMetricsModule module, StreamInfo info, String target, String fullMethodName, - List streamPlugins) { + List streamPlugins, + Baggage baggage) { this.attemptsState = attemptsState; this.module = module; this.info = info; this.target = target; this.fullMethodName = fullMethodName; this.streamPlugins = streamPlugins; + this.baggage = baggage; this.stopwatch = module.stopwatchSupplier.get().start(); } @@ -282,7 +290,7 @@ public void streamClosed(Status status) { } void recordFinishedAttempt() { - Context otelContext = otelContextWithBaggage(); + Context otelContext = otelContextWithBaggage(baggage); AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder() .put(METHOD_KEY, fullMethodName) .put(TARGET_KEY, target) @@ -301,6 +309,10 @@ void recordFinishedAttempt() { } builder.put(BACKEND_SERVICE_KEY, savedBackendService); } + for (java.util.Map.Entry entry : + baggage.asMap().entrySet()) { + builder.put(entry.getKey(), entry.getValue().getValue()); + } for (OpenTelemetryPlugin.ClientStreamPlugin plugin : streamPlugins) { plugin.addLabels(builder); } @@ -342,6 +354,7 @@ static final class CallAttemptsTracerFactory extends ClientStreamTracer.Factory private int activeStreams; @GuardedBy("lock") private boolean finishedCallToBeRecorded; + private final Baggage baggage; CallAttemptsTracerFactory( OpenTelemetryMetricsModule module, @@ -354,6 +367,10 @@ static final class CallAttemptsTracerFactory extends ClientStreamTracer.Factory this.callPlugins = checkNotNull(callPlugins, "callPlugins"); this.attemptDelayStopwatch = module.stopwatchSupplier.get(); this.callStopWatch = module.stopwatchSupplier.get().start(); + Baggage currentBaggage = BAGGAGE_KEY.get(); + this.baggage = currentBaggage == null + ? Baggage.fromContext(Context.current()) + : currentBaggage; io.opentelemetry.api.common.Attributes attribute = io.opentelemetry.api.common.Attributes.of( METHOD_KEY, fullMethodName, @@ -407,7 +424,8 @@ private ClientTracer newClientTracer(StreamInfo info) { } streamPlugins = Collections.unmodifiableList(streamPlugins); } - return new ClientTracer(this, module, info, target, fullMethodName, streamPlugins); + return new ClientTracer(this, module, info, target, fullMethodName, streamPlugins, + baggage); } // Called whenever each attempt is ended. @@ -448,7 +466,7 @@ void callEnded(Status status) { } void recordFinishedCall() { - Context otelContext = otelContextWithBaggage(); + Context otelContext = otelContextWithBaggage(baggage); if (attemptsPerCall.get() == 0) { ClientTracer tracer = newClientTracer(null); tracer.attemptNanos = attemptDelayStopwatch.elapsed(TimeUnit.NANOSECONDS); @@ -463,6 +481,12 @@ void recordFinishedCall() { METHOD_KEY, fullMethodName, TARGET_KEY, target ); + AttributesBuilder baseAttributesBuilder = baseAttributes.toBuilder(); + for (java.util.Map.Entry entry + : baggage.asMap().entrySet()) { + baseAttributesBuilder.put(entry.getKey(), entry.getValue().getValue()); + } + baseAttributes = baseAttributesBuilder.build(); // Duration if (module.resource.clientCallDurationCounter() != null) { @@ -554,11 +578,14 @@ private static final class ServerTracer extends ServerStreamTracer { private volatile long outboundWireSize; private volatile long inboundWireSize; + private final Baggage baggage; + ServerTracer(OpenTelemetryMetricsModule module, String fullMethodName, - List streamPlugins) { + List streamPlugins, @Nullable Baggage baggage) { this.module = checkNotNull(module, "module"); this.fullMethodName = fullMethodName; this.streamPlugins = checkNotNull(streamPlugins, "streamPlugins"); + this.baggage = baggage; this.stopwatch = module.stopwatchSupplier.get().start(); } @@ -606,7 +633,9 @@ public void inboundWireSize(long bytes) { */ @Override public void streamClosed(Status status) { - Context otelContext = otelContextWithBaggage(); + Baggage grpcContextBaggage = BAGGAGE_KEY.get(); + Baggage baggage = grpcContextBaggage != null ? grpcContextBaggage : this.baggage; + Context otelContext = otelContextWithBaggage(baggage); if (streamClosedUpdater != null) { if (streamClosedUpdater.getAndSet(this, 1) != 0) { return; @@ -622,6 +651,12 @@ public void streamClosed(Status status) { AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder() .put(METHOD_KEY, recordMethodName(fullMethodName, isGeneratedMethod)) .put(STATUS_KEY, status.getCode().toString()); + if (baggage != null) { + for (java.util.Map.Entry entry + : baggage.asMap().entrySet()) { + builder.put(entry.getKey(), entry.getValue().getValue()); + } + } for (OpenTelemetryPlugin.ServerStreamPlugin plugin : streamPlugins) { plugin.addLabels(builder); } @@ -657,7 +692,11 @@ public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata } streamPlugins = Collections.unmodifiableList(streamPluginsMutable); } - return new ServerTracer(OpenTelemetryMetricsModule.this, fullMethodName, streamPlugins); + Context context = contextPropagators.getTextMapPropagator().extract( + Context.current(), headers, MetadataGetter.getInstance()); + Baggage baggage = Baggage.fromContext(context); + return new ServerTracer( + OpenTelemetryMetricsModule.this, fullMethodName, streamPlugins, baggage); } } diff --git a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java index 391f94cefea..d8c953dd753 100644 --- a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java +++ b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java @@ -65,6 +65,7 @@ import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.metrics.DoubleHistogram; import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.context.Scope; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; @@ -1245,7 +1246,8 @@ public void clientLocalityMetrics_present() { OpenTelemetryMetricsResource resource = GrpcOpenTelemetry.createMetricInstruments(testMeter, enabledMetricsMap, disableDefaultMetrics); OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( - fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.locality"), emptyList()); + fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.locality"), emptyList(), + openTelemetryTesting.getOpenTelemetry().getPropagators()); OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList()); @@ -1313,7 +1315,8 @@ public void clientLocalityMetrics_missing() { OpenTelemetryMetricsResource resource = GrpcOpenTelemetry.createMetricInstruments(testMeter, enabledMetricsMap, disableDefaultMetrics); OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( - fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.locality"), emptyList()); + fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.locality"), emptyList(), + openTelemetryTesting.getOpenTelemetry().getPropagators()); OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList()); @@ -1378,7 +1381,8 @@ public void clientBackendServiceMetrics_present() { enabledMetricsMap, disableDefaultMetrics); OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.backend_service"), - emptyList()); + emptyList(), + openTelemetryTesting.getOpenTelemetry().getPropagators()); OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList()); @@ -1447,7 +1451,8 @@ public void clientBackendServiceMetrics_missing() { enabledMetricsMap, disableDefaultMetrics); OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.backend_service"), - emptyList()); + emptyList(), + openTelemetryTesting.getOpenTelemetry().getPropagators()); OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList()); @@ -1629,11 +1634,62 @@ public void serverBasicMetrics() { } + @Test + public void clientBaggagePropagationToMetrics() { + // Create module and tracer factory + // We use a custom resource with a mock counter to check the Context passed to + // record() + DoubleHistogram mockClientAttemptDurationCounter = org.mockito.Mockito + .mock(DoubleHistogram.class); + OpenTelemetryMetricsResource customResource = OpenTelemetryMetricsResource.builder() + .clientAttemptDurationCounter(mockClientAttemptDurationCounter) + .build(); + + OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( + fakeClock.getStopwatchSupplier(), customResource, emptyList(), emptyList(), + openTelemetryTesting.getOpenTelemetry().getPropagators()); + + // Define the test baggage and create a Context with it + Baggage testBaggage = Baggage.builder() + .put("user-id", "42") + .build(); + io.opentelemetry.context.Context otelContext = io.opentelemetry.context.Context + .current().with(testBaggage); + + // Create Tracer Factory within the Scope of the Context (simulating + // application thread) + CallAttemptsTracerFactory tracerFactory; + try (Scope scope = otelContext.makeCurrent()) { + tracerFactory = new CallAttemptsTracerFactory( + module, "target", method.getFullMethodName(), emptyList()); + } + + // 4. Create a stream tracer (simulating an attempt) + ClientStreamTracer.StreamInfo streamInfo = ClientStreamTracer.StreamInfo.newBuilder().build(); + ClientStreamTracer tracer = tracerFactory.newClientStreamTracer(streamInfo, new Metadata()); + + // 5. Trigger metric recording + tracer.streamClosed(Status.OK); + + // Verify the record call and capture the OTel Context + verify(mockClientAttemptDurationCounter).record( + anyDouble(), + any(io.opentelemetry.api.common.Attributes.class), + contextCaptor.capture()); + + // Assert on the captured OTel Context + io.opentelemetry.context.Context capturedOtelContext = contextCaptor.getValue(); + Baggage capturedBaggage = Baggage.fromContext(capturedOtelContext); + + assertEquals("42", capturedBaggage.getEntryValue("user-id")); + } + @Test public void serverBaggagePropagationToMetrics() { // 1. Create module and tracer factory using the mock resource OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( - fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList()); + fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList(), + openTelemetryTesting.getOpenTelemetry().getPropagators()); ServerStreamTracer.Factory tracerFactory = module.getServerTracerFactory(); ServerStreamTracer tracer = tracerFactory.newServerStreamTracer(method.getFullMethodName(), new Metadata()); @@ -1802,13 +1858,15 @@ public void targetAttributeFilter_rejectsTarget_mapsToOther() { private OpenTelemetryMetricsModule newOpenTelemetryMetricsModule( OpenTelemetryMetricsResource resource) { return new OpenTelemetryMetricsModule( - fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList()); + fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList(), + openTelemetryTesting.getOpenTelemetry().getPropagators()); } private OpenTelemetryMetricsModule newOpenTelemetryMetricsModule( OpenTelemetryMetricsResource resource, TargetFilter filter) { return new OpenTelemetryMetricsModule( - fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList(), filter); + fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList(), filter, + openTelemetryTesting.getOpenTelemetry().getPropagators()); } static class CallInfo extends ServerCallInfo { @@ -1848,7 +1906,8 @@ public void serverBaggagePropagation_EndToEnd() throws Exception { OpenTelemetry otel = openTelemetryTesting.getOpenTelemetry(); OpenTelemetryTracingModule tracingModule = new OpenTelemetryTracingModule(otel); OpenTelemetryMetricsModule metricsModule = new OpenTelemetryMetricsModule( - fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList()); + fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList(), + openTelemetryTesting.getOpenTelemetry().getPropagators()); // 2. Create Server with *both* tracer factories server = InProcessServerBuilder.forName(serverName)