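Contents
- 1. Background and Principles
- 1.1 Problem Scenario
- Network Architecture Impact Analysis
- 1.1 NAT/VPN Between the Client and Nginx
- 1.2 NAT Between Nginx and the RPC Service
- 1.2 Technical Principles
- 2. Environment Configuration and Verification
- 2.1 Nginx Configuration
- 2.2 Version Requirements
- 3. Netty Server Implementation
- 3.1 Pipeline Configuration (Core Code)
- 3.2 Protocol Handler Implementation
- 3.3 Business Handler Usage Example
- 4. Key Implementation Details
- 4.1 Decoding Order
- 4.2 Address Type Handling
- 4.3 Security Hardening
- 5. Exception Handling
- 6. HAProxyMessageDecoder
- 7. Summary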
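1. Background and Principles
1.1 Problem Scenario
In a Layer 4 (TCP) proxy setup, Nginx acts as a reverse proxy and forwards client connections to an RPC service built on Netty. Because every connection passes through the proxy, the RPC service by default sees only the IP address of the Nginx server. The client's real IP has to be passed along with the Proxy Protocol.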
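Network Architecture Impact Analysis
1.1 NAT/VPN Between the Client and Nginx
| Scenario | Obtainable IP | Mechanism |
|---|---|---|
| Enterprise NAT gateway | Only the NAT egress public IP | The NAT device rewrites the source IP address |
| Home router NAT | The router's WAN IP | Private addresses are translated to a public IP |
| Full-tunnel VPN | The VPN server's egress IP | Traffic is encapsulated and sent through the tunnel |
Conclusion: Proxy Protocol can only pass on the IP that Nginx itself sees, i.e. the NAT/VPN egress address.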
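1.2 NAT Between Nginx and the RPC Service
| Scenario | Impact | Solution |
|---|---|---|
| Plain NAT forwarding | None | Keep the existing Proxy Protocol configuration |
| Complex SDN network | Needs verification | Make sure the proxy protocol header passes through the TCP connection unmodified |
Plain NAT has no effect here because the header travels inside the TCP payload, which address translation does not rewrite.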
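1.2 Technical Principles
- Proxy Protocol: a transport-layer convention introduced by HAProxy. When the TCP connection is established, the sender first transmits a header containing the source address information.
- Nginx configuration: the proxy_protocol on directive enables the protocol.
- Netty decoding: HAProxyMessageDecoder parses the header.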
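To make the header concrete, here is a minimal sketch of parsing a version 1 (text) header line using only the Java standard library. The class name ProxyV1Header and its fields are hypothetical and exist only for this illustration; in the real server Netty's HAProxyMessageDecoder does this work. The field order follows the v1 text format: PROXY, protocol family, source address, destination address, source port, destination port.

```java
/** Illustrative holder for the fields of a PROXY protocol v1 header line (hypothetical class). */
final class ProxyV1Header {
    final String family;             // "TCP4", "TCP6" or "UNKNOWN"
    final String sourceAddress;      // null when the family is UNKNOWN
    final String destinationAddress; // null when the family is UNKNOWN
    final int sourcePort;
    final int destinationPort;

    private ProxyV1Header(String family, String src, String dst, int srcPort, int dstPort) {
        this.family = family;
        this.sourceAddress = src;
        this.destinationAddress = dst;
        this.sourcePort = srcPort;
        this.destinationPort = dstPort;
    }

    /** Parses one header line such as "PROXY TCP4 192.168.1.10 10.0.0.5 56324 12345\r\n". */
    static ProxyV1Header parse(String line) {
        // Strip the trailing CRLF that terminates a v1 header
        if (line.endsWith("\r\n")) {
            line = line.substring(0, line.length() - 2);
        }
        String[] parts = line.split(" ");
        if (parts.length < 2 || !"PROXY".equals(parts[0])) {
            throw new IllegalArgumentException("not a PROXY v1 header: " + line);
        }
        // With UNKNOWN the sender gives no usable address information
        if ("UNKNOWN".equals(parts[1])) {
            return new ProxyV1Header("UNKNOWN", null, null, 0, 0);
        }
        boolean knownFamily = "TCP4".equals(parts[1]) || "TCP6".equals(parts[1]);
        if (!knownFamily || parts.length != 6) {
            throw new IllegalArgumentException("malformed PROXY v1 header: " + line);
        }
        return new ProxyV1Header(parts[1], parts[2], parts[3],
                Integer.parseInt(parts[4]), Integer.parseInt(parts[5]));
    }
}
```

Calling ProxyV1Header.parse on the example line in the Javadoc gives sourceAddress 192.168.1.10 and sourcePort 56324, which is the information the RPC service is after.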
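2. Environment Configuration and Verification
2.1 Nginx Configuration
For demonstration only; just the core configuration is shown.
# TCP proxy configuration
stream {
    # Backend server group (for TCP proxying the upstream block must sit inside stream {})
    upstream rpc_backend {
        server 10.0.0.1:12345;
        server 10.0.0.2:12345;
    }
    server {
        listen 12345;
        proxy_pass rpc_backend;
        # Send a PROXY protocol header to the backend
        proxy_protocol on;
    }
}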
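This sets up a TCP proxy server in Nginx's stream module: it listens on port 12345 and forwards every connection arriving on that port, using the PROXY protocol, to the backend server group defined by rpc_backend. The backend can then obtain the original client's IP address and related information, provided the backend service supports the PROXY protocol.
proxy_protocol on
- Enables the PROXY protocol toward the backend: Nginx sends the client's original information (such as source IP and port) in a header at the start of each proxied connection, before any application data.
- Purpose: lets the backend server obtain the client's real IP instead of the IP of the Nginx proxy.
- Requirement: the backend service must support the PROXY protocol and be configured to parse it.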
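To check that a backend parses the header before putting Nginx in front of it, one option is to send a handcrafted v1 header directly. The sketch below is an assumption-laden test helper, not part of the original setup: the class name ProxyV1TestClient, its method names, and the idea of connecting straight to the backend port are all illustrative.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper that imitates what a PROXY-enabled proxy sends first on a connection. */
final class ProxyV1TestClient {

    /** Builds a v1 header line; the family is chosen from the shape of the source address. */
    static String buildHeader(String srcIp, String dstIp, int srcPort, int dstPort) {
        String family = srcIp.contains(":") ? "TCP6" : "TCP4";
        return "PROXY " + family + " " + srcIp + " " + dstIp + " "
                + srcPort + " " + dstPort + "\r\n";
    }

    /** Opens a TCP connection, writes the header once, then writes the application payload. */
    static void send(String host, int port, String header, byte[] payload) throws IOException {
        try (Socket socket = new Socket(host, port)) {
            OutputStream out = socket.getOutputStream();
            // The header must be the very first bytes on the connection
            out.write(header.getBytes(StandardCharsets.US_ASCII));
            out.write(payload);
            out.flush();
        }
    }
}
```

For example, buildHeader("203.0.113.7", "10.0.0.1", 40000, 12345) yields the line "PROXY TCP4 203.0.113.7 10.0.0.1 40000 12345" followed by CRLF; passing it to send together with a valid RPC frame should make the backend log 203.0.113.7 as the client IP.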
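2.2 Version Requirements
- Nginx ≥ 1.9.2 (the stream module arrived in 1.9.0 and its proxy_protocol directive in 1.9.2); I am using 1.27.1
- Netty ≥ 4.1.x; I am using 4.1.109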
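3. Netty Server Implementation
3.1 Pipeline Configuration (Core Code)
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.haproxy.HAProxyMessageDecoder;

public class RpcServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) {
        ChannelPipeline pipeline = ch.pipeline();
        // PROXY protocol decoder: must come first so it sees the first bytes of the connection
        pipeline.addLast(new HAProxyMessageDecoder());
        // Custom handler that extracts the client address from the decoded header
        pipeline.addLast(new ProxyProtocolHandler());
        // RPC decoding and business handling
        pipeline.addLast(new RpcMessageDecoder());
        pipeline.addLast(new RpcMessageHandler());
    }
}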
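3.2 Protocol Handler Implementation
Extracting the IP:
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.haproxy.HAProxyMessage;
import io.netty.util.AttributeKey;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProxyProtocolHandler extends ChannelInboundHandlerAdapter {
    // Channel attribute under which the client IP is stored for later handlers
    public static final AttributeKey<String> CLIENT_IP_ATTRIBUTE = AttributeKey.valueOf("clientIp");
    private static final Logger log = LoggerFactory.getLogger(ProxyProtocolHandler.class);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof HAProxyMessage) {
            HAProxyMessage proxyMessage = (HAProxyMessage) msg;
            // Extract the real client address (may be null if the header carries no address)
            String clientIp = proxyMessage.sourceAddress();
            int clientPort = proxyMessage.sourcePort();
            log.debug("Client address from PROXY header: {}:{}", clientIp, clientPort);
            // Store it as a Channel attribute
            ctx.channel().attr(CLIENT_IP_ATTRIBUTE).set(clientIp);
            // Release the message and stop here: it is not passed further down the pipeline
            ReferenceCountUtil.release(msg);
            return;
        }
        // Anything that is not a PROXY protocol message continues down the pipeline
        ctx.fireChannelRead(msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("Proxy protocol parsing failed", cause);
        ctx.close();
    }
}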
3.3 Business Handler Usage Example
Using the IP:
public class RpcMessageHandler extends SimpleChannelInboundHandler<RpcRequest> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcRequest request) {
        // CLIENT_IP_ATTRIBUTE is the same AttributeKey<String> that ProxyProtocolHandler writes to
        String clientIp = ctx.channel().attr(CLIENT_IP_ATTRIBUTE).get();
        log.info("Received request from {}: {}", clientIp, request);
        // Business logic...
    }
}
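4. Key Implementation Details
4.1 Decoding Order
- HAProxyMessageDecoder must be the first inbound handler in the pipeline
- Once the PROXY header has been processed the decoder has to be removed right away (it does this itself)
I captured the traffic to confirm this: right-click the Proxyv1 packet and choose Follow Stream to see the header ahead of the RPC payload. (The capture screenshots are not reproduced here.)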
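The reason the decoder has to come first is that the header appears exactly once, as the first bytes of the connection, so whoever reads those bytes decides whether a header is present. The following is a minimal, standard-library-only sketch of that decision, assuming the v1 text prefix "PROXY" and the 12-byte v2 binary signature; the class ProxyVersionDetector and its return codes are hypothetical and only loosely mirror what Netty's decoder does internally.

```java
import java.nio.charset.StandardCharsets;

/** Illustrative detector for the PROXY protocol version at the start of a connection. */
final class ProxyVersionDetector {
    static final int NO_HEADER = -1;     // the bytes cannot be a PROXY header
    static final int NEED_MORE_DATA = 0; // too few bytes to decide yet

    private static final byte[] V1_PREFIX = "PROXY".getBytes(StandardCharsets.US_ASCII);
    // 12-byte signature that opens every v2 (binary) header
    private static final byte[] V2_SIGNATURE = {
        0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A
    };

    /** Returns 1 if data starts with prefix, 0 if it matches so far but is shorter, -1 on mismatch. */
    private static int compare(byte[] data, byte[] prefix) {
        int n = Math.min(data.length, prefix.length);
        for (int i = 0; i < n; i++) {
            if (data[i] != prefix[i]) {
                return -1;
            }
        }
        return data.length >= prefix.length ? 1 : 0;
    }

    /** Returns the protocol version (1 or 2), NEED_MORE_DATA, or NO_HEADER. */
    static int detect(byte[] data) {
        int v2 = compare(data, V2_SIGNATURE);
        if (v2 == 1) {
            // The high nibble of the 13th byte carries the version in a v2 header
            return data.length > 12 ? (data[12] & 0xF0) >>> 4 : NEED_MORE_DATA;
        }
        int v1 = compare(data, V1_PREFIX);
        if (v1 == 1) {
            return 1;
        }
        return (v1 == 0 || v2 == 0) ? NEED_MORE_DATA : NO_HEADER;
    }
}
```

Distinguishing "no header" from "not enough bytes yet" matters when the first TCP segment is short: treating a partial prefix as "no header" would let a fragmented PROXY line slip through to the RPC decoder.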
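4.2 Address Type Handling
// Distinguish IPv4 from IPv6 via the address family of the proxied protocol
HAProxyProxiedProtocol.AddressFamily family = proxyMessage.proxiedProtocol().addressFamily();
if (family == HAProxyProxiedProtocol.AddressFamily.AF_IPv4) {
    // IPv4 handling
} else if (family == HAProxyProxiedProtocol.AddressFamily.AF_IPv6) {
    // IPv6 handling
}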
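If only the address string is at hand, for example when reading the value back from the Channel attribute, the family can also be derived from the string itself. The sketch below is one possible approach using only the JDK; the class AddressFamilyClassifier and its Family enum are hypothetical names, and the strict dotted-quad check is a deliberate choice so that arbitrary text never reaches a hostname lookup.

```java
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.UnknownHostException;

/** Illustrative classifier for address strings taken from a PROXY header. */
final class AddressFamilyClassifier {
    enum Family { IPV4, IPV6, INVALID }

    static Family classify(String address) {
        if (address == null || address.isEmpty()) {
            return Family.INVALID;
        }
        if (address.indexOf(':') >= 0) {
            try {
                // A valid IPv6 literal is parsed directly; an IPv4-mapped literal
                // comes back as an IPv4 address and is reported as IPV4
                InetAddress parsed = InetAddress.getByName(address);
                return parsed instanceof Inet6Address ? Family.IPV6 : Family.IPV4;
            } catch (UnknownHostException e) {
                return Family.INVALID;
            }
        }
        return isDottedQuad(address) ? Family.IPV4 : Family.INVALID;
    }

    /** Accepts exactly four decimal parts in the range 0..255. */
    private static boolean isDottedQuad(String s) {
        String[] parts = s.split("\\.", -1);
        if (parts.length != 4) {
            return false;
        }
        for (String part : parts) {
            if (part.isEmpty() || part.length() > 3) {
                return false;
            }
            for (int i = 0; i < part.length(); i++) {
                char c = part.charAt(i);
                if (c < '0' || c > '9') {
                    return false;
                }
            }
            if (Integer.parseInt(part) > 255) {
                return false;
            }
        }
        return true;
    }
}
```

An INVALID result is the natural place to apply whatever fallback the service uses for malformed addresses.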
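4.3 Security Hardening
// Restrict which proxy servers may connect (optional)
List<String> allowedProxies = Arrays.asList("10.0.0.0/8", "192.168.0.0/16");
// Check the TCP peer (the Nginx host), not an address taken from the PROXY header:
// header contents are supplied by the sender and can be forged.
// isAllowedProxy is a helper you supply yourself (a CIDR match against allowedProxies).
InetSocketAddress peer = (InetSocketAddress) ctx.channel().remoteAddress();
if (!isAllowedProxy(peer.getAddress().getHostAddress())) {
    ctx.close();
    return;
}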
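The allow-list check comes down to matching the peer address against CIDR ranges. Here is a minimal sketch of such a matcher with the JDK only; the class name ProxyAllowList and the isAllowedProxy signature are assumptions made for this example, and a production setup might prefer an existing library for CIDR handling.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;

/** Illustrative CIDR allow-list for the addresses of trusted proxy servers. */
final class ProxyAllowList {
    private final List<String> cidrs;

    ProxyAllowList(List<String> cidrs) {
        this.cidrs = cidrs;
    }

    /** Returns true if the literal peer IP falls inside any configured range. */
    boolean isAllowedProxy(String peerIp) {
        try {
            InetAddress peer = InetAddress.getByName(peerIp);
            for (String cidr : cidrs) {
                if (matches(cidr, peer)) {
                    return true;
                }
            }
        } catch (UnknownHostException e) {
            // Unparseable peer address: treat as not allowed
        }
        return false;
    }

    /** Compares the first prefixLength bits of the network address and the candidate. */
    static boolean matches(String cidr, InetAddress candidate) throws UnknownHostException {
        String[] parts = cidr.split("/");
        byte[] network = InetAddress.getByName(parts[0]).getAddress();
        byte[] address = candidate.getAddress();
        if (network.length != address.length) {
            return false; // IPv4 range versus IPv6 address, or the reverse
        }
        int prefixLength = parts.length > 1 ? Integer.parseInt(parts[1]) : network.length * 8;
        if (prefixLength < 0 || prefixLength > network.length * 8) {
            return false;
        }
        int fullBytes = prefixLength / 8;
        int remainingBits = prefixLength % 8;
        for (int i = 0; i < fullBytes; i++) {
            if (network[i] != address[i]) {
                return false;
            }
        }
        if (remainingBits == 0) {
            return true;
        }
        // Compare only the leading bits of the first partially covered byte
        int mask = (0xFF << (8 - remainingBits)) & 0xFF;
        return (network[fullBytes] & mask) == (address[fullBytes] & mask);
    }
}
```

With the ranges 10.0.0.0/8 and 192.168.0.0/16 from the snippet above, a peer at 10.0.0.1 is accepted and a peer at 172.16.5.4 is rejected.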
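5. Exception Handling
| Failure scenario | Handling |
|---|---|
| Invalid PROXY header | Log and close the connection |
| Protocol version mismatch | Close the connection (the PROXY protocol has no error response to send back) |
| Malformed address | Fall back to a default address and raise an alert |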
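6. HAProxyMessageDecoder
The class below is Netty's HAProxyMessageDecoder with small modifications in findVersion and decode, so that a connection that does not start with a PROXY header is passed through instead of being parsed as version 1.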
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.haproxy;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.ProtocolDetectionResult;
import io.netty.util.CharsetUtil;
import java.util.List;
import static io.netty.handler.codec.haproxy.HAProxyConstants.*;
/**
* Decodes an HAProxy proxy protocol header
*
* @see <a href="https://haproxy.1wt.eu/download/1.5/doc/proxy-protocol.txt">Proxy Protocol Specification</a>
*/
public class HAProxyMessageDecoder extends ByteToMessageDecoder {
/**
* Maximum possible length of a v1 proxy protocol header per spec
*/
private static final int V1_MAX_LENGTH = 108;
/**
* Maximum possible length of a v2 proxy protocol header (fixed 16 bytes + max unsigned short)
*/
private static final int V2_MAX_LENGTH = 16 + 65535;
/**
* Minimum possible length of a fully functioning v2 proxy protocol header (fixed 16 bytes + v2 address info space)
*/
private static final int V2_MIN_LENGTH = 16 + 216;
/**
* Maximum possible length for v2 additional TLV data (max unsigned short - max v2 address info space)
*/
private static final int V2_MAX_TLV = 65535 - 216;
/**
* Binary header prefix length
*/
private static final int BINARY_PREFIX_LENGTH = BINARY_PREFIX.length;
/**
* {@link ProtocolDetectionResult} for {@link HAProxyProtocolVersion#V1}.
*/
private static final ProtocolDetectionResult<HAProxyProtocolVersion> DETECTION_RESULT_V1 =
ProtocolDetectionResult.detected(HAProxyProtocolVersion.V1);
/**
* {@link ProtocolDetectionResult} for {@link HAProxyProtocolVersion#V2}.
*/
private static final ProtocolDetectionResult<HAProxyProtocolVersion> DETECTION_RESULT_V2 =
ProtocolDetectionResult.detected(HAProxyProtocolVersion.V2);
/**
* Used to extract a header frame out of the {@link ByteBuf} and return it.
*/
private HeaderExtractor headerExtractor;
/**
* {@code true} if we're discarding input because we're already over maxLength
*/
private boolean discarding;
/**
* Number of discarded bytes
*/
private int discardedBytes;
/**
* Whether or not to throw an exception as soon as we exceed maxLength.
*/
private final boolean failFast;
/**
* {@code true} if we're finished decoding the proxy protocol header
*/
private boolean finished;
/**
* Protocol specification version
*/
private int version = -1;
/**
* The latest v2 spec (2014/05/18) allows for additional data to be sent in the proxy protocol header beyond the
* address information block so now we need a configurable max header size
*/
private final int v2MaxHeaderSize;
/**
* Creates a new decoder with no additional data (TLV) restrictions, and should throw an exception as soon as
* we exceed maxLength.
*/
public HAProxyMessageDecoder() {
this(true);
}
/**
* Creates a new decoder with no additional data (TLV) restrictions, whether or not to throw an exception as soon
* as we exceed maxLength.
*
* @param failFast Whether or not to throw an exception as soon as we exceed maxLength
*/
public HAProxyMessageDecoder(boolean failFast) {
v2MaxHeaderSize = V2_MAX_LENGTH;
this.failFast = failFast;
}
/**
* Creates a new decoder with restricted additional data (TLV) size, and should throw an exception as soon as
* we exceed maxLength.
* <p>
* <b>Note:</b> limiting TLV size only affects processing of v2, binary headers. Also, as allowed by the 1.5 spec
* TLV data is currently ignored. For maximum performance it would be best to configure your upstream proxy host to
* <b>NOT</b> send TLV data and instantiate with a max TLV size of {@code 0}.
* </p>
*
* @param maxTlvSize maximum number of bytes allowed for additional data (Type-Length-Value vectors) in a v2 header
*/
public HAProxyMessageDecoder(int maxTlvSize) {
this(maxTlvSize, true);
}
/**
* Creates a new decoder with restricted additional data (TLV) size, whether or not to throw an exception as soon
* as we exceed maxLength.
*
* @param maxTlvSize maximum number of bytes allowed for additional data (Type-Length-Value vectors) in a v2 header
* @param failFast Whether or not to throw an exception as soon as we exceed maxLength
*/
public HAProxyMessageDecoder(int maxTlvSize, boolean failFast) {
if (maxTlvSize < 1) {
v2MaxHeaderSize = V2_MIN_LENGTH;
} else if (maxTlvSize > V2_MAX_TLV) {
v2MaxHeaderSize = V2_MAX_LENGTH;
} else {
int calcMax = maxTlvSize + V2_MIN_LENGTH;
if (calcMax > V2_MAX_LENGTH) { // lgtm[java/constant-comparison]
v2MaxHeaderSize = V2_MAX_LENGTH;
} else {
v2MaxHeaderSize = calcMax;
}
}
this.failFast = failFast;
}
/**
* Returns the proxy protocol specification version in the buffer if the version is found.
* Returns -1 if no version was found in the buffer.
*/
private static int findVersion(final ByteBuf buffer) {
final int n = buffer.readableBytes();
// per spec, the version number is found in the 13th byte
if (n < 13) {
return -1;
}
int idx = buffer.readerIndex();
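// Main modification: report v1 only when the text prefix really matches,
// and return -1 when neither prefix matches so non-PROXY traffic is not parsed as v1
if (match(TEXT_PREFIX, buffer, idx)) {
return 1;
}
if (match(BINARY_PREFIX, buffer, idx)) {
return buffer.getByte(idx + BINARY_PREFIX_LENGTH);
}
return -1;
// Original Netty code:
// return match(BINARY_PREFIX, buffer, idx) ? buffer.getByte(idx + BINARY_PREFIX_LENGTH) : 1;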
}
/**
* Returns the index in the buffer of the end of header if found.
* Returns -1 if no end of header was found in the buffer.
*/
private static int findEndOfHeader(final ByteBuf buffer) {
final int n = buffer.readableBytes();
// per spec, the 15th and 16th bytes contain the address length in bytes
if (n < 16) {
return -1;
}
int offset = buffer.readerIndex() + 14;
// the total header length will be a fixed 16 byte sequence + the dynamic address information block
int totalHeaderBytes = 16 + buffer.getUnsignedShort(offset);
// ensure we actually have the full header available
if (n >= totalHeaderBytes) {
return totalHeaderBytes;
} else {
return -1;
}
}
/**
* Returns the index in the buffer of the end of line found.
* Returns -1 if no end of line was found in the buffer.
*/
private static int findEndOfLine(final ByteBuf buffer) {
final int n = buffer.writerIndex();
for (int i = buffer.readerIndex(); i < n; i++) {
final byte b = buffer.getByte(i);
if (b == '\r' && i < n - 1 && buffer.getByte(i + 1) == '\n') {
return i; // \r\n
}
}
return -1; // Not found.
}
@Override
public boolean isSingleDecode() {
// ByteToMessageDecoder uses this method to optionally break out of the decoding loop after each unit of work.
// Since we only ever want to decode a single header we always return true to save a bit of work here.
return true;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
super.channelRead(ctx, msg);
if (finished) {
ctx.pipeline().remove(this);
}
}
@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// determine the specification version
if (version == -1) {
if ((version = findVersion(in)) == -1) {
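// No PROXY header detected (this also covers fewer than 13 readable bytes): mark the decoder
// as finished so channelRead removes it and the buffered bytes go to the next handler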
finished = true;
return;
}
}
ByteBuf decoded;
if (version == 1) {
decoded = decodeLine(ctx, in);
} else {
decoded = decodeStruct(ctx, in);
}
if (decoded != null) {
finished = true;
try {
if (version == 1) {
out.add(HAProxyMessage.decodeHeader(decoded.toString(CharsetUtil.US_ASCII)));
} else {
out.add(HAProxyMessage.decodeHeader(decoded));
}
} catch (HAProxyProtocolException e) {
fail(ctx, null, e);
}
}
}
/**
* Create a frame out of the {@link ByteBuf} and return it.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link HAProxyMessageDecoder} belongs to
* @param buffer the {@link ByteBuf} from which to read data
* @return frame the {@link ByteBuf} which represent the frame or {@code null} if no frame could
* be created
*/
private ByteBuf decodeStruct(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
if (headerExtractor == null) {
headerExtractor = new StructHeaderExtractor(v2MaxHeaderSize);
}
return headerExtractor.extract(ctx, buffer);
}
/**
* Create a frame out of the {@link ByteBuf} and return it.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link HAProxyMessageDecoder} belongs to
* @param buffer the {@link ByteBuf} from which to read data
* @return frame the {@link ByteBuf} which represent the frame or {@code null} if no frame could
* be created
*/
private ByteBuf decodeLine(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
if (headerExtractor == null) {
headerExtractor = new LineHeaderExtractor(V1_MAX_LENGTH);
}
return headerExtractor.extract(ctx, buffer);
}
private void failOverLimit(final ChannelHandlerContext ctx, int length) {
failOverLimit(ctx, String.valueOf(length));
}
private void failOverLimit(final ChannelHandlerContext ctx, String length) {
int maxLength = version == 1 ? V1_MAX_LENGTH : v2MaxHeaderSize;
fail(ctx, "header length (" + length + ") exceeds the allowed maximum (" + maxLength + ')', null);
}
private void fail(final ChannelHandlerContext ctx, String errMsg, Exception e) {
finished = true;
ctx.close(); // drop connection immediately per spec
HAProxyProtocolException ppex;
if (errMsg != null && e != null) {
ppex = new HAProxyProtocolException(errMsg, e);
} else if (errMsg != null) {
ppex = new HAProxyProtocolException(errMsg);
} else if (e != null) {
ppex = new HAProxyProtocolException(e);
} else {
ppex = new HAProxyProtocolException();
}
throw ppex;
}
/**
* Returns the {@link ProtocolDetectionResult} for the given {@link ByteBuf}.
*/
public static ProtocolDetectionResult<HAProxyProtocolVersion> detectProtocol(ByteBuf buffer) {
if (buffer.readableBytes() < 12) {
return ProtocolDetectionResult.needsMoreData();
}
int idx = buffer.readerIndex();
if (match(BINARY_PREFIX, buffer, idx)) {
return DETECTION_RESULT_V2;
}
if (match(TEXT_PREFIX, buffer, idx)) {
return DETECTION_RESULT_V1;
}
return ProtocolDetectionResult.invalid();
}
private static boolean match(byte[] prefix, ByteBuf buffer, int idx) {
for (int i = 0; i < prefix.length; i++) {
final byte b = buffer.getByte(idx + i);
if (b != prefix[i]) {
return false;
}
}
return true;
}
/**
* HeaderExtractor create a header frame out of the {@link ByteBuf}.
*/
private abstract class HeaderExtractor {
/** Header max size */
private final int maxHeaderSize;
protected HeaderExtractor(int maxHeaderSize) {
this.maxHeaderSize = maxHeaderSize;
}
/**
* Create a frame out of the {@link ByteBuf} and return it.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link HAProxyMessageDecoder} belongs to
* @param buffer the {@link ByteBuf} from which to read data
* @return frame the {@link ByteBuf} which represent the frame or {@code null} if no frame could
* be created
* @throws Exception if exceed maxLength
*/
public ByteBuf extract(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
final int eoh = findEndOfHeader(buffer);
if (!discarding) {
if (eoh >= 0) {
final int length = eoh - buffer.readerIndex();
if (length > maxHeaderSize) {
buffer.readerIndex(eoh + delimiterLength(buffer, eoh));
failOverLimit(ctx, length);
return null;
}
ByteBuf frame = buffer.readSlice(length);
buffer.skipBytes(delimiterLength(buffer, eoh));
return frame;
} else {
final int length = buffer.readableBytes();
if (length > maxHeaderSize) {
discardedBytes = length;
buffer.skipBytes(length);
discarding = true;
if (failFast) {
failOverLimit(ctx, "over " + discardedBytes);
}
}
return null;
}
} else {
if (eoh >= 0) {
final int length = discardedBytes + eoh - buffer.readerIndex();
buffer.readerIndex(eoh + delimiterLength(buffer, eoh));
discardedBytes = 0;
discarding = false;
if (!failFast) {
failOverLimit(ctx, "over " + length);
}
} else {
discardedBytes += buffer.readableBytes();
buffer.skipBytes(buffer.readableBytes());
}
return null;
}
}
/**
* Find the end of the header from the given {@link ByteBuf},the end may be a CRLF, or the length given by the
* header.
*
* @param buffer the buffer to be searched
* @return {@code -1} if can not find the end, otherwise return the buffer index of end
*/
protected abstract int findEndOfHeader(ByteBuf buffer);
/**
* Get the length of the header delimiter.
*
* @param buffer the buffer where delimiter is located
* @param eoh index of delimiter
* @return length of the delimiter
*/
protected abstract int delimiterLength(ByteBuf buffer, int eoh);
}
private final class LineHeaderExtractor extends HeaderExtractor {
LineHeaderExtractor(int maxHeaderSize) {
super(maxHeaderSize);
}
@Override
protected int findEndOfHeader(ByteBuf buffer) {
return findEndOfLine(buffer);
}
@Override
protected int delimiterLength(ByteBuf buffer, int eoh) {
return buffer.getByte(eoh) == '\r' ? 2 : 1;
}
}
private final class StructHeaderExtractor extends HeaderExtractor {
StructHeaderExtractor(int maxHeaderSize) {
super(maxHeaderSize);
}
@Override
protected int findEndOfHeader(ByteBuf buffer) {
return HAProxyMessageDecoder.findEndOfHeader(buffer);
}
@Override
protected int delimiterLength(ByteBuf buffer, int eoh) {
return 0;
}
}
}
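The v2 branch of the decoder relies on the fixed 16-byte prefix, whose last two bytes give the length of the address block that follows. As a rough, standard-library-only illustration of that layout, the sketch below computes the total header length and reads the source address for the TCP-over-IPv4 case. The class ProxyV2HeaderSketch and its methods are hypothetical; the byte offsets (length at offset 14, family/protocol byte 0x11 at offset 13, addresses from offset 16) are taken from the v2 binary format as I understand it.

```java
import java.nio.ByteBuffer;

/** Illustrative reader for the fixed part of a PROXY protocol v2 header. */
final class ProxyV2HeaderSketch {
    static final int FIXED_LENGTH = 16;  // signature (12) + version/command + family + length (2)
    static final int TCP_OVER_IPV4 = 0x11;

    /** Returns the total header length, or -1 if the full header has not arrived yet. */
    static int totalHeaderLength(byte[] data) {
        if (data.length < FIXED_LENGTH) {
            return -1;
        }
        // Bytes 15 and 16 hold the address block length as an unsigned big-endian short
        int addressLength = ByteBuffer.wrap(data).getShort(14) & 0xFFFF;
        int total = FIXED_LENGTH + addressLength;
        return data.length >= total ? total : -1;
    }

    /** Returns "ip:port" of the source for TCP over IPv4, or null for anything else. */
    static String sourceIpv4(byte[] data) {
        int total = totalHeaderLength(data);
        if (total < FIXED_LENGTH + 12 || (data[13] & 0xFF) != TCP_OVER_IPV4) {
            return null;
        }
        ByteBuffer buf = ByteBuffer.wrap(data);
        StringBuilder ip = new StringBuilder();
        for (int i = 0; i < 4; i++) {
            if (i > 0) {
                ip.append('.');
            }
            ip.append(buf.get(FIXED_LENGTH + i) & 0xFF); // source address: 4 bytes
        }
        // Layout after the fixed part: src addr (4), dst addr (4), src port (2), dst port (2)
        int port = buf.getShort(FIXED_LENGTH + 8) & 0xFFFF;
        return ip + ":" + port;
    }
}
```

Returning -1 until the whole header is buffered corresponds to the decoder's behaviour of waiting for more bytes rather than parsing a partial header.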
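7. Summary
In network architectures that involve NAT or VPN, how much Proxy Protocol can reveal about the client's real IP depends on where those network devices sit.
If the NAT/VPN sits between the client and Nginx (for example a corporate VPN or a home router), Proxy Protocol can only pass on the IP after NAT translation or at the VPN tunnel exit (such as the public IP or the VPN-assigned address); it cannot reach through to the end device's real private IP. To get past this limit, a hybrid approach is possible: the client reports its own IP (which requires changing client code), combined with changes to the network devices (for example having the VPN gateway record the original IP, or using a dedicated tunnel protocol). Be aware of the privacy and compliance risks, and avoid collecting sensitive information.
For ordinary business scenarios, Proxy Protocol together with the Nginx configuration already covers the need to obtain the egress IP of the client's network. Tracing an individual end device, however, requires close cooperation between the application-layer protocol and the network infrastructure.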