1 Exchange 层概述
官方定义:
exchange 信息交换层:封装请求响应模式,同步转异步,以 Request, Response 为中心,扩展接口为 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer。
其中Exchanger是SPI扩展点,是该层的入口。其中客户端通过ExchangeClient.request发送请求,服务端通过ExchangeHandler的reply方法处理请求并返回结果。
为了理解上面官方的定义,下面将使用该层的类创建一个客户端和服务器端的应用。
2 实践例子
2.1 项目结构
由于是TCP框架,所以有服务端和客户端,两端的代码。
服务端代码:ExchangeServerTest 启动类,AlfServerExchangeHandler服务端的业务逻辑处理类(类似Netty的Handler作用)。
客户端代码:ExchangeClientTest 启动类,AlfClientExchangeHandler 客户端的业务逻辑处理类。
2.2 服务端代码
ExchangeServerTest类,使用Exchanger接口绑定(bind)端口8888,启动服务器。
注意URL中添加的codec属性,如果不添加程序会走telnet的实现,程序会报错。可以参考:可以参考AbstractEndpoint的getChannelCodec函数。
package org.example.dubbo.exchange;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.URLBuilder;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.exchange.ExchangeServer;
import org.apache.dubbo.remoting.exchange.Exchanger;
import org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger;
import java.io.IOException;
/** 服务端代码 */
public class ExchangeServerTest {
public static void main(String[] args) throws RemotingException, IOException {
//构建URL, dubbo中靠URL来传递参数
URLBuilder urlBuilder = new URLBuilder();
urlBuilder.setHost("localhost");
urlBuilder.setPort(8888);
//指定超时事件, 调试时防止超时
urlBuilder.addParameter("codec", "exchange");
URL url = urlBuilder.build();
//Exchanger层入口类,可以通过SPI方式获取实现,这里为了简单直接new了HeaderExchanger
Exchanger exchanger = new HeaderExchanger();
//服务端调用bind方法
ExchangeServer exchangeServer = exchanger.bind(url, new AlfServerExchangeHandler());
System.out.println("服务器启动完成");
//等待,防止程序提前结束
System.in.read();
}
}
AlfServerExchangeHandler类,主要关注reply方法,该方法处理客户端发来的请求,然后通过CompletableFuture异步返回给客户端。还记得官方说的“同步转异步”吗?这里是一个体现。
package org.example.dubbo.exchange;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.exchange.ExchangeChannel;
import org.apache.dubbo.remoting.exchange.ExchangeHandler;
import java.util.concurrent.CompletableFuture;
/** 服务端业务处理 */
public class AlfServerExchangeHandler implements ExchangeHandler {
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object request) throws RemotingException {
System.out.println("reply AAA, request=" + request);
CompletableFuture<Object> stringCompletableFuture = CompletableFuture.supplyAsync(() -> "服务器8888为你服务");
return stringCompletableFuture;
}
@Override
public void connected(Channel channel) throws RemotingException {
System.out.println("connected AAA");
}
@Override
public void disconnected(Channel channel) throws RemotingException {
System.out.println("disconnected AAA");
}
@Override
public void sent(Channel channel, Object message) throws RemotingException {
System.out.println("sent AAA, message =" + message);
}
@Override
public void received(Channel channel, Object message) throws RemotingException {
System.out.println("received AAA");
}
@Override
public void caught(Channel channel, Throwable exception) throws RemotingException {
System.out.println("caught AAA");
exception.printStackTrace();
}
@Override
public String telnet(Channel channel, String message) throws RemotingException {
System.out.println("telnet AAA, message = " + message);
return null;
}
}
2.3 客户端代码
ExchangeClientTest客户端代码入口,使用Exchanger接口连接(connect)函数来连接本机的8888端口。
注意一下CompletableFuture<Object> completableFuture = exchangeClient.request("你是谁?", null); 这句代码,通过是哦也能够CompletableFuture,把获取相应结果异步了。
package org.example.dubbo.exchange;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.URLBuilder;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.exchange.*;
import org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/** 客户端*/
public class ExchangeClientTest {
public static void main(String[] args) throws RemotingException, IOException, ExecutionException, InterruptedException {
//构建URL, dubbo中靠URL来传递参数
URLBuilder urlBuilder = new URLBuilder();
urlBuilder.setHost("localhost");
urlBuilder.setPort(8888);
//指定超时事件, 调试时防止超时
urlBuilder.addParameter("timeout", 1000 * 200);
//指定编码器,可以参考AbstractEndpoint的getChannelCodec函数
urlBuilder.addParameter("codec", "exchange");
URL url = urlBuilder.build();
//Exchanger层入口类,可以通过SPI方式获取实现,这里为了简单直接new了HeaderExchanger
Exchanger exchanger = new HeaderExchanger();
//客户端调用connect方法连接服务端
ExchangeClient exchangeClient = exchanger.connect(url, new AlfClientExchangeHandler());
//发送消息
CompletableFuture<Object> completableFuture = exchangeClient.request("你是谁?", null);
System.out.println("客户端发送消息----------");
Object o = completableFuture.get();
System.out.println("客户端接收到消息----------" + o);
//等待,防止程序提前结束
System.in.read();
}
}
AlfClientExchangeHandler类,业务处理器。主要是展示一下发送消息和获取响应时被掉用的方法。
package org.example.dubbo.exchange;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.exchange.ExchangeChannel;
import org.apache.dubbo.remoting.exchange.ExchangeHandler;
import java.util.concurrent.CompletableFuture;
/** 客户端处理器*/
public class AlfClientExchangeHandler implements ExchangeHandler {
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object request) throws RemotingException {
//本例中是客户端发送的请求,所以该方法不会被调用的
System.out.println("reply BBB, request=" + request);
CompletableFuture<Object> stringCompletableFuture = CompletableFuture.supplyAsync(() -> "客户端返回数据");
return stringCompletableFuture;
}
@Override
public void connected(Channel channel) throws RemotingException {
System.out.println("connected BBB");
}
@Override
public void disconnected(Channel channel) throws RemotingException {
System.out.println("disconnected BBB");
}
@Override
public void sent(Channel channel, Object message) throws RemotingException {
System.out.println("sent BBB, message =" + message);
}
@Override
public void received(Channel channel, Object message) throws RemotingException {
System.out.println("received BBB");
}
@Override
public void caught(Channel channel, Throwable exception) throws RemotingException {
System.out.println("caught BBB");
}
@Override
public String telnet(Channel channel, String message) throws RemotingException {
System.out.println("telnet BBB, message = " + message);
return null;
}
}
2.4 运行结果
启动服务端,再启动客户端,然后查看日志。
2.4.1服务端日志
1处:服务器启动成功。
2处:AlfServerExchangeHandler中的connected方法别回调了,表明有客户端连接了服务器。
3 处:AlfServerExchangeHandler中的reply方法别回调了,表明服务器收到了客户端发送的Request请求,并在这个方法中进行逻辑处理。
4 处:AlfServerExchangeHandler中的sent方法别回调了,表明服务器发送了一个消息(这里的消息其实是我们收到request处理完毕后的返回结果Response对象)。
2.4.2 客户端日志
1 处:AlfClientExchangeHandler中的connected方法被回调了,表明客户端连接服务器成功了。
2 处:代码中打印的输出System.out.println("客户端发送消息----------")
3 处:AlfClientExchangeHandler中的sent方法被回调了,表明客户端发送了一个请求(Request对象)。
4 处:通过CompletableFuture对象获取服务器的返回内容(这里是字符串)
3 总结
看完上面的代码例子,应该能理解“exchange 信息交换层:封装请求响应模式,同步转异步。”的含义了吧。😁
顺便提一下,看到图中的request方法和reply方法了吗?Protocol层会调用,其实就是我们例子中调用的发送Request和响应Response的两个方法。