dubbo源码实践-Exchange 信息交换层例子

news2025/1/15 12:56:33

1 Exchange 层概述

官方定义:

exchange 信息交换层:封装请求响应模式,同步转异步,以 Request, Response 为中心,扩展接口为 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer。

其中Exchanger是SPI扩展点,是该层的入口。其中客户端通过ExchangeClient.request发送请求,服务端通过ExchangeHandler的reply方法处理请求并返回结果。

为了理解上面官方的定义,下面将使用该层的类创建一个客户端和服务器端的应用。

2 实践例子

2.1 项目结构

由于是TCP框架,所以有服务端和客户端,两端的代码。

服务端代码:ExchangeServerTest 启动类,AlfServerExchangeHandler服务端的业务逻辑处理类(类似Netty的Handler作用)。

客户端代码:ExchangeClientTest 启动类,AlfClientExchangeHandler 客户端的业务逻辑处理类。

2.2 服务端代码

ExchangeServerTest类,使用Exchanger接口绑定(bind)端口8888,启动服务器。

注意URL中添加的codec属性,如果不添加程序会走telnet的实现,程序会报错。可以参考:可以参考AbstractEndpoint的getChannelCodec函数。

package org.example.dubbo.exchange;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.URLBuilder;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.exchange.ExchangeServer;
import org.apache.dubbo.remoting.exchange.Exchanger;
import org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger;

import java.io.IOException;

/** 服务端代码 */
public class ExchangeServerTest {
    public static void main(String[] args) throws RemotingException, IOException {
        //构建URL, dubbo中靠URL来传递参数
        URLBuilder urlBuilder = new URLBuilder();
        urlBuilder.setHost("localhost");
        urlBuilder.setPort(8888);
        //指定超时事件, 调试时防止超时
        urlBuilder.addParameter("codec", "exchange");
        URL url = urlBuilder.build();

        //Exchanger层入口类,可以通过SPI方式获取实现,这里为了简单直接new了HeaderExchanger
        Exchanger exchanger = new HeaderExchanger();
        //服务端调用bind方法
        ExchangeServer exchangeServer = exchanger.bind(url, new AlfServerExchangeHandler());

        System.out.println("服务器启动完成");
        //等待,防止程序提前结束
        System.in.read();
    }
}

AlfServerExchangeHandler类,主要关注reply方法,该方法处理客户端发来的请求,然后通过CompletableFuture异步返回给客户端。还记得官方说的“同步转异步”吗?这里是一个体现。

package org.example.dubbo.exchange;

import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.exchange.ExchangeChannel;
import org.apache.dubbo.remoting.exchange.ExchangeHandler;

import java.util.concurrent.CompletableFuture;

/** 服务端业务处理 */
public class AlfServerExchangeHandler implements ExchangeHandler {

    @Override
    public CompletableFuture<Object> reply(ExchangeChannel channel, Object request) throws RemotingException {
        System.out.println("reply AAA, request=" + request);
        CompletableFuture<Object> stringCompletableFuture = CompletableFuture.supplyAsync(() -> "服务器8888为你服务");
        return stringCompletableFuture;
    }

    @Override
    public void connected(Channel channel) throws RemotingException {
        System.out.println("connected AAA");
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {
        System.out.println("disconnected AAA");
    }

    @Override
    public void sent(Channel channel, Object message) throws RemotingException {
        System.out.println("sent AAA, message =" + message);
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        System.out.println("received AAA");
    }

    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        System.out.println("caught AAA");
        exception.printStackTrace();
    }

    @Override
    public String telnet(Channel channel, String message) throws RemotingException {
        System.out.println("telnet AAA, message = " + message);
        return null;
    }
}

2.3 客户端代码

ExchangeClientTest客户端代码入口,使用Exchanger接口连接(connect)函数来连接本机的8888端口。

注意一下CompletableFuture<Object> completableFuture = exchangeClient.request("你是谁?", null); 这句代码,通过是哦也能够CompletableFuture,把获取相应结果异步了。

package org.example.dubbo.exchange;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.URLBuilder;

import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.exchange.*;
import org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/** 客户端*/
public class ExchangeClientTest {
    public static void main(String[] args) throws RemotingException, IOException, ExecutionException, InterruptedException {
        //构建URL, dubbo中靠URL来传递参数
        URLBuilder urlBuilder = new URLBuilder();
        urlBuilder.setHost("localhost");
        urlBuilder.setPort(8888);
        //指定超时事件, 调试时防止超时
        urlBuilder.addParameter("timeout", 1000 * 200);
        //指定编码器,可以参考AbstractEndpoint的getChannelCodec函数
        urlBuilder.addParameter("codec", "exchange");
        URL url = urlBuilder.build();
        //Exchanger层入口类,可以通过SPI方式获取实现,这里为了简单直接new了HeaderExchanger
        Exchanger exchanger = new HeaderExchanger();

        //客户端调用connect方法连接服务端
        ExchangeClient exchangeClient = exchanger.connect(url, new AlfClientExchangeHandler());
        //发送消息
        CompletableFuture<Object> completableFuture = exchangeClient.request("你是谁?", null);
        System.out.println("客户端发送消息----------");
        Object o = completableFuture.get();
        System.out.println("客户端接收到消息----------" + o);
        //等待,防止程序提前结束
        System.in.read();
    }
}

AlfClientExchangeHandler类,业务处理器。主要是展示一下发送消息和获取响应时被掉用的方法。

package org.example.dubbo.exchange;

import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.exchange.ExchangeChannel;
import org.apache.dubbo.remoting.exchange.ExchangeHandler;

import java.util.concurrent.CompletableFuture;

/** 客户端处理器*/
public class AlfClientExchangeHandler implements ExchangeHandler {

    @Override
    public CompletableFuture<Object> reply(ExchangeChannel channel, Object request) throws RemotingException {
        //本例中是客户端发送的请求,所以该方法不会被调用的
        System.out.println("reply BBB, request=" + request);
        CompletableFuture<Object> stringCompletableFuture = CompletableFuture.supplyAsync(() -> "客户端返回数据");
        return stringCompletableFuture;
    }

    @Override
    public void connected(Channel channel) throws RemotingException {
        System.out.println("connected BBB");
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {
        System.out.println("disconnected BBB");
    }

    @Override
    public void sent(Channel channel, Object message) throws RemotingException {
        System.out.println("sent BBB, message =" + message);
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        System.out.println("received BBB");
    }

    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        System.out.println("caught BBB");
    }

    @Override
    public String telnet(Channel channel, String message) throws RemotingException {
        System.out.println("telnet BBB, message = " + message);
        return null;
    }
}

2.4 运行结果

启动服务端,再启动客户端,然后查看日志。

2.4.1服务端日志

1处:服务器启动成功。

2处:AlfServerExchangeHandler中的connected方法别回调了,表明有客户端连接了服务器。

3 处:AlfServerExchangeHandler中的reply方法别回调了,表明服务器收到了客户端发送的Request请求,并在这个方法中进行逻辑处理。

4 处:AlfServerExchangeHandler中的sent方法别回调了,表明服务器发送了一个消息(这里的消息其实是我们收到request处理完毕后的返回结果Response对象)。

2.4.2 客户端日志

1 处:AlfClientExchangeHandler中的connected方法被回调了,表明客户端连接服务器成功了。

2 处:代码中打印的输出System.out.println("客户端发送消息----------")

3 处:AlfClientExchangeHandler中的sent方法被回调了,表明客户端发送了一个请求(Request对象)。

4 处:通过CompletableFuture对象获取服务器的返回内容(这里是字符串)

3 总结

看完上面的代码例子,应该能理解“exchange 信息交换层:封装请求响应模式,同步转异步。”的含义了吧。😁

顺便提一下,看到图中的request方法和reply方法了吗?Protocol层会调用,其实就是我们例子中调用的发送Request和响应Response的两个方法。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/142961.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

虹科分享 | 网络流量监控 | 构建大型捕获文件(Ⅰ)——Wireshark过滤器和其他Allegro网络万用表工具

数据包分析是一个复杂的话题。如果在没有设置参数的情况下启动Wireshark&#xff0c;就会开始实时捕获或打开一个预先录制的pcap文件。在很短的时间内&#xff0c;可能有成千上万的数据包等待分析。有一种危险&#xff0c;就是被大量的数据困住了。 然而&#xff0c;如果用户想…

BPF学习笔记(八)--Linux tracing system对比分析

Linux trace技术发展已久&#xff0c;经常看到很多的专业术语&#xff0c;从perf LTTng systemtap bpftrace tracepoint trace BCC bpf ebpf等词汇&#xff0c;这些关键的词汇有着怎样的联系和关联&#xff0c;通过下面的这个图可以直观的认识到这几种关键技术的内在联系。 整…

Java认识多线程与Thread类的使用

目录 认识线程&#xff08;Thread&#xff09; 概念 为什么会有线程的出现&#xff1f; 刨根问底。为什么进程的创建与销毁效率很低呢&#xff1f; 多线程的轻量体现&#xff1a; 进程与线程的区别 第一个多线程程序 抢占式执行是啥 JDK中jconsole工具的使用 创建线程…

Live800:智能客服机器人的知识库怎么创建?

智能客服机器人的知识库是以知识为基础的系统&#xff0c;它可以明确地表达与实际问题相对应的知识&#xff0c;并构成相对独立的程序行为主体&#xff0c;有利于有效、准确地解决实际问题。从本质上来说&#xff0c;智能客服机器人的知识库实际上就像人类的大脑&#xff0c;储…

Flutter关于软键盘的一些问题

Scaffold类有个resizeToAvoidBottomInset 属性&#xff0c;它的作用是当弹出软键盘的时候&#xff0c;可以自动调节body区域的高度&#xff0c;撑起body的内容&#xff0c;使其底部高度刚好为键盘的高度&#xff0c;这样一来就不至于让键盘覆盖内容。 Scaffold( /// ..... ///…

教你如何搭建CRM—商机管理系统的demo

1、简介 1.1、案例简介 本文将介绍&#xff0c;如何搭建CRM-商机管理。 1.2、应用场景 CRM-商机管理应用完整记录所有商机资料&#xff0c;合理的对商机进行销售阶段的变更&#xff0c;实现商机管理智能化。 2、设置方法 2.1、表单搭建 1&#xff09;新建主表【商机】表…

进程间通信——共享内存

目录 1 概念 2 操作流程 fork(获取key值) shmget(申请对象) shmat(内存映射) 读写共享内存&#xff1a;类似堆区内存的直接读写 shmdt(解除映射) shmctl(删除对象) 范例&#xff1a; 1 概念 共享内存是进程间通信中最简单最高效的方式之一。共享内存允许两个或更多进程…

使用Python的Selenium进行网络自动化的入门教程

使用Python的Selenium进行网络自动化入门 自动化可以被看作是在使用电子机器或机器人来执行任务的过程中去除人力的过程。 在这篇文章中&#xff0c;我们将研究网络流程的自动化。 让软件机器人在网络上自动执行流程和任务的能力被称为网络自动化。 使用网络自动化&#xf…

2022年协议转让投资策略研究报告

第一章 协议转让的概念 协议转让是指双方当事人就转让标的物所有权达成协议&#xff0c;是典型的商业交易方式。而在破产案件中&#xff0c;则是一种有别于拍卖和以物抵债的处置财产的方式。根据《企业破产法》第112条&#xff0c;变价出售财产应当通过拍卖进行。但是&#xf…

云原生|kubernetes|安全漏扫神器trivy的部署和使用

前言&#xff1a; 云原生领域内的安全漏扫工具有clair和trivy是比较常用的&#xff0c;而安全漏扫工具是可以和harbor这样的私有仓库集成的&#xff0c;自harbor-1.21版以后都是默认使用trivy这个漏扫工具的&#xff0c;而在此之前是使用clair的。 那么&#xff0c;本文将就什…

实验十七 VLAN间的三层通信

实验十七 VLAN间的三层通信配置要求&#xff1a;通过三层交换机实现VLAN间互通通过单臂路由实现VLAN间互通网络拓扑图&#xff1a;操作步骤&#xff1a;一、 通过三层交换机实现VLAN间互通1、配置交换机LSW1的接口为trunk接口&#xff0c;g0/0/1口允许vlan 10通过&#xff0c;g…

构建器/生成器模式Builder

1.意图&#xff1a;将一个复杂对象的构建与它的表示分离&#xff0c;使得同样的构建过程可以创建不同的表示。 2.结构 Builder为创建一个Product对象的各个部件指定抽象接口。 ConcreteBuilder实现Builder的接口以构造和装配该产品的各个部件&#xff0c;定义并明确它所创建的…

实验十五 IS-IS协议基本配置

实验十五 IS-IS协议基本配置IS-IS(中间系统到中间系统)协议与OSPF(开放最短路径优先)协议有许多类似之处&#xff0c; 如都是链路状态的IGP路由协议&#xff0c;采用的都SPF路由算法&#xff0c;都划分了区域。为了支持大规模 的路由网络&#xff0c;IS-IS在自治系统内采用骨丁…

四、MySQL 存储引擎及数据类型

文章目录一、前置知识二、MySQL 存储引擎(先了解&#xff0c;初步有个印象)2.1 MySQL 存储引擎的概念2.2 查询 MySQL 中支持的存储引擎2.3 InnoDB 存储引擎2.4 MyISAM 存储引擎2.5 MEMORY 存储引擎2.6 如何选择 MySQL 存储引擎&#xff1f;三、MySQL 数据类型3.1 数字类型3.2 日…

Dataway让SpringBoot不在需要Controller、Service、DAO、Mapper了

Dataway介绍 Dataway 是基于 DataQL 服务聚合能力&#xff0c;为应用提供的一个接口配置工具。使得使用者无需开发任何代码就配置一个满足需求的接口。整个接口配置、测试、冒烟、发布。一站式都通过 Dataway 提供的 UI 界面完成。UI 会以 Jar 包方式提供并集成到应用中并和应用…

君子签用区块链打造电子合同证据链闭环,提升电子合同证据效力

电子合同作为电子证据的主要表现形式&#xff0c;采用电子合同发生纠纷时&#xff0c;相关的电子合同数据成为证据证明的关键。运用区块链去中心化、分布式存储、加密可溯源等技术特性&#xff0c;让电子合同的整个过程都可以在互联网上安全地进行&#xff0c;帮助打通数据信任…

JavaERP系统源码+数据库,业务闭环、灵活稳定的企业级ERP系统,真正的财务业务一体化系统

JavaERP系统源码数据库&#xff0c;业务闭环、灵活稳定的企业级ERP系统&#xff0c;真正的财务业务一体化系统 完整代码下载地址&#xff1a;JavaERP系统源码数据库 产品展示 社区版 主界面 权限模块 职员与组织 科目结构 库存管理 库存盘点 订单管理 出入库管理 凭…

【Spring【AOP】】——21、@EnableAspectJAutoProxy注解详解?

&#x1f4eb;作者简介&#xff1a;zhz小白 公众号&#xff1a;小白的Java进阶之路 专业技能&#xff1a; 1、Java基础&#xff0c;并精通多线程的开发&#xff0c;熟悉JVM原理 2、熟悉Java基础&#xff0c;并精通多线程的开发&#xff0c;熟悉JVM原理&#xff0c;具备⼀定的线…

Docker二进制安装

目录 1、选择安装目录 2、了解默认安装 3、Docker网络了解 docker官网 1、选择安装目录 安装包下载 链接&#xff1a;https://pan.baidu.com/s/1mbUl2XLnlN4xZuHbvRF-vg?pwdpdoq 提取码&#xff1a;pdoq docker官网 1、选择安装目录 docker指定数据存储目录到 /data/…

mysql 乐观锁和悲观锁

悲观锁介绍&#xff08;百科&#xff09;&#xff1a; 悲观锁&#xff0c;正如其名&#xff0c;它指的是对数据被外界&#xff08;包括本系统当前的其他事务&#xff0c;以及来自外部系统的事务处理&#xff09;修改持保守态度&#xff0c;因此&#xff0c;在整个数据处理过程中…