这是indexloc提供的服务,不要输入任何密码
Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
9494f51
new decompressor api
yawkat Sep 16, 2025
540f9d4
copy docs
yawkat Sep 16, 2025
7d4ecca
tests
yawkat Sep 16, 2025
8867041
license headers
yawkat Sep 16, 2025
66d2efb
javadoc for BackpressureDecompressionHandler
yawkat Sep 16, 2025
5329d06
checkstyle
yawkat Sep 16, 2025
e32821a
use proper cumulation
yawkat Sep 16, 2025
0b7ed5a
CR
yawkat Sep 29, 2025
52f7cec
input validation CR
yawkat Oct 2, 2025
9279fc0
Move to builder, add maxMessagesPerRead
yawkat Oct 2, 2025
82d47eb
Merge branch '4.2' into decompressor-api
yawkat Oct 2, 2025
4c93a92
Merge branch '4.2' into decompressor-api
yawkat Oct 2, 2025
b6d11c1
license header
yawkat Oct 2, 2025
6c8cef4
checkstyle
yawkat Oct 2, 2025
106b12c
fix empty buffer handling
yawkat Oct 2, 2025
109594e
fix leaks
yawkat Oct 2, 2025
166f799
zlib improvements
yawkat Oct 7, 2025
f9552a1
Add backpressure-capable http decompression handler
yawkat Oct 7, 2025
4e35ad3
move alloc param to build method so that it can be supplied in handle…
yawkat Oct 7, 2025
5f336e5
deprecate old decoders
yawkat Oct 7, 2025
a744250
checkstyle
yawkat Oct 7, 2025
4c4706f
reflect config
yawkat Oct 7, 2025
e466385
rename decompressor handler tests
yawkat Oct 8, 2025
5d7f757
add EndOfContentEvent to the decompression handler
yawkat Oct 8, 2025
d3e32fa
Update junit & fix auto closing the resource scope
yawkat Oct 8, 2025
425a0f8
Add tests and fixes to ensure all decompressors handle end-of-input c…
yawkat Oct 8, 2025
4d542be
New HttpDecompressionHandler implementation
yawkat Oct 9, 2025
7b062d4
use builder
yawkat Oct 9, 2025
eb461ad
fixes
yawkat Oct 9, 2025
b0f624a
pipelining test
yawkat Oct 9, 2025
f02a0d1
improve exception handling
yawkat Oct 9, 2025
b529a4e
style
yawkat Oct 9, 2025
3077ce3
fix tests
yawkat Oct 9, 2025
d6fab75
fix reflect config
yawkat Oct 9, 2025
75f6e0e
fix test
yawkat Oct 9, 2025
b5921c9
checkstyle
yawkat Oct 9, 2025
c76d53b
Add bytesPerRead limit
yawkat Oct 10, 2025
d6ad840
Create a new BackpressureGauge API for reuse in HTTP/2 and in the nor…
yawkat Oct 10, 2025
55ec3e4
Move backpressure logic to common base class
yawkat Oct 10, 2025
4e0f4c9
Http2FrameListener implementation
yawkat Oct 10, 2025
3b9c006
fix ref count
yawkat Oct 10, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,339 @@
/*
* Copyright 2025 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.compression;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.RecyclableArrayList;

/**
* This is a base class for backpressure-aware decompressing handler. The purpose of this class is to share code
* between the "normal" decompressing handler and the HTTP/1.1 decompressor. This class can handle a single compressed
* stream or multiple consecutive ones (as in HTTP/1.1 connections), and it can handle framing messages.
*/
public abstract class AbstractBackpressureDecompressionHandler extends ChannelDuplexHandler {
/**
* The decompressor for the current message.
*/
private Decompressor decompressor;

private RecyclableArrayList heldBack;

private boolean reading;

private boolean discardRemainingContent;

private boolean anyMessageWrittenSinceReadStart;

private final BackpressureGauge backpressureGauge;

protected AbstractBackpressureDecompressionHandler(Builder builder) {
this.backpressureGauge = builder.backpressureGaugeBuilder.build();
}

private Decompressor.Status decompressorStatus(ChannelHandlerContext ctx) {
assert decompressor != null;
try {
return decompressor.status();
} catch (Exception e) {
handleDecompressorException(ctx, e);
return Decompressor.Status.COMPLETE;
}
}

private void handleDecompressorException(ChannelHandlerContext ctx, Exception e) {
try {
decompressor.close();
} catch (Exception f) {
e.addSuppressed(f);
}
decompressor = null;
discardRemainingContent = true;
ctx.fireExceptionCaught(e);
fireEndOfOutput(ctx);
}

@Override
public final void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (!reading) {
reading = true;
anyMessageWrittenSinceReadStart = false;
}

if (heldBack != null) {
heldBack.add(msg);
return;
}

channelRead0(ctx, msg);
}

/**
* Handle an input message from {@link #channelRead(ChannelHandlerContext, Object)}. This method is only called
* when the decompressor is capable of accepting input ({@link #channelReadBytes} or
* {@link #channelReadEndOfInput}).
*/
protected abstract void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception;

/**
* Feed some input data to the decompressor. Must be called at most once from {@link #channelRead0}, and only from
* there.
*/
protected final void channelReadBytes(ChannelHandlerContext ctx, ByteBuf msg) {
if (decompressor == null) {
if (msg.isReadable() && !discardRemainingContent) {
msg.release();
throw new DecompressionException("Additional input after compressed data");
}
msg.release();
} else {
assert decompressor.status() == Decompressor.Status.NEED_INPUT : "heldBack should be set";
if (msg.isReadable()) {
boolean failed = false;
try {
decompressor.addInput(msg);
} catch (Exception e) {
handleDecompressorException(ctx, e);
failed = true;
}
if (!failed) {
forwardOutput(ctx);
}
} else {
msg.release();
}
}
}

/**
* Signal end of input from {@link #channelRead0}. Must be called at most once, and only from that method. Must not
* be called if {@link #channelReadBytes} has been called–if you need both, please go through {@link #channelRead}
* twice instead, so that the message can be delayed if necessary.
*/
protected final void channelReadEndOfInput(ChannelHandlerContext ctx) {
if (decompressor == null) {
// already done
return;
}
if (decompressorStatus(ctx) == Decompressor.Status.NEED_INPUT) {
decompressor.endOfInput();
forwardOutput(ctx);
} else {
throw new IllegalStateException(
"Please don't call channelReadEndOfInput immediately after channelReadBytes, go through " +
"channelRead again instead.");
}
}

/**
* Called when the end of decompressed output is reached. This can also happen as a result of a decompression
* exception.
*/
protected abstract void fireEndOfOutput(ChannelHandlerContext ctx);

/**
* Start decompressing a message.
*
* @param builder The decompression format
*/
protected final void beginDecompression(
ChannelHandlerContext ctx, Decompressor.AbstractDecompressorBuilder builder) {
decompressor = builder.build(ctx.alloc());
backpressureGauge.relieveBackpressure();
discardRemainingContent = false;
anyMessageWrittenSinceReadStart = true;
}

/**
* Send a framing message downstream, i.e. {@link BackpressureGauge#countNonByteMessage()}. You can also use this
* to send byte messages when processing non-compressed input.
*/
protected final void fireFramingMessage(ChannelHandlerContext ctx, Object msg) {
backpressureGauge.countNonByteMessage();
anyMessageWrittenSinceReadStart = true;
ctx.fireChannelRead(msg);
}

@Override
public final void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
do {
if (!anyMessageWrittenSinceReadStart) {
// we didn't forward any messages, so we need to ask upstream for more
reading = false;
if (!isAutoRead(ctx)) {
ctx.read();
}
return;
}

// accept that we might not have hit the target.
backpressureGauge.increaseBackpressure();

ctx.fireChannelReadComplete();
reading = false;

} while (fulfillDemandOutsideRead(ctx));
}

/**
* @return {@code true} if {@link #channelReadComplete(ChannelHandlerContext)} should be called next. This is to
* avoid recursion
*/
private boolean fulfillDemandOutsideRead(ChannelHandlerContext ctx) throws Exception {
assert !reading;

if (decompressor == null) {
return false;
}
if (downstreamMessageLimitExceeded(ctx)) {
return false;
}

RecyclableArrayList heldBack = this.heldBack;
if (heldBack == null) {
if (!isAutoRead(ctx)) {
ctx.read();
}
return false;
}

reading = true;
anyMessageWrittenSinceReadStart = false;
forwardOutput(ctx);
if (decompressor == null || decompressorStatus(ctx) != Decompressor.Status.NEED_OUTPUT) {
this.heldBack = null;
if (heldBack.isEmpty() && !anyMessageWrittenSinceReadStart) {
heldBack.recycle();
return false;
} else {
// this sets reading = true
for (Object msg : heldBack) {
channelRead(ctx, msg);
}
heldBack.recycle();
return true; // channelReadComplete(ctx)
}
} else {
if (anyMessageWrittenSinceReadStart) {
return true; // channelReadComplete(ctx)
}
ctx.read();
return false;
}
}

@SuppressWarnings("BooleanMethodIsAlwaysInverted")
private static boolean isAutoRead(ChannelHandlerContext ctx) {
return ctx.channel().config().isAutoRead();
}

private boolean downstreamMessageLimitExceeded(ChannelHandlerContext ctx) {
return backpressureGauge.backpressureLimitExceeded() && !isAutoRead(ctx);
}

@Override
public final void read(ChannelHandlerContext ctx) throws Exception {
if (decompressor == null) {
ctx.read();
return;
}

backpressureGauge.relieveBackpressure();
if (!reading) {
if (fulfillDemandOutsideRead(ctx)) {
channelReadComplete(ctx);
}
}
}

private void forwardOutput(ChannelHandlerContext ctx) {
while (true) {
Decompressor.Status status = decompressorStatus(ctx);
switch (status) {
case NEED_OUTPUT:
if (downstreamMessageLimitExceeded(ctx)) {
if (heldBack == null) {
heldBack = RecyclableArrayList.newInstance();
}
return;
}
ByteBuf output;
try {
output = decompressor.takeOutput();
} catch (Exception e) {
handleDecompressorException(ctx, e);
return;
}
backpressureGauge.countMessage(output.readableBytes());
anyMessageWrittenSinceReadStart = true;
ctx.fireChannelRead(wrapOutputBuffer(output));
break;
case NEED_INPUT:
return;
case COMPLETE:
if (decompressor != null) {
try {
decompressor.close();
} catch (Exception e) {
ctx.fireExceptionCaught(e);
}
}
decompressor = null;
fireEndOfOutput(ctx);
return;
default:
throw new AssertionError("Unknown status: " + status);
}
}
}

/**
* Wrap an output buffer before sending it down the pipeline. This is used for HttpContent.
*/
protected Object wrapOutputBuffer(ByteBuf output) {
return output;
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
if (decompressor != null) {
decompressor.close();
decompressor = null;
}
if (heldBack != null) {
for (Object o : heldBack) {
ReferenceCountUtil.release(o);
}
heldBack.recycle();
heldBack = null;
}
}

public abstract static class Builder {
BackpressureGauge.Builder backpressureGaugeBuilder = BackpressureGauge.builder();

public Builder backpressureGaugeBuilder(BackpressureGauge.Builder backpressureGaugeBuilder) {
this.backpressureGaugeBuilder =
ObjectUtil.checkNotNull(backpressureGaugeBuilder, "backpressureGaugeBuilder");
return this;
}

public abstract ChannelDuplexHandler build();
}
}
Loading
Loading