diff --git a/core/src/main/java/io/grpc/internal/AbstractClientStream.java b/core/src/main/java/io/grpc/internal/AbstractClientStream.java index bce1820b482..b353a34eff5 100644 --- a/core/src/main/java/io/grpc/internal/AbstractClientStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractClientStream.java @@ -19,6 +19,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import static io.grpc.internal.GrpcUtil.CONTENT_ENCODING_KEY; +import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY; import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY; import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY; @@ -155,6 +156,9 @@ public final void setDecompressorRegistry(DecompressorRegistry decompressorRegis public final void start(ClientStreamListener listener) { transportState().setListener(listener); if (!useGet) { + // Capture the message encoding before headers are cleared, so we can warn + // if the server doesn't advertise support for it (issue #1804) + transportState().setSentMessageEncoding(headers.get(MESSAGE_ENCODING_KEY)); abstractClientStreamSink().writeHeaders(headers, null); headers = null; } @@ -224,6 +228,8 @@ protected abstract static class TransportState extends AbstractStream.TransportS private ClientStreamListener listener; private boolean fullStreamDecompression; private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance(); + /** The message encoding sent by the client, or null if identity/none. */ + @Nullable private String sentMessageEncoding; private boolean deframerClosed = false; private Runnable deframerClosedTask; @@ -261,6 +267,16 @@ private void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) checkNotNull(decompressorRegistry, "decompressorRegistry"); } + /** + * Sets the message encoding that the client is using for outbound messages. + * Used to warn if the server doesn't advertise support for this encoding. + * + * @param messageEncoding the encoding name (e.g., "gzip"), or null for identity + */ + protected void setSentMessageEncoding(@Nullable String messageEncoding) { + this.sentMessageEncoding = messageEncoding; + } + @VisibleForTesting public final void setListener(ClientStreamListener listener) { checkState(this.listener == null, "Already called setListener"); @@ -342,6 +358,19 @@ protected void inboundHeadersReceived(Metadata headers) { } } + // Warn if client sent compressed messages but server didn't advertise support + if (sentMessageEncoding != null) { + byte[] acceptEncoding = headers.get(MESSAGE_ACCEPT_ENCODING_KEY); + if (acceptEncoding == null) { + log.log( + Level.FINE, + "Server did not include grpc-accept-encoding header in response. " + + "Client sent messages with encoding [{0}]. " + + "The server may not support this encoding.", + sentMessageEncoding); + } + } + listener().headersRead(headers); } diff --git a/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java b/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java index 8f14b74035c..444bbe99c4e 100644 --- a/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java +++ b/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java @@ -54,7 +54,13 @@ import java.io.IOException; import java.io.InputStream; import java.net.SocketAddress; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.logging.Handler; +import java.util.logging.Level; +import java.util.logging.LogRecord; +import java.util.logging.Logger; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -527,6 +533,140 @@ public void resetOnReadyThreshold() { assertNull(options.clearOnReadyThreshold().getOnReadyThreshold()); } + @Test + public void inboundHeadersReceived_warnsWhenServerDoesNotAdvertiseAcceptEncoding() { + // Set up log capture + Logger logger = Logger.getLogger(AbstractClientStream.class.getName()); + Level originalLevel = logger.getLevel(); + List logs = new ArrayList<>(); + Handler handler = new Handler() { + @Override + public void publish(LogRecord record) { + logs.add(record); + } + + @Override + public void flush() {} + + @Override + public void close() {} + }; + logger.addHandler(handler); + logger.setLevel(Level.FINE); + + try { + AbstractClientStream stream = + new BaseAbstractClientStream(allocator, statsTraceCtx, transportTracer); + stream.start(mockListener); + + // Simulate that client sent gzip-compressed messages + stream.transportState().setSentMessageEncoding("gzip"); + + // Server responds without grpc-accept-encoding header + Metadata headers = new Metadata(); + stream.transportState().inboundHeadersReceived(headers); + + // Verify warning was logged + verify(mockListener).headersRead(headers); + assertThat(logs).hasSize(1); + LogRecord record = logs.get(0); + assertThat(record.getLevel()).isEqualTo(Level.FINE); + assertThat(record.getMessage()).contains("grpc-accept-encoding"); + // The parameter {0} contains the encoding + assertThat(record.getParameters()).asList().contains("gzip"); + } finally { + logger.removeHandler(handler); + logger.setLevel(originalLevel); + } + } + + @Test + public void inboundHeadersReceived_noWarningWhenServerAdvertisesAcceptEncoding() { + // Set up log capture + Logger logger = Logger.getLogger(AbstractClientStream.class.getName()); + Level originalLevel = logger.getLevel(); + List logs = new ArrayList<>(); + Handler handler = new Handler() { + @Override + public void publish(LogRecord record) { + logs.add(record); + } + + @Override + public void flush() {} + + @Override + public void close() {} + }; + logger.addHandler(handler); + logger.setLevel(Level.FINE); + + try { + AbstractClientStream stream = + new BaseAbstractClientStream(allocator, statsTraceCtx, transportTracer); + stream.start(mockListener); + + // Simulate that client sent gzip-compressed messages + stream.transportState().setSentMessageEncoding("gzip"); + + // Server responds with grpc-accept-encoding header + Metadata headers = new Metadata(); + headers.put( + GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY, + "gzip".getBytes(java.nio.charset.StandardCharsets.US_ASCII)); + stream.transportState().inboundHeadersReceived(headers); + + // Verify no warning was logged + verify(mockListener).headersRead(headers); + assertThat(logs).isEmpty(); + } finally { + logger.removeHandler(handler); + logger.setLevel(originalLevel); + } + } + + @Test + public void inboundHeadersReceived_noWarningWhenClientDidNotUseCompression() { + // Set up log capture + Logger logger = Logger.getLogger(AbstractClientStream.class.getName()); + Level originalLevel = logger.getLevel(); + List logs = new ArrayList<>(); + Handler handler = new Handler() { + @Override + public void publish(LogRecord record) { + logs.add(record); + } + + @Override + public void flush() {} + + @Override + public void close() {} + }; + logger.addHandler(handler); + logger.setLevel(Level.FINE); + + try { + AbstractClientStream stream = + new BaseAbstractClientStream(allocator, statsTraceCtx, transportTracer); + stream.start(mockListener); + + // Client did not set any encoding (using identity/no compression) + // sentMessageEncoding is null by default + + // Server responds without grpc-accept-encoding header + Metadata headers = new Metadata(); + stream.transportState().inboundHeadersReceived(headers); + + // Verify no warning was logged (client didn't use compression) + verify(mockListener).headersRead(headers); + assertThat(logs).isEmpty(); + } finally { + logger.removeHandler(handler); + logger.setLevel(originalLevel); + } + } + /** * No-op base class for testing. */