Apache Zeppelin系列教程第六篇——Zengine调用Interpreter原理分析

news2025/1/12 20:48:04

Apache Zeppelin系列教程第五篇——Interpreter原理分析_诸葛子房_的博客-CSDN博客

Apache Zeppelin系列教程第四篇——JDBCInterpreter原理分析_诸葛子房_的博客-CSDN博客

前文介绍jdbc interpreter和interpreter模块交互代码,本篇文章主要分析Zengine调用Interpreter模块代码。

介绍完这篇文章之后,我们即可将paragraph run的流程串起来(后面会将整个流程进行串讲)

同样,来看下这个测试类

zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java

  @Test
  public void testFIFOScheduler() throws InterruptedException, InterpreterException {
    LOGGER.info("===testFIFOScheduler====");
    interpreterSetting.getOption().setPerUser(InterpreterOption.SHARED);
    // by default SleepInterpreter would use FIFOScheduler
    LOGGER.info("===getInterpreter====");
    final Interpreter interpreter1 = interpreterSetting.getInterpreter("user1", note1Id, "sleep");
    LOGGER.info("===createDummyInterpreterContext====");
    final InterpreterContext context1 = createDummyInterpreterContext();
    // run this dummy interpret method first to launch the RemoteInterpreterProcess to avoid the
    // time overhead of launching the process.
    LOGGER.info("111");
    LOGGER.info("=====name:{}=======",interpreter1.getClassName());
    System.out.println(interpreter1.getClassName());
    interpreter1.interpret("10101", context1);
    LOGGER.info("222");
    Thread thread1 = new Thread() {
      @Override
      public void run() {
        try {
          assertEquals(Code.SUCCESS, interpreter1.interpret("100", context1).code());
        } catch (InterpreterException e) {
          e.printStackTrace();
          fail();
        }
      }
    };
    Thread thread2 = new Thread() {
      @Override
      public void run() {
        try {
          assertEquals(Code.SUCCESS, interpreter1.interpret("100", context1).code());
        } catch (InterpreterException e) {
          e.printStackTrace();
          fail();
        }
      }
    };
    long start = System.currentTimeMillis();
    thread1.start();
    thread2.start();
    thread1.join();
    thread2.join();
    long end = System.currentTimeMillis();
    assertTrue((end - start) >= 200);
  }

可以看下这个测试方法,这边加了一些日志

RemoteInterpreterTest 继承 AbstractInterpreterTest 里面的抽象类,会先执行setUp方法对读取配置文件信息interpreter 进行初始化

核心主要是执行RemoteInterpreter里面的 interpret 方法,

  @Override
  public InterpreterResult interpret(final String st, final InterpreterContext context)
      throws InterpreterException {
    LOGGER.info("st:\n{}", st);
    if (LOGGER.isDebugEnabled()) {
      LOGGER.debug("st:\n{}", st);
    }

    final FormType form = getFormType();
    RemoteInterpreterProcess interpreterProcess = null;
    try {
      interpreterProcess = getOrCreateInterpreterProcess();
    } catch (IOException e) {
      throw new InterpreterException(e);
    }
    if (!interpreterProcess.isRunning()) {
      return new InterpreterResult(InterpreterResult.Code.ERROR,
              "Interpreter process is not running\n" + interpreterProcess.getErrorMessage());
    }
    return interpreterProcess.callRemoteFunction(client -> {
          RemoteInterpreterResult remoteResult = client.interpret(
              sessionId, className, st, convert(context));
          Map<String, Object> remoteConfig = (Map<String, Object>) GSON.fromJson(
              remoteResult.getConfig(), new TypeToken<Map<String, Object>>() {
              }.getType());
          context.getConfig().clear();
          if (remoteConfig != null) {
            context.getConfig().putAll(remoteConfig);
          }
          GUI currentGUI = context.getGui();
          GUI currentNoteGUI = context.getNoteGui();
          if (form == FormType.NATIVE) {
            GUI remoteGui = GUI.fromJson(remoteResult.getGui());
            GUI remoteNoteGui = GUI.fromJson(remoteResult.getNoteGui());
            currentGUI.clear();
            currentGUI.setParams(remoteGui.getParams());
            currentGUI.setForms(remoteGui.getForms());
            currentNoteGUI.setParams(remoteNoteGui.getParams());
            currentNoteGUI.setForms(remoteNoteGui.getForms());
          } else if (form == FormType.SIMPLE) {
            final Map<String, Input> currentForms = currentGUI.getForms();
            final Map<String, Object> currentParams = currentGUI.getParams();
            final GUI remoteGUI = GUI.fromJson(remoteResult.getGui());
            final Map<String, Input> remoteForms = remoteGUI.getForms();
            final Map<String, Object> remoteParams = remoteGUI.getParams();
            currentForms.putAll(remoteForms);
            currentParams.putAll(remoteParams);
          }

          return convert(remoteResult);
        }
    );

  }

其中getOrCreateInterpreterProcess()一路点下去 最终是去调用zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ExecRemoteInterpreterProcess.java 里面的start 方法,通过 commons-exec命令执行shell 或者cmd 脚本(bin/interpreter.sh) 启动一个独立的进程,shell 脚本里面具体执行的类(org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer),和前一篇文章interpreter 原理分析相呼应

  @Override
  public void start(String userName) throws IOException {
    // start server process
    CommandLine cmdLine = CommandLine.parse(interpreterRunner);
    cmdLine.addArgument("-d", false);
    cmdLine.addArgument(getInterpreterDir(), false);
    cmdLine.addArgument("-c", false);
    cmdLine.addArgument(getIntpEventServerHost(), false);
    cmdLine.addArgument("-p", false);
    cmdLine.addArgument(String.valueOf(intpEventServerPort), false);
    cmdLine.addArgument("-r", false);
    cmdLine.addArgument(getInterpreterPortRange(), false);
    cmdLine.addArgument("-i", false);
    cmdLine.addArgument(getInterpreterGroupId(), false);
    if (isUserImpersonated() && !userName.equals("anonymous")) {
      cmdLine.addArgument("-u", false);
      cmdLine.addArgument(userName, false);
    }
    cmdLine.addArgument("-l", false);
    cmdLine.addArgument(getLocalRepoDir(), false);
    cmdLine.addArgument("-g", false);
    cmdLine.addArgument(getInterpreterSettingName(), false);

    interpreterProcessLauncher = new InterpreterProcessLauncher(cmdLine, getEnv());
    interpreterProcessLauncher.launch();
    interpreterProcessLauncher.waitForReady(getConnectTimeout());
    if (interpreterProcessLauncher.isLaunchTimeout()) {
      throw new IOException(
          String.format("Interpreter Process creation is time out in %d seconds", getConnectTimeout() / 1000) + "\n"
              + "You can increase timeout threshold via "
              + "setting zeppelin.interpreter.connect.timeout of this interpreter.\n"
              + interpreterProcessLauncher.getErrorMessage());
    }

    if (!interpreterProcessLauncher.isRunning()) {
      throw new IOException("Fail to launch interpreter process:\n" + interpreterProcessLauncher.getErrorMessage());
    }

    if (isHadoopClientAvailable()) {
      String launchOutput = interpreterProcessLauncher.getProcessLaunchOutput();
      Matcher m = YARN_APP_PATTER.matcher(launchOutput);
      if (m.find()) {
        String appId = m.group(1);
        LOGGER.info("Detected yarn app: {}, add it to YarnAppMonitor", appId);
        YarnAppMonitor.get().addYarnApp(ConverterUtils.toApplicationId(appId), this);
      }
    }
  }

而实际调用thrift server 端服务的client 端代码

参考

程序员的福音 - Apache Commons Exec - 知乎

Apache Thrift系列详解(一) - 概述与入门 - 掘金

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/537860.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

智能的本质人工智能与机器人领域的64个大问题阅读笔记(三)

目录 机器智能提高到人类的水平或者人类智能下降到机器的水平&#xff0c;都可以到达图灵点。 或许图灵测试是一个自我实现的预言&#xff1a;我们&#xff08;声称&#xff09;在打造“聪明”机器的同时&#xff0c;我们也在把人变笨。 不长脑的机器和不思考的人没什么两样&…

工作利器:三种简单方法将PPT转换成PDF

PDF是一种常用的文件格式&#xff0c;适合数据传输和阅读。在工作中&#xff0c;有时我们需要将PPT文件转换为PDF格式以方便使用。下面是几种将PPT转换为PDF的方法&#xff0c;其中方法二将修改为使用记灵在线工具进行转换。 方法一&#xff1a;直接将文件导出为PPT 一般来说…

OpenHarmony3.1安全子系统-签名系统分析

介绍 应用签名系统主要负责鸿蒙hap应用包的签名完整性校验&#xff0c;以及应用来源识别等功能。 子系统间接口&#xff1a; 应用完整性校验模块给其他模块提供的接口&#xff1b;完整性校验&#xff1a; 通过验签&#xff0c;保障应用包完整性&#xff0c;防篡改&#xff1b;…

postman接口自动化测试

Postman除了前面介绍的一些功能&#xff0c;还有其他一些小功能在日常接口测试或许用得上。今天&#xff0c;我们就来盘点一下&#xff0c;如下所示&#xff1a; 1.数据驱动 想要批量执行接口用例&#xff0c;我们一般会将对应的接口用例放在同一个Collection中&#xff0c;然…

上周,又劝退了10几个...

最近看了很多简历&#xff0c;很多候选人年限不小&#xff0c;但是做的都是一些非常传统的项目&#xff0c;想着也不能通过简历就直接否定一个人&#xff0c;何况现在大环境越来 越难&#xff0c;大家找工作也不容易&#xff0c;于是就打算见一见。 在沟通中发现&#xff0c;由…

chatgpt赋能Python-openpyxl_批注

Openpyxl 批注简介 Openpyxl 是一个用于操作 Microsoft Excel 文件的 Python 库&#xff0c;它提供了许多方便的功能来读取、写入和修改 Excel 文件。其中一个功能是批注&#xff0c;可以在单元格中添加注释或提醒。 Openpyxl 批注的具体用途 Openpyxl 批注在 Excel 工作表中…

应届毕业生第一份C++程序员工作看重什么?我聊聊自己的看法

大家知道应届毕业生的第一份工C程序员工作看重什么&#xff0c;我相信那位同学可能他那个想去做的时候就说啊&#xff0c;因为第二家公司是世界杯公司吗&#xff0c;是单休哈对吧&#xff0c;而且待遇没有另一家高。我相信我们大部分人其实都看中一个&#xff0c;是累不累啊&am…

(浙大陈越版)数据结构 第三章 树(上) 3.1 树和树的表示

目录 3.1.1 引子&#xff08;顺序查找&#xff09; 什么是树 查找 3.1.2 引子 二分查找例子(BinarySearch) 二分查找 3.1.3 引子 二分查找实现 二分查找代码 二分查找的启示 3.1.4 树的定义 一些基本术语&#xff1a; 3.1.5 树的表示 3.1.1 引子&#xff08;顺序查找…

学习Se-net和Sk-net 附网络简单代码(pytorch)

&#xff08;一&#xff09;Se-net的原理和思路     Se-net严格来说是一个小结构&#xff0c;它可以直接插入已有的网络结构中&#xff0c;帮助原有结构获得更好的效果&#xff0c;如插入Resnet网络中。 Se-net的整个流程如下&#xff1a;     &#xff08;1&#xf…

chatgpt赋能Python-opencv_python打开摄像头

OpenCV Python打开摄像头&#xff1a;一种简单的图像处理方式 OpenCV是一种常用的图像处理库&#xff0c;可以用Python编程轻松进行图像和视频处理。其中&#xff0c;打开摄像头也是OpenCV中常用的一种方法。在这篇文章中&#xff0c;我们将介绍OpenCV Python打开摄像头的原理…

chatgpt赋能Python-numpy开根

NumPy开根 在科学计算中&#xff0c;开根运算是一个经常需要进行的操作&#xff0c;它非常有用&#xff0c;可以用来求解方程、计算距离或者简单地将数据压缩成更容易理解的形式等。NumPy是一个强大的库&#xff0c;被广泛地用于Python编程中&#xff0c;它提供了用于开根的特…

chatgpt赋能Python-mofan_python

Mofan Python&#xff1a;一个优秀的入门编程网站 介绍 Mofan Python 是一个致力于帮助人们快速入门 Python 编程的网站。该网站提供了各种编程资源&#xff0c;包括 Python 相关的教程、实例、项目&#xff0c;以及机器学习和深度学习课程等。它的特点在于提供了详细的代码解…

华为OD机试真题 Java 实现【投篮大赛】【2023Q1 100分】

一、题目描述 你现在是一场采用特殊赛制投篮大赛的记录员。 这场比赛由若于回合组成&#xff0c;过去几回合的得分可能会影响以后几回合的得分&#xff0c;比赛开始时&#xff0c;记录是空白的。 你会得到一个记录操作的字符串列表 ops&#xff0c;其中 ops[i] 是你需要记录…

FastDDS安全机制1 - 安全配置

背景 OMG组织对于DDS的安全机制有着对应的定义&#xff0c;其定义在DDS-SECURITY文档中。 这其中主要包含了对应的身份认证、访问控制、通信加密和审计相关的插件。 资料来源&#xff1a;DDS-SECURITY 其实也主要保护了通信过程中的相关安全风险。 资料来源&#xff1a;DDS-S…

轻松保护文档安全:三种实用的PDF加密方法

在我们的日常工作中&#xff0c;经常会使用到PDF格式的文件。为了保护版权和隐私&#xff0c;有时候我们需要对文档进行加密处理。那么&#xff0c;如何对PDF进行加密呢&#xff1f;今天我将为大家介绍几种方法&#xff0c;其中包括记灵在线工具、迅捷PDF编辑器和Speedpdf。 方…

Debian11之 RKE2 部署 K8S 集群

官方地址 资源列表 主机IP主机名称主机角色软件192.168.111.50server1主节点1API Server、controller-manager 和 scheduler192.168.111.51server2主节点2API Server、controller-manager 和 scheduler192.168.111.52server3主节点3API Server、controller-manager 和 schedu…

SocketTools crack所有安全连接的默认安全协议

SocketTools crack所有安全连接的默认安全协议 在所有HTTP客户端组件中添加了对HTTP/2.0协议的支持。 更新了TLS 1.2(及更高版本)和SSH 2.0的安全选项&#xff0c;以使用Microsoft Windows 11和Windows Server 2022中提供的密码套件。较旧、安全性较低的密码套件已被弃用&#…

JavaScript 基础 DOM (二)

事件流 事件流是对事件执行过程的描述 事件捕获 从DOM的根元素开始去执行对应的事件 (从外到里) 事件冒泡 当一个元素的事件被触发时&#xff0c;同样的事件将会在该元素的所有祖先元素中依次被触发。这一过程被称为事件冒 泡 addEventListener 第3个参数决定了事件是在捕获阶…

(数据结构)栈的实现——再一次保姆级教学

目录 1. 栈 ​编辑 1.2 栈的实现 2. 代码的实现 2.1 初始化栈和销毁栈 2.2栈顶元素的插入 2.3栈顶元素的删除 栈元素删除 2.4栈顶元素的获取和栈元素的个数 1. 栈 1.1 栈的概念和结构 栈(Stack)是一种线性存储结构&#xff0c;它具有如下特点&#xff1a; &#xff0…

git的学习3

文章目录 一、git status 命令二、git diff 命令三、git commit 命令四、git reset 命令五、git rm 命令六、git mv 命令七、提交日志1、Git 查看提交历史2、git blame 总结 提交与修改部分 一、git status 命令 git status 命令用于查看在你上次提交之后是否有对文件进行再次…