详解rocketMq通信模块升级构想

news2025/1/10 3:26:23

5cad4538f0e07615feaed71345ce6bb4.gif

本文从开发者的角度深入解析了基于netty的通信模块, 并通过简易扩展实现微服务化通信工具雏形, 适合于想要了解netty通信框架的使用案例, 想了解中间件通信模块设计, 以及微服务通信底层架构的同学。希望此文能给大家带来通信模块架构灵感。

827714be39954fc3824ba65ceb23a816.png

概述

网络通信是很常见的需求,

对于传统web网页工具短连接场景,浏览器和服务器交互,常见为浏览器通过http协议请求Tomcat服务器;

对于长连接场景, 比如即时通讯,或中间件等实时性要求高的场景,一般采用tcp协议的长连接进行全双工实时通信;

对于java开发者来说,使用原生socket进行tcp开发,效率是比较低的,稳定性可靠性等也不好保障,一般选择网络通信框架netty加快开发效率。

对于上层应用来说,netty的标准使用方式依然比较繁琐,未能很好的适配一些业务使用场景,比如rocketMq根据netty包装了一层业务框架:通信模块remoting。

该模块可用性高,稳定性好,易扩展,经过了中间件产品长期高并发的质量验证, 值得信任,并广泛用于其他点对点(指定ip)通信场景,如dleger(raft的java实现)。

有相关通信需求的同学也都可以参考该通信模块,相信有很多的灵感,或直接使用该通信模块,带来开发效率的提升。

本文从一个普通java开发者的视角,去解析该通信模块

  1. 如何用 - 常见使用方式

  2. 实现原理 - 数据流转链路

  3. 设计关键点 - 为什么要如此设计

  4. 模块升级 - 实现简易的微服务化通信工具

本文代码版本:

<parent>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-remoting</artifactId>
  <version>5.0.1-PREVIEW-SNAPSHOT</version>
</parent>

131d403609944f9bd30e014dfce995d7.png

如何用

编写简单易懂的测试demo,实现server client的交互流程。

简单示例 协议code 为写死 0 1 5 9,输入测试信息,输出使用sysout。

  启动server 注册服务监听 

import com.alibaba.fastjson.JSON;
import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class Server {
    public static void main(String[] args) throws Exception {
        NettyServerConfig nettyServerConfig = new NettyServerConfig();
        // 配置端口
        nettyServerConfig.setListenPort(8888);
        // 配置线程数 netty workGroup 线程池 处理io等低耗时
        nettyServerConfig.setServerSelectorThreads(2);
        // 配置线程数 netty eventGroup 线程池 处理自定义hander/长耗时等
        nettyServerConfig.setServerWorkerThreads(8);
        NettyRemotingServer remotingServer = new NettyRemotingServer(nettyServerConfig, null);


        // 支持共用或独立的业务处理线程池
        ExecutorService poolA = new ThreadPoolExecutor(4, 4, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024));
        ExecutorService poolB = new ThreadPoolExecutor(4, 4, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024));


        // 业务处理器
        NettyRequestProcessor processA = new NettyRequestProcessor() {
            @Override
            public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
                System.out.println("received from client, remark:" + request.getRemark() + ", coe:" + request.getCode());
                RemotingCommand response = RemotingCommand.createResponseCommand(0, "server");
                switch (request.getCode()) {
                    case 0:
                        response.setBody(new String("hello sync 0").getBytes());
                    case 1:
                        response.setBody(new String("hello sync 1").getBytes());
                    default:
                        break;
                }
                return response;
            }
            @Override
            public boolean rejectRequest() {
                return false;
            }
        };
        // 业务处理器
        NettyRequestProcessor processB = new NettyRequestProcessor(){
            @Override
            public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
                System.out.println("received from client, remark:" + request.getRemark() + ", coe:" + request.getCode());
                RemotingCommand response = RemotingCommand.createResponseCommand(0, "server");
                switch (request.getCode()) {
                    case 9:
                        response.setBody(new String("hello sync 9").getBytes());
                    default:
                        break;
                }
                return response;
            }
            @Override
            public boolean rejectRequest() {
                return false;
            }
        };
        // 注册 协议 - 对应的处理器, 类似web url 路由到对应的class
        remotingServer.registerProcessor(0, processA, poolA);
        remotingServer.registerProcessor(1, processA, poolA);
        remotingServer.registerProcessor(9, processB, poolB);


        remotingServer.start();


        System.out.println("start ok " + JSON.toJSONString(nettyServerConfig));
        System.in.read();
    }
}

  启动client 发起调用

import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class Client {
    public static void main(String[] args) throws Exception {
        NettyClientConfig nettyServerConfig = new NettyClientConfig();
        // 配置线程数 netty eventGroup 线程池 处理自定义hander/耗时长等
        nettyServerConfig.setClientWorkerThreads(8);
        NettyRemotingClient remotingClient = new NettyRemotingClient(nettyServerConfig, null);


        // 支持共用或独立的业务处理线程池
        ExecutorService poolA = new ThreadPoolExecutor(4, 4, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024));


        // 监听服务端发过来的请求
        remotingClient.registerProcessor(5, new NettyRequestProcessor() {
            @Override
            public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
                System.out.println("receive from server : " + request.getCode());
                return null;
            }
            @Override
            public boolean rejectRequest() {
                return false;
            }
        }, poolA);


        remotingClient.start();


        // 主动发起远程调用 
        {
            // 同步调用
            RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
            request.setRemark("sync");
            RemotingCommand response = remotingClient.invokeSync("127.0.0.1:8888", request, 30 * 1000L);
            System.out.println("call sync ok remark:" + response.getRemark() + " body:" + new String(response.getBody()));
        }
        {
            // 异步调用
            RemotingCommand request = RemotingCommand.createRequestCommand(1, null);
            request.setRemark("async");
            remotingClient.invokeAsync("127.0.0.1:8888", request, 30 * 1000L, new InvokeCallback() {
                @Override
                public void operationComplete(ResponseFuture responseFuture) {
                    RemotingCommand response = responseFuture.getResponseCommand();
                    System.out.println("call async ok remark:" + response.getRemark() + " body:" + new String(response.getBody()));
                }
            });
        }
        {
            // 单向调用
            RemotingCommand request = RemotingCommand.createRequestCommand(9, null);
            request.setRemark("oneway");
            remotingClient.invokeOneway("127.0.0.1:8888", request, 30 * 1000L);
            System.out.println("call oneway ok ");
        }
        System.in.read();
    }
}

该点对点调用,是需要手动指定目标服务器的ip和端口的,不同于hsf拥有注册中心进行协调撮合提供目标ip。

  日志输出

Connected to the target VM, address: '127.0.0.1:57381', transport: 'socket'
start ok {"listenPort":8888,"serverAsyncSemaphoreValue":64,"serverCallbackExecutorThreads":0,"serverChannelMaxIdleTimeSeconds":120,"serverOnewaySemaphoreValue":256,"serverPooledByteBufAllocatorEnable":true,"serverSelectorThreads":2,"serverSocketRcvBufSize":65535,"serverSocketSndBufSize":65535,"serverWorkerThreads":8,"useEpollNativeSelector":false}
received from client, remark:sync, coe:0
received from client, remark:async, coe:1
received from client, remark:oneway, coe:9
Connected to the target VM, address: '127.0.0.1:57385', transport: 'socket'
call sync ok remark:server body:hello sync 1
call oneway ok 
call async ok remark:server body:hello sync 1

331900d55eb4537a79b246aef1973db8.png

实现原理

关于netty如何封装java基础nio socket不做展开。

这里分析通信模块是如何封装netty,扩展调用协议规范的部分,重点描述其中关键的设计要点。

▐  server 启动 监听请求

作为服务端,需绑定端口,监听请求,这里采用标准netty服务端模式。

remotingServer.start();

@Override
    public void start() {
        ...
        ServerBootstrap childHandler =
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
    encoder,
    new NettyDecoder(),
    new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
    connectionManageHandler,
    serverHandler
);
                    }
                });
        ...
        ChannelFuture sync = this.serverBootstrap.bind().sync();
        InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
        ...
    }

关注涉及几个线程池的地方:

  1. bossGroup -> eventLoopGroupBoss 固定线程数1

  2. workerGroup -> eventLoopGroupSelector 若linux采用epoll实现 否则使用nio实现, 线程数可配置

  3. eventGroup -> defaultEventExecutorGroup 普通实现的 handler 工作线程池, 线程数可配置

另外就是传统艺能:心跳, 解码器 NettyEncoder,编码器 NettyDecoder,连接管理器 connectionManageHandler,和最终的业务处理器 serverHandler

▐  server 注册业务处理器

业务线程池配置

请求协议code关联业务处理器

// 支持共用或独立的业务处理线程池
        ExecutorService poolA = new ThreadPoolExecutor(4, 4, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024));
        ExecutorService poolB = new ThreadPoolExecutor(4, 4, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024));


        // 业务处理器
        NettyRequestProcessor processA = new NettyRequestProcessor() {
            @Override
            public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
                System.out.println("received from client, remark:" + request.getRemark() + ", coe:" + request.getCode());
                RemotingCommand response = RemotingCommand.createResponseCommand(0, "server");
                switch (request.getCode()) {
                    case 0:
                        response.setBody(new String("hello sync 0").getBytes());
                    case 1:
                        response.setBody(new String("hello sync 1").getBytes());
                    default:
                        break;
                }
                return response;
            }
            @Override
            public boolean rejectRequest() {
                return false;
            }
        };
        // 业务处理器
        NettyRequestProcessor processB = new NettyRequestProcessor(){
            @Override
            public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
                System.out.println("received from client, remark:" + request.getRemark() + ", coe:" + request.getCode());
                RemotingCommand response = RemotingCommand.createResponseCommand(0, "server");
                switch (request.getCode()) {
                    case 9:
                        response.setBody(new String("hello sync 9").getBytes());
                    default:
                        break;
                }
                return response;
            }
            @Override
            public boolean rejectRequest() {
                return false;
            }
        };
        // 注册 协议 - 对应的处理器, 类似web url 路由到对应的class
        remotingServer.registerProcessor(0, processA, poolA);
        remotingServer.registerProcessor(1, processA, poolA);
        remotingServer.registerProcessor(9, processB, poolB);

不同业务独立线程池的必要性

在复杂业务场景中,比如商品管理链路,订单交易链路,将所有的请求堆积在一个线程池中,快请求和慢请求公用一个赛道,无法避免资源分配不均问题

通信模块设计为可手动配置每个业务的处理线程池

注册路由和线程池关系

@Override
    public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
        ExecutorService executorThis = executor;
        if (null == executor) {
            executorThis = this.publicExecutor;
        }


        Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);
        this.processorTable.put(requestCode, pair);
    }

建立 code - processor - pool 的三者映射关系,在后续收到请求后,可查找注册关系进行路由唤起processor

▐  client 启动 发起请求

NettyRemotingClient remotingClient = new NettyRemotingClient(nettyServerConfig, null);


        remotingClient.start();


// 主动发起远程调用
        {
            // 同步调用
            RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
            request.setRemark("sync");
            RemotingCommand response = remotingClient.invokeSync("127.0.0.1:8888", request, 30 * 1000L);
            System.out.println("call sync ok remark:" + response.getRemark() + " body:" + new String(response.getBody()));
        }
        {
            // 异步调用
            RemotingCommand request = RemotingCommand.createRequestCommand(1, null);
            request.setRemark("async");
            remotingClient.invokeAsync("127.0.0.1:8888", request, 30 * 1000L, new InvokeCallback() {
                @Override
                public void operationComplete(ResponseFuture responseFuture) {
                    RemotingCommand response = responseFuture.getResponseCommand();
                    System.out.println("call async ok remark:" + response.getRemark() + " body:" + new String(response.getBody()));
                }
            });
        }
        {
            // 单向调用
            RemotingCommand request = RemotingCommand.createRequestCommand(9, null);
            request.setRemark("oneway");
            remotingClient.invokeOneway("127.0.0.1:8888", request, 30 * 1000L);
            System.out.println("call oneway ok ");
        }

启动客户端client后,即处于长连接状态,双向通信及时性有保障

三种调用模式

作为通信组件,需要适配多种调用场景,同步异步调用已是基本操作,oneway用于不关心是否返回的场景。

试想一下,在全双工双向异步通信的背景下,如何能像http一样实现同步调用,发出一个请求,收到一个请求后怎么跟前面发出的请求关联起来,又如何实现异步等待转为同步响应。

e895638f5c47d00c77ca61000a56a531.png

  • 同步调用

发起请求

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
        // 唯一id
        final int opaque = request.getOpaque(); 
    ...
        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
        // 把当前请求记录到待响应table中
        this.responseTable.put(opaque, responseFuture);
        final SocketAddress addr = channel.remoteAddress();
        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                if (f.isSuccess()) {
                    //标记为写入成功
                    responseFuture.setSendRequestOK(true);
                    return;
                } else {
                    responseFuture.setSendRequestOK(false);
                }
                // 写入异常结果 并唤起wait的线程
                responseTable.remove(opaque);
                responseFuture.setCause(f.cause());
                responseFuture.putResponse(null);
                public void putResponse(final RemotingCommand responseCommand) {
                    this.responseCommand = responseCommand;
                    this.countDownLatch.countDown();
                }
                log.warn("send a request command to channel <" + addr + "> failed.");
            }
        });
        // 同步等待结果
        RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
        public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
            this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return this.responseCommand;
        }
      ...
    }

关键设计点:每一个请求request,都分配了一个 client唯一自增的id (request.getOpaque(); requestId.getAndIncrement())。

把id和上下文存储到请求待响应table中:发送请求后(写入channel),线程等待结果响应 responseFuture.waitResponse,利用countDownLatch等待结果。

  • 异步调用

发起请求

public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,final InvokeCallback invokeCallback)
        // 唯一id
    final int opaque = request.getOpaque();
    ... 
        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
        // 把当前请求记录到待响应table中
        this.responseTable.put(opaque, responseFuture);
        ...
        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                if (f.isSuccess()) {
                    //标记为写入成功
                    responseFuture.setSendRequestOK(true);
                    return;
                }
                requestFail(opaque);
                log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
            }
        }); 
        ...
    }

关键设计点:每一个请求request,都分配了一个 client唯一自增的id (request.getOpaque(); requestId.getAndIncrement())。

把id和上下文存储到请求待响应table中:发送请求后,将callback传递给responseFuture,等待callback被调用。

  • 单向调用oneway

发起请求

public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
        request.markOnewayRPC();
        ...
        boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                once.release();
                if (!f.isSuccess()) {
                    log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
                }
            }
        });
        ...
    }

无需监听结果

关键设计点:使用信号量Semaphore控制并发数

是通道瞬间并发度,不同于流控qps

oneway模式:不同于同步调用 异步调用 这里不关心返回值 所以无需记录id到待响应table

▐  server受理请求 路由

监听请求

class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
            processMessageReceived(ctx, msg);
        }
    }
    public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        final RemotingCommand cmd = msg;
        if (cmd != null) {
            switch (cmd.getType()) {
                // 来自client的请求
                case REQUEST_COMMAND:
                    processRequestCommand(ctx, cmd);
                    break;
                // 来自client的响应
                case RESPONSE_COMMAND:
                    processResponseCommand(ctx, cmd);
                    break;
                default:
                    break;
            }
        }
    }
     public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
        // 路由关系 线程池配置 查询 
        final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
        final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
        final int opaque = cmd.getOpaque();
    ...
        Runnable run = new Runnable() {
            @Override
            public void run() {
        ...
                final RemotingResponseCallback callback = new RemotingResponseCallback() {
                    @Override
                    public void callback(RemotingCommand response) {
                        ...  
                        // 非oneway模式 才需要回写response
                        if (!cmd.isOnewayRPC()) {
                            ...
                            ctx.writeAndFlush(response); 
                            ...
                        }
                    }
                };
                ...
                // 使用指定的业务处理器processor处理业务
                NettyRequestProcessor processor = pair.getObject1();
                RemotingCommand response = processor.processRequest(ctx, cmd);
                callback.callback(response); 
                ...
            }
        };
        ...
        // 包装为线程任务 放到配置的线程池中执行
        final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
        pair.getObject2().submit(requestTask);
        ...
    }

关键设计点

抽象复用:

client 和 server的 网络通信读模块是高度一致的,所以抽象出来共有的部分,复用代码,继承结构:

719db27e1821c3f18a36d4fe27af4ba0.png

是一个很标准的抽象复用案例, 但需注意在两个角色(client server)中同一份代码是有不一样的解读链路

路由实现:

利用code - processor - pool 的三者映射关系方便的拿到对应业务的处理器及其独立的线程池,进行任务投递

设计理念类似观察者模式,添加观察者-业务处理器(这里仅单个观察者),当事件来了(socket消息读取)后,通知到所有观察者进行具体业务处理。

▐  client 监听响应

  • 监听 同步调用结果

class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {        @Override        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
            processMessageReceived(ctx, msg);
        }
    }
    public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        final RemotingCommand cmd = msg;
        if (cmd != null) {
            switch (cmd.getType()) {
                // 来自server的请求
                case REQUEST_COMMAND:
                    processRequestCommand(ctx, cmd);
                    break;
                // 来自server的响应
                case RESPONSE_COMMAND:
                    processResponseCommand(ctx, cmd);
                    break;
                default:
                    break;
            }
        }
    }
     public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
        final int opaque = cmd.getOpaque();
         // 从待响应table中找到响应对应的请求
        final ResponseFuture responseFuture = responseTable.get(opaque);
        if (responseFuture != null) {
            responseFuture.setResponseCommand(cmd);


            responseTable.remove(opaque);
            if (responseFuture.getInvokeCallback() != null) {
                // 异步调用 回调callback
                executeInvokeCallback(responseFuture);
            } else {
                // 同步调用
                // 写入正常结果 并唤起wait的线程
                responseFuture.putResponse(cmd);
                public void putResponse(final RemotingCommand responseCommand) {
                    this.responseCommand = responseCommand;
                    this.countDownLatch.countDown();
                }
                responseFuture.release();
            }
        } else {
            log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
            log.warn(cmd.toString());
        }
    }

关键设计点

异步协调 && 同步等待 && 唤起机制

读取到来自server响应数据的线程 -> 通过待响应table查找当前响应归属的请求 -> 操作其countDownLatch定向唤起等待结果的请求线程

同步结果唤起条件:写入异常 || 等待超时 || 读取到来自server的对应id的响应

// 同步等待结果

RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);

  • 监听 异步调用结果

class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
            processMessageReceived(ctx, msg);
        }
    }
    public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        final RemotingCommand cmd = msg;
        if (cmd != null) {
            switch (cmd.getType()) {
                // 来自server的请求
                case REQUEST_COMMAND:
                    processRequestCommand(ctx, cmd);
                    break;
                // 来自server的响应
                case RESPONSE_COMMAND:
                    processResponseCommand(ctx, cmd);
                    break;
                default:
                    break;
            }
        }
    }
     public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
        final int opaque = cmd.getOpaque();
         // 从待响应table中找到响应对应的请求
        final ResponseFuture responseFuture = responseTable.get(opaque);
        if (responseFuture != null) {
            responseFuture.setResponseCommand(cmd);


            responseTable.remove(opaque);
            if (responseFuture.getInvokeCallback() != null) {
                // 异步调用
                executeInvokeCallback(responseFuture);
            } else {
                // 同步调用
                // 写入结果 并唤起wait的线程
                responseFuture.putResponse(cmd);
                   public void putResponse(final RemotingCommand responseCommand) {
                this.responseCommand = responseCommand;
                this.countDownLatch.countDown();
            }
                responseFuture.release();
            }
        } else {
            log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
            log.warn(cmd.toString());
        }
    }


  private void executeInvokeCallback(final ResponseFuture responseFuture) {
        ExecutorService executor = this.getCallbackExecutor();
    ...
        executor.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    responseFuture.executeInvokeCallback();
                } catch (Throwable e) {
                    log.warn("execute callback in executor exception, and callback throw", e);
                } finally {
                    responseFuture.release();
                }
            }
        });
        ...  
    }

关键设计点

  • 异步协调 && callback机制

读取到来自server响应数据的线程 -> 通过待响应table查找当前响应归属的请求 -> 回调callback

异步结果回调callback条件:写入异常 || 等待超时 || 读取到来自server的对应id的响应

另外callback执行采用了cas机制限制仅执行一次

4878d100a9d307ab75ef3ef48e20e559.png

模块升级-微服务化通信工具

why?

从业务视角开发来看,通信模块依然是比较基础的,对于普通开发者,希望能够像hsf一样,简单的定制协议service,契合java接口实现多态机制,不希望每次都去根据code或其他url之类的手动去分发路由,显得过于原始。

how?

参考hsf系列的远程调用方式,使用动态代理规范化协议传输,使用泛化反射机制便捷调用。

封装程度跟灵活程度往往是成反比的,注意不要过度设计,尽可能保留原始通信模块的灵活。

▐  使用方式

  • 定义接口 和 实现

public interface ServiceHello {
    String sayHello(String a, String b);
    Integer sayHelloInteger(Integer a, Integer b);
}
import com.uext.remote.rf.service.ServiceHello;
public class ServiceHelloImpl implements ServiceHello {
    @Override
    public String sayHello(String a, String b) {
        return "hello " + a + " " + b;
    }
    @Override
    public Integer sayHelloInteger(Integer a, Integer b) {
        return 1000 + a + b;
    }
}

同hsf,接口interface可打包后提供给消费者,实现类隐藏于提供者代码中

  • 启动provider 注册服务监听

import com.alibaba.fastjson.JSON;
import com.uext.remote.rf.provider.ServiceHelloImpl;
import com.uext.remote.rf.provider.ServiceWorldImpl;
import com.uext.remote.rf.service.ServiceHello;
import com.uext.remote.rf.service.ServiceWorld;


public class TestServer {
    public static void main(String[] args) throws Exception {
        ApiProviderBean apiProviderBean = new ApiProviderBean();
        apiProviderBean.setPort(8888);
        apiProviderBean.init();
        apiProviderBean.register(ServiceHello.class, new ServiceHelloImpl());
        apiProviderBean.register(ServiceWorld.class, new ServiceWorldImpl());


        System.out.println("start ok " + JSON.toJSONString(apiProviderBean));
        System.in.read();
    }
}

启动服务端,注册一些需要暴露的服务,通过接口和接口的实现类的实例进行绑定

  • 启动consumer 发起调用

import com.uext.remote.rf.service.ServiceHello;
import com.uext.remote.rf.service.ServiceWorld;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
public class TestClient {
    public static void main(String[] args) throws Exception {
        // 初始化一个连接客户端
        NettyClientConfig nettyServerConfig = new NettyClientConfig();
        NettyRemotingClient remotingClient = new NettyRemotingClient(nettyServerConfig, null);
        remotingClient.start();
        ApiConsumerBean apiConsumerBean = new ApiConsumerBean();
        apiConsumerBean.setRemotingClient(remotingClient);
        apiConsumerBean.setInterfac(ServiceHello.class);
        apiConsumerBean.setTimeOut(30000L);
        apiConsumerBean.setAddr("127.0.0.1:8888");


        ServiceHello serviceHello = apiConsumerBean.getProxy();
        ApiConsumerBean apiConsumerBean2 = new ApiConsumerBean();
        apiConsumerBean2.setRemotingClient(remotingClient);
        apiConsumerBean2.setInterfac(ServiceWorld.class);
        apiConsumerBean2.setTimeOut(30000L);
        apiConsumerBean2.setAddr("127.0.0.1:8888");
        ServiceWorld serviceWorld = apiConsumerBean2.getProxy();


        System.out.println(serviceHello.sayHello("a", "b"));
        System.out.println(serviceHello.sayHelloInteger(1, 2));
        serviceWorld.sayWorld("aa", "bb");


        System.in.read();
    }
}

初始化一个长连接客户端, 获取接口远程实现实例, 发起调用。

  • 日志输出

Connected to the target VM, address: '127.0.0.1:49830', transport: 'socket'
start ok {"index":{"com.uext.remote.hsf.service.ServiceWorld":{"public abstract void com.uext.remote.hsf.service.ServiceWorld.sayWorld(java.lang.String,java.lang.String)":{}},"com.uext.remote.hsf.service.ServiceHello":{"public abstract java.lang.Integer com.uext.remote.hsf.service.ServiceHello.sayHelloInteger(java.lang.Integer,java.lang.Integer)":{},"public abstract java.lang.String com.uext.remote.hsf.service.ServiceHello.sayHello(java.lang.String,java.lang.String)":{}}},"port":8888,"remotingServer":{"callbackExecutor":{"activeCount":0,"completedTaskCount":0,"corePoolSize":4,"largestPoolSize":0,"maximumPoolSize":4,"poolSize":0,"queue":[],"rejectedExecutionHandler":{},"shutdown":false,"taskCount":0,"terminated":false,"terminating":false,"threadFactory":{}},"rPCHooks":[]}}
world aa bb
Connected to the target VM, address: '127.0.0.1:53211', transport: 'socket'
hello a b
1003

▐  实现方式

  • 请求头 参数协议

import lombok.Data;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
@Data
public class CommonHeader implements CommandCustomHeader{
    /**
     * com.uext.remote.hsf.service.ServiceHello
     */
    String interfaceName;
    /**
     * public abstract java.lang.String com.uext.remote.hsf.service.ServiceHello.sayHello(java.lang.String,java.lang.String)
     */
    String methodName;


    String argsJsonJson;


    @Override
    public void checkFields() throws RemotingCommandException {
    }
}

使用接口interface package url 和 方法 method的作为识别码,用以路由选择。

其中动态参数问题,需要考虑如何解决解码为方法参数对应的不同类型,本文采用简易实现(json)。

  • provider实现代码

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import io.netty.channel.ChannelHandlerContext;
import lombok.Data;
import org.apache.rocketmq.remoting.netty.*;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.*;
@Data
public class ApiProviderBean {
    private int port = 8888;
    // 长连接实例
    private NettyRemotingServer remotingServer;
    public void init() throws Exception {
        NettyServerConfig nettyServerConfig = new NettyServerConfig();
        nettyServerConfig.setListenPort(port);
        remotingServer = new NettyRemotingServer(nettyServerConfig, null);
        remotingServer.registerProcessor(0, new NettyRequestProcessor() {
            @Override
            public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
                // 请求数据解析
                CommonHeader commonHeader = (CommonHeader) request.decodeCommandCustomHeader(CommonHeader.class);


                // 路由查找
                Map<String/*method*/, Call> map = index.get(commonHeader.getInterfaceName());
                Call call = Objects.requireNonNull(map, "interface not exists " + commonHeader.getInterfaceName()).get(commonHeader.getMethodName());
                if(call == null){
                    throw new RuntimeException("method not exists " + commonHeader.getMethodName());
                }
                // 参数解码 todo 优化解码编码
                Parameter[] ts = call.method.getParameters();
                List<Object> args = new ArrayList<>();
                List<String> argsJson = JSON.parseObject(commonHeader.argsJsonJson, new TypeReference<List<String>>(){});
                for (int i = 0; i < argsJson.size(); i++) {
                    // 根据method规范 逐一反序列化
                    args.add(JSON.parseObject(argsJson.get(i), ts[i].getType()));
                }
                // 反射调用
                Object res = call.method.invoke(call.instance, args.toArray(new Object[0]));
                // 结果编码 回传 todo 优化解码编码
                RemotingCommand response = RemotingCommand.createResponseCommand(0, null);
                if(res != null) {
                    response.setBody(JSON.toJSONBytes(res));
                }
                return response;
            }
            @Override
            public boolean rejectRequest() {
                return false;
            }
        }, null);
        remotingServer.start();
    }
    private static class Call{
        Object instance;
        Method method;
    }
    private Map<String/*interface*/, Map<String/*method*/, Call>> index = new HashMap<>();
    /**
     * @param interfac 接口 协议
     * @param impl 实现类的实例
     */
    public synchronized <T> void register(Class<T> interfac, T impl){
        // 建立 接口-实现类-方法 路由关系
        String iname = interfac.getName();
        Map<String/*method*/, Call> map = index.get(iname);
        if(map == null){
            map = new LinkedHashMap<>();
            index.put(iname, map);
        }
        for (Method declaredMethod : interfac.getDeclaredMethods()) {
            Call call = new Call();
            call.instance = impl;
            call.method = declaredMethod;
            map.put(declaredMethod.toString(), call);
        }
    }
}

关键在于 注册协议(interface)和实现类, 维护映射路由关系。

收到channel请求的数据后,解码,根据映射路由关系进行反射调用拿到结果,编码结果,回写到channel

由于通道code 定义为int,但为了灵活配置接口及实现,不想硬编码,所以丢失了自定义不同业务线程池的特性,如果有需要可以重构通道code为string,然后把相关路由协议序列化到通道code中。


  • consumer实现代码

import com.alibaba.fastjson.JSON;
import lombok.Data;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
@Data
public class ApiConsumerBean implements InvocationHandler {
    /**
     * 超时时间
     */
    private Long timeOut = 3000L;
    /**
     * 目标 ip:port
     */
    private String addr = "127.0.0.1:8888";
    /**
     * 实现类
     */
    private Class<?> interfac;
    /**
     * 长连接实例
     */
    private NettyRemotingClient remotingClient;
    /**
     * 获取协议 代理实例
     */
    public <T> T getProxy() throws IllegalArgumentException {
        return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{interfac}, this);
    }
    /**
     * 规范编码协议
     */
    @Override
    public Object invoke(Object target, Method method, Object[] args) throws Throwable {
        if (Object.class.equals(method.getDeclaringClass())) {
            return method.invoke(this, args);
        }
        // 协议编码入参
        CommonHeader header = new CommonHeader();
        header.setInterfaceName(interfac.getName());
        header.setMethodName(method.toString());
        // todo 优化解码编码
        List<String> argJson = new ArrayList<>();
        for (Object arg : args) {
            argJson.add(JSON.toJSONString(arg));
        }
        header.setArgsJsonJson(JSON.toJSONString(argJson));
        // 定义通道code 0 为 远程使用
        RemotingCommand request = RemotingCommand.createRequestCommand(0, header);
        Object res = null;
        if(method.getReturnType() != null) {
            RemotingCommand response = remotingClient.invokeSync(addr, request, timeOut);
            // 协议解码结果 todo 优化解码编码
            if(response.getBody() != null && response.getBody().length > 0) {
                res = JSON.parseObject(new String(response.getBody(), StandardCharsets.UTF_8), method.getReturnType());
            }
        }else{
            remotingClient.invokeOneway(addr, request, timeOut);
        }
        return res;
    }
}

关键在于 委托接口(interface)的调用实现, 动态代理为: 根据协议编码, 包装request之后写入channel

同步等待, 所以采用了同步调用模式

收到channel响应的结果后, 解码, 返回结果

其中无返回值的接口, 不关心响应结果, 可使用oneway方式调用

▐  更进一步 注册中心 ip自动选择

引入注册中心 zk 或 namesrv,通过中心化协调,让某一些consumer自动选择某一台provider,并同时可以支持配置中心化下放,实现服务治理,越来越像微服务(dubbo)框架了哈。

当然,在跟多业务场景中,是无法引入其他中间件的,能少依赖就少依赖,降低复杂度。

在内网环境中,绝大部分项目采用Axxx一站式发布部署,配套Nxxxxxxx集群云资源管理,是支持按应用名动态获取当前集群ip清单的。

curl http://xxxxx.xxxx 

{
  "num": 164,
  "result": [
    {
      "dns_ip": "13.23.xx.xxx",  
      "state": "working_online"
    },
    ...
  ],
  "start": 0,
  "total": 164
}

那么我们是否可以依赖该 ip清单,用来做本地hash ip自动选择呢?

当然可以,配合可用性心跳探测,每台机器节点自己维护一份可用性提供者消费者清单缓存,通过一致性hash等算法选择机器匹配机器。

那么就得到了一个简易版的低依赖,去中心化,高可用的微服务通信框架。

8247df36812d4690d64d21112872add4.png

团队介绍

大淘宝技术开放平台,是淘宝天猫与外部生态互联互通的重要开放途径,通过开放的产品技术把一系列基础服务像水、电、煤一样输送给我们的商家、开发者、社区媒体以及其他合作伙伴,推动行业的定制、创新、进化,并最终促成新商业文明生态圈。
我们是一支技术能力雄厚,有着光荣历史传统的技术团队。在历年双十一战场上,团队都表现着优异的成绩。这里承载着每秒百万级的业务处理,90%的订单通过订单推送服务实时地推送到商家的ERP系统完成电商作业,通过奇门开放的ERP-WMS场景已经成为仓储行业标准。随着新零售业务的持续探索与快速发展,我们渴求各路高手加入,参与核心系统架构设计、性能调优,开放模式创新等富有技术挑战的工作。

¤ 拓展阅读 ¤

3DXR技术 | 终端技术 | 音视频技术

服务端技术 | 技术质量 | 数据算法

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/795722.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

065、故障处理之OMM_TiKV

TiKV Server OOM 对业务的影响 TiKV 上的请求失败造成异常退出region leader重新选举 raft group 开始选举新的 region leader新的region leader 上报信息给PD Server region cache频繁更新 在访问TiDB Server的region cache时&#xff0c;出现TiKV rpc相关报错后台自动进行Ba…

解放程序员,加速创新,缺少的就是一个工具而已

随着科技的不断进步和应用场景的不断扩大&#xff0c;软件开发已经成为当今世界的核心驱动力之一。 然而&#xff0c;传统的软件开发模式往往存在着繁琐的编码过程、复杂的架构设计和漫长的调试周期&#xff0c;使得程序员们难以专注于创新和高难度的研究。 很多程序员上班的时…

基于解析法和遗传算法相结合的配电网多台分布式电源降损配置(Matlab实现)

目录 1 概述 2 数学模型 2.1 问题表述 2.2 DG的最佳位置和容量&#xff08;解析法&#xff09; 2.3 使用 GA 进行最佳功率因数确定和 DG 分配 3 仿真结果与讨论 3.1 33 节点测试配电系统的仿真 3.2 69 节点测试配电系统仿真 4 结论 1 概述 为了使系统网损达到最低值&a…

一分钟学会利用GPT编写爆款标题

&#x1f3c6; 文章目标&#xff1a;学习利用GPT编写爆款标题 &#x1f340; 入门篇&#xff1a;一分钟学会利用GPT编写爆款标题 ✅ 创作者&#xff1a;熊猫Jay ✨ 个人公众号: 熊猫Jay字节之旅 (文末有链接) &#x1f341; 展望&#xff1a;若本篇讲解内容帮助到您&#xff0c…

Web网站性能压测实践 | 数据平台

一、 为什么要做压测&#xff1f; 首先解释下为什么要做性能压测&#xff1a;根据 Amazon 统计&#xff0c;每慢 100 毫秒&#xff0c;交易额下降 1%。这个统计数据为大家敲响了警钟&#xff0c;也客观说明了性能压测对于企业应用的重要性。从具体的OKR上讲&#xff0c;我们希望…

常见排序算法-Python实现

python 排序 算法 1.二分法 ​ python 32行 #codingutf-8 def binary_search(input_array, value): """Your code goes here.""" length len(input_array) left 0 right length-1 if length 1: return 0 if value input_value[0] els…

Linux 多线程并发Socket服务端的实现( 11 ) -【Linux通信架构系列 】

系列文章目录 C技能系列 Linux通信架构系列 C高性能优化编程系列 深入理解软件架构设计系列 高级C并发线程编程 设计模式系列 期待你的关注哦&#xff01;&#xff01;&#xff01; 现在的一切都是为将来的梦想编织翅膀&#xff0c;让梦想在现实中展翅高飞。 Now everythi…

3秒快速打开 jupyter notebook

利用 bat 脚本&#xff0c;实现一键打开 minconda 特点&#xff1a; 1、可指定 python 环境 2、可指定 jupyter 目录 一、配置环境 minconda 可以搭建不同的 python 环境&#xff0c;所以我们需要找到 minconda 安装目录&#xff0c;把对应目录添加到电脑环境 PATH 中&#…

prepros.crack.7.8.5 by Xacker

您友好的 Web 开发伙伴 Prepros 编译您的文件&#xff0c;转译您的 JavaScript&#xff0c;重新加载您的浏览器&#xff0c;并使开发和测试您的网站变得非常容易&#xff0c;这样您就可以专注于使它们完美。 适用于 Windows、macOS 和 Linux 试用版包括所有 Prepros 功能。 编…

【数据结构】树状数组和线段树

树状数组和线段树 下文为自己的题解总结&#xff0c;参考其他题解写成&#xff0c;取其精华&#xff0c;做以笔记&#xff0c;如有描述不清楚或者错误麻烦指正&#xff0c;不胜感激&#xff0c;不喜勿喷&#xff01; 树状数组 需求&#xff1a; 能够快速计算区间和保证在修改…

了解 MySQL 中 MVCC 的原理

点击上方↑“追梦 Java”关注&#xff0c;一起追梦&#xff01; 要解决读一致性的问题&#xff0c;保证一个事务中前后两次读取数据结果一致&#xff0c;还有一种 MVCC 的方式&#xff0c;又叫多版本的并发控制&#xff08;Multi Version Concurrency Control&#xff09;。 MV…

Flink状态的理解

Flink是一个带状态的数据处理系统&#xff1b;系统在处理数据的过程中&#xff0c;各算子所记录的状态会随着数据的处理而不断变化&#xff1b; 1. 状态 所谓状态State&#xff0c;一般指一个具体的 Task 的状态&#xff0c;即线程处理过程中需要保存的历史数据或历史累计数据…

SpringBoot集成Redis的环境部署以及操作Redis

文章目录 Spring Boot 集成Redis1.环境配置 redis连接配置信息不写默认wei6379&#xff0c;数据库为02.操作Redis2.1 代码形式操作Redis2.2 使用注解方式操作Redis Spring Boot 集成Redis 1.环境配置 添加redis依赖 在老项目添加&#xff0c;可以在pom.xml文件直接添加&#…

DAMO-YOLO 论文学习

1. 解决了什么问题&#xff1f; 工业界追求高性能、低延迟的目标检测算法&#xff0c;研究人员于是聚焦于单阶段目标检测&#xff0c;探索高效的网络结构和训练策略。YOLOv5/v6/v7、YOLOX 和 PP-YOLOE 在 COCO 数据集上实现了不错的精度-速度平衡&#xff0c;得到广泛应用&…

超标量处理器寄存器rename

1.相关性介绍 在CPU中&#xff0c;一段程序会被编译成一连串的汇编指令&#xff0c;指令与指令之间可能会具有相关性&#xff08;dependency&#xff09;。所谓相关性&#xff0c;即一条指令的执行会依赖于另一条指令的结果&#xff0c;相关性可以分为&#xff1a;① 数据相关性…

el-table树形表格实现复选框多选效果

2023.7.26今天我学习了如何使用树形表格的时候进行复选框的多选效果。 当我们使用树形结构表格需要进行多选功能操作的时候会发现点击全选的时候&#xff0c;只有一级表格数据会被选中&#xff0c;问题如图&#xff1a; 我们需要实现的是点击全选的不管是几级表格数据都可以被…

ElasticSearch之IK分词器安装以及使用介绍

文章目录 一、IK 分词器简介1. 支持细粒度分词&#xff1a;2. 支持多种分词模式&#xff1a;3. 支持自定义词典&#xff1a;4. 支持拼音分词&#xff1a;5. 易于集成和使用&#xff1a; 二、安装步骤1、下载 IK 分词器插件&#xff1a;2、安装 IK 分词器插件&#xff1a;3. 安装…

各种知名游戏的技术分析

介绍一个GitHub&#xff0c;里面包括了市面上的各种游戏的技术分析&#xff0c;包括渲染管线、工作流、技术文章等等&#xff0c;在做某个类型的游戏的时候&#xff0c;可以针对某个游戏去进行技术参考&#xff0c;特别实用。 GitHub - OTFCG/Awesome-Game-Analysis: a compre…

C++设计模式之模板方法、策略模式、观察者模式

面向对象设计模式是”好的面向对象设计“&#xff0c;所谓”好的面向对象设计“指的是可以满足”应对变化&#xff0c;提高复用“的设计。 现代软件设计的特征是”需求的频繁变化“。设计模式的要点是”寻求变化点&#xff0c;然后在变化点处应用设计模式&#xff0c;从而更好地…

力扣天天练--week3-LeetCode75

topic75-9-t443:压缩字符串 题目描述&#xff1a; 给你一个字符数组 chars &#xff0c;请使用下述算法压缩&#xff1a; 从一个空字符串 s 开始。对于 chars 中的每组 连续重复字符 &#xff1a; 如果这一组长度为 1 &#xff0c;则将字符追加到 s 中。 否则&#xff0c;需…