Flink实时计算引擎入门教程

news2025/1/11 21:51:17

Flink实时计算引擎入门教程

1.简介

Fink是一个开源的分布式,高性能,高可用,准确的实时数据计算框架,它主要优点如下:

流式计算: Fink可以连接处理流式(实时)数据。
容错: Fink提供了有状态的计算,会记录任务的中间状态,当执行失败时可以实现故障恢复。
可伸缩: Fink集群可以支持上千个节点。
高性能: Fink能提供高吞吐,低延迟的性能。
三大实时计算框架对比:

Spark Streaming: 可以处理秒级别延迟的实时数据计算,但是无法处理真正的实时数据计算,适合小型且独立的实时项目。
Storm: 可以处理真正的实时计算需求,但是它过于独立没有自己的生态圈,适合能够接受秒级别延迟不需要Hadoop生态圈的实时项目。
Fink: 新一代实时计算引擎,它包含了Strorm和Spark Streaming的优点,它即可以实现真正意义的实时计算需求,也融入了Hadoop生态圈,适合对性能要求高吞吐低延迟的实时项目。

2.执行流程

![

3.核心三大组件

](https://img-blog.csdnimg.cn/b2dcc5d962344a54bde9c4d149a3cdff.png)

DataSource: 数据源,主要用来接受数据。例如: readTextFile(),socketTextStream(),fromCollection(),以及一些第三方数据源组件。
Transformation: 计算逻辑,主要用于对数据进行计算。例如:map(),flatmap(),filter(),reduce()等类型的算子。
DataSink: 目的地,主要用来把计算的结果数据输出到其他存储介质。例如Kafka,Redis,Elasticsearch等。

4.应用场景

实时ETL: 集成实时数据计算系统现有的诸多数据通道和SQL灵活的加工能力,对实时数据进行清洗、归并和结构化处理。同时,为离线数仓进行有效的补充和优化,为数据实时传输计算通道。
实时报表: 实时采集、加工和存储,实时监控和展现业务指标数据,让数据化运营实时化。
监控预警: 对系统和用户行为进行实时检测和分析,实时检测和发现危险行为。
在线系统: 实时计算各类数据指标,并利用实时结果及时调整在线系统的相关策略。

5.架构原理

Fink常用的两种架构是: Standalone(独立集群)和ON YARN。

Standalone: 独立部署,不依赖Hadoop环境,但是需要使用Zookeeper实现服务的高可用。
ON YARN: 依赖Hadoop环境的YARN实现Flink任务的调度,需要Hadoop版本2.2以上。
Flink ON YARN架构图如下:

在这里插入图片描述

1.客户端将作业提交给 YARN 的资源管理器,这一步中会同时将 Flink 的 Jar 包和配置
上传到 HDFS,以便后续启动 Flink 相关组件的容器。
2.YARN 的资源管理器分配 Container 资源,启动 Flink JobManager,并将作业提交给
JobMaster。这里省略了 Dispatcher 组件。
3.JobMaster 向资源管理器请求资源(slots)。
4.资源管理器向 YARN 的资源管理器请求 container 资源。
5.YARN 启动新的 TaskManager 容器。
6.TaskManager 启动之后,向 Flink 的资源管理器注册自己的可用任务槽。
7.资源管理器通知 TaskManager 为新的作业提供 slots。
8.TaskManager 连接到对应的 JobMaster,提供 slots。
9.JobMaster 将需要执行的任务分发给 TaskManager,执行任务。

Flink ON YARN 在运行的时候可以细分为两种模式。

Session模式: 可以称为会话模式或多任务模式。这种模式会在YARN中初始化一个Flink集群,以后提交的任务都会提交到这个集群中,这个Flink集群会在YARN集群中,除非手动停止。
Per-Job模式: 可以称为单任务模式,这种模式每次提交Flink任务时任务都会创建一个集群,Flink任务之间都是互相独立,互不影响,执行任务资源会释放掉。

6.常用的API

Flink中提供了4种不同层次的API,每种都有对应的使用场景。

Sateful Stream Processing: 低级API,提供了对时间和状态的细粒度控制,简洁性和易用性较差,主要应用一些复杂事件处理逻辑上。
DataStream/DataSet API: 核心API,提供了针对实时数据和离线数据的处理,是对低级API进行的封装,提供了filter(),sum(),max(),min()等高级函数,简单易用。
Table API: 对DataStream/DataSet API做了进一步封装,提供了基于Table对象的一些关系型API。
SQL: 高级语言,Flink的SQL是基于Apache Calcite开发的,实现了标准SQL(类似于Hive的SQL),使用起来比其他API更加方便。Table API和SQL可以很容易结合使用,它们都返回Table对象。
在工作中能用SQL解决的优先使用SQL,复杂一些的考虑DataStream/DataSet API。

DataStreamAPI中常用的Transformation函数。
![](https://img-blog.csdnimg.cn/36dcad6eb7d7402182d93c2f6203cbf7.png
)

7.java编写flink程序

引入依赖,此文用的flink版本是1.15.2。

    <properties>
        <flink.version>1.15.2</flink.version>
        <java.version>1.8</java.version>
        <slf4j.version>1.7.30</slf4j.version>
        <!--flink依赖的作用域 provided 表示表示该依赖包已经由目标容器提供,compile 标为默认值 -->
        <flink.scope>compile</flink.scope>
    </properties>

    <dependencies>
        <!-- core dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>${flink.scope}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>${flink.version}</version>
            <scope>${flink.scope}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink.version}</version>
            <scope>${flink.scope}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>${flink.scope}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
            <scope>${flink.scope}</scope>
        </dependency>

        <!-- test dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-test-utils</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope>
        </dependency>

        <!-- 引入日志管理相关依赖-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
        </dependency>

    </dependencies>

    <build>
        <finalName>flink</finalName>
        <plugins>
            <!-- scala 编译插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>1.2.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.ljm.hadoop.flink.Main</mainClass>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

DataStream执行模式:
从1.12.0版本以后,flink实现了api的流批一体化处理。DataStream新增一个执行模式(execution mode),通过设置不同的执行模式,即可实现流处理与批处理之间的切换,这样一来,dataSet基本就被废弃了。

STREAMING: 流执行模式(默认)
BATCH: 批执行模式
AUTOMATIC: 在这种模式下,将由程序根据输入数据源是否有界,来自动选择执行模式。

在这里插入图片描述
以下为DataStream相关Api在Java中的简单应用

public class Main {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置执行模式
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        map(env);
//        flatMap(env);
//        union(env);
//        connect(env);
//        socketTextStream(env);
        env.execute("testJob");
    }

    /**
     * 对数据处理
     */
    private static void map(StreamExecutionEnvironment env) {
        //在测试阶段,可以使用fromElements构造数据流
        DataStreamSource<Integer> data = env.fromElements(1, 2, 3, 4, 5, 6, 7, 9, 11);
        //处理数据
        SingleOutputStreamOperator<Integer> numStream = data.map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer num) throws Exception {
                return num - 1;
            }
        });
        //使用一个线程打印数据
        numStream.print().setParallelism(1);
        //多线程输出(最大值=cpu总核数)
        //numStream.print();
    }

    /**
     * 将数据中的每行数据根据符号拆分为单词
     */
    private static void flatMap(StreamExecutionEnvironment env) {
        DataStreamSource<String> data = env.fromElements("hello,world", "hello,hadoop");
        //读取文件内容,文件内容格式  hello,world
        //DataStreamSource<String> data =  env.readTextFile("D:\\java\\hadoop\\text.txt");
        //处理数据
        SingleOutputStreamOperator<String> wordStream = data.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> collector) throws Exception {
                String[] words = line.split(",");
                for (String word : words) {
                    collector.collect(word);
                }
            }
        });
        wordStream.print().setParallelism(1);
    }

    /**
     * 过滤数据中的奇数
     */
    private static void filter(StreamExecutionEnvironment env) {
        DataStreamSource<Integer> data1 = env.fromElements(1, 2, 3, 4, 5, 6, 7);
        //处理数据
        SingleOutputStreamOperator<Integer> numStream = data1.filter(new FilterFunction<Integer>() {
            @Override
            public boolean filter(Integer num) throws Exception {
                return num % 2 == 1;
            }
        });
        numStream.print().setParallelism(1);
    }

    /**
     * 将两个流中的数字合并
     */
    private static void union(StreamExecutionEnvironment env) {
        //第1份数据流
        DataStreamSource<Integer> data1 = env.fromElements(1, 2, 3, 4);
        //第2份数据流
        DataStreamSource<Integer> data2 = env.fromElements(3, 4, 5, 6);
        //合并流
        DataStream unionData = data1.union(data2);
        unionData.print().setParallelism(1);
    }

    /**
     * 将两个数据源中的数据关联到一起
     */
    private static void connect(StreamExecutionEnvironment env) {
        //第1份数据流
        DataStreamSource<String> data1 = env.fromElements("user:tom,age:18");
        //第2份数据流
        DataStreamSource<String> data2 = env.fromElements("user:jack_age:18");
        //连接两个流
        ConnectedStreams<String, String> connectedStreams = data1.connect(data2);
        //处理数据
        SingleOutputStreamOperator<String> resStream = connectedStreams.map(new CoMapFunction<String, String, String>() {
            @Override
            public String map1(String s) throws Exception {
                return s.replace(",", "-");
            }
            @Override
            public String map2(String s) throws Exception {
                return s.replace("_", "-");
            }
        });
        resStream.print().setParallelism(1);
    }

    /**
     * 每隔3秒重socket读取数据
     */
    private static void socketTextStream(StreamExecutionEnvironment env) {
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        //加载数据源
        DataStreamSource<String> data = env.socketTextStream("127.0.0.1", 9001);
        //数据处理
        SingleOutputStreamOperator<String> wordStream = data.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> collector) throws Exception {
                String[] words = line.split(",");
                for (String word : words) {
                    collector.collect(word);
                }
            }
        });
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordCountStream = wordStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
        //根据Tuple2中的第1列分组
        KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = wordCountStream.keyBy(0);
        //窗口滑动设置,对指定时间窗口(例如3s内)内的数据聚合统计,并且把时间窗口内的结果打印出来
        WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> windowedStream = keyedStream.timeWindow(Time.seconds(3));
        //根据Tuple2中的第2列进行合并数据
        SingleOutputStreamOperator<Tuple2<String, Integer>> sumRes = windowedStream.sum(1);
        //数据输出
        sumRes.print();
    }

}

上面示例的socketTextStream方法中用到了socketTextStream函数需要通过netnat工具发送数据

netnat工具下载
在netnat目录下执行 nc -L -p 9001 -v
在这里插入图片描述
运行socketTextStream方法,可以发现控制台打印了数据
在这里插入图片描述
上图中的3和5表示线程Id,如果只需要单线程打印则需要在print()后面追加setParallelism(1);

sumRes.print().setParallelism(1);

8.把flink程序部署到hadoop环境上面运行

8.1.安装flink程序

flink下载地址,下载1.15.2版本然后上传到服务器 /home/soft/目录下解压

tar -zxvf flink-1.15.2-bin-scala_2.12.tgz

flink客户端节点上需要设置HADOOP_HOME和HADOOP_CLASSPATH环境变量

vi /etc/profile
export HADOOP_HOME=/home/soft/hadoop-3.2.4
export HADOOP_CLASSPATH=`${HADOOP_HOME}/bin/hadoop classpath`
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
source /etc/profile

8.2.编译java开发的flink应用

使用socketTextStream接受socket传输的数据
在这里插入图片描述

修改socketTextStream方法里面的代码,把127.0.0.1改成netcat工具部署机器ip地址

DataStreamSource<String> data = env.socketTextStream("192.168.239.128", 9001);

需要把pom.xml文件中flink.scope属性值设置为provided,这些依赖不需要打进Jar包中。

<properties>
    <flink.scope>provided</flink.scope>
</properties>

执行命令打包

mvn clean package

在这里插入图片描述

把flink-1.0-SNAPSHOT.jar上传至/home/soft/flink-1.15.2目录下然后提交任务

8.3.提交Flink任务到YARN集群中

cd /home/soft/flink-1.15.2
bin/flink  run -m yarn-cluster -yjm 1024 -ytm 1024   flink-1.0-SNAPSHOT.jar

参数说明
bin/flink: 这个脚本启动的是Per-Job,bin/yarn-session.sh 则启动的是Session模式的
-m: 指定模式,yarn-cluster=集群模式,yarn-client=客户端模式
-yjm:每个JobManager内存 (default: MB)
-ytm:每个TaskManager内存 (default: MB)

在这里插入图片描述

8.4.测试任务并查看结果
在服务器上面安装netcat工具,然后发送数据,这台机器的ip必须和Java编写的Flink程序一致。

yum install nc
nc -l 9001

使用浏览器访问: http://hadoop集群主节点ip:8088/cluster可以看到已提交的Flink任务,然后下图的点击顺序可以看到任务的执行结果

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

8.5.停止任务
通过YARN命令停止

yarn application -kill  application_1665651925022_0008  

或通过Flink命令停止

bin/flink cancel -yid application_1665651925022_0008  a39f8b9258c9b9d0c17eca768c5b54c3

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/153963.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【软件测试】关于BUG的那些点点滴滴

关于BUG1. 如何合理的创建Bug1.1 创建Bug的要素2. Bug 的级别3. Bug 的生命周期3.1 Bug的状态3.2 Bug的生命周期4. 提出Bug后&#xff0c;跟开发产生争执怎么办1. 如何合理的创建Bug 1.1 创建Bug的要素 问题的版本&#xff0c;如浏览器的版本问题的环境&#xff0c;如windows…

融合注意力模块CBAM基于轻量级yolov5n开发共享单车目标检测系统

在很多的项目实战中验证分析注意力机制的加入对于模型最终性能的提升发挥着积极正向的作用&#xff0c;在我之前的一些文章里面也做过了一些尝试&#xff0c;这里主要是想基于轻量级的n系列模型来开发构建共享单车检测系统&#xff0c;在模型中加入CBAM模块&#xff0c;以期在轻…

web自动化测试---使用java+selenium+Junit

目录 1.什么是自动化以及为什么要进行自动化 2.为什么选择selenium作为web自动化工具 3.selenium环境部署 4.什么是驱动以及驱动的原理 5.selenium的基础语法和操作 5.1定位元素 5.2元素的操作 5.3等待 5.4信息打印 5.5窗口 5.6导航 5.7弹窗 5.8鼠标、键盘操作 5.…

Python + Appium 自动化操作微信入门

Appium 是一个开源的自动化测试工具&#xff0c;支持 Android、iOS 平台上的原生应用&#xff0c;支持 Java、Python、PHP 等多种语言。 Appium 封装了 Selenium&#xff0c;能够为用户提供所有常见的 JSON 格式的 Selenium 命令以及额外的移动设备相关的控制命令&#xff0c;…

虚拟化技术

虚拟化 虚拟化技术 目前虚拟化技术有软件模拟、全虚拟化&#xff08;使用二进制翻译&#xff09;、半虚拟化&#xff08;操作系统辅助&#xff09;、硬件辅助虚拟化和容器虚拟化这几种。 &#xff08;1&#xff09;软件虚拟化 软件模拟是通过软件完全模拟cpu、芯片组、磁盘、…

Spring Cloud Gateway 远程代码执行漏洞(CVE-2022-22947)

参考链接&#xff1a;https://blog.csdn.net/xiaobai_20190815/article/details/124045768 http://news.558idc.com/290335.html Java 安全-手把手教你SPEL表达式注入_4ct10n的博客-CSDN博客_spel注入 一、漏洞描述 Spring Cloud Gateway 是基于 Spring Framework 和 Spring…

组学新品|“4K”微生态之肠道菌群深度宏基因组

1.“4K”微生态定义 人体微生物群是人体内部与体表所有微生物有机体的总称[1]&#xff0c;其组成包括非细胞结构的病毒&#xff08;包括噬菌体&#xff09;、原核生物中的真细菌和古细菌&#xff0c;以及真核细胞微生物。与之对应&#xff0c;微生物群可以分为病毒群、细菌群、…

Redis未授权访问漏洞(二)Webshell提权篇

前言 在学习这篇文章之前&#xff0c;请先通过这篇文章Redis未授权访问漏洞(一)先导篇学习一下基础知识&#xff0c;再来学习这篇文章。 webshell提权 环境准备 攻击机&#xff1a; Centos7 IP:192.168.44.130 靶机&#xff1a;Centos7 IP:192.168.44.129 首先我们需要准备好…

QuartzNet的基本使用,Scheduler,Job,Trigger的应用

Quartz.Net的基本使用方法 Quartz.Net的基本使用是比较简单的&#xff0c;主要是对下面三个工具的创建和使用。 Scheduler调度器Job执行的动作Trigger触发器 Scheduler的创建和使用 scheduler的创建有几种不同的方式,但一般可以直接使用其提供的工厂类直接创建 通过工厂类…

分享92个NET源码,总有一款适合您

NET源码 分享92个NET源码&#xff0c;总有一款适合您 92个NET源码下载链接&#xff1a;https://pan.baidu.com/s/1Ya4GMXuHhNbqkLU5b7SPEw?pwd5vpx 提取码&#xff1a;5vpx 下面是文件的名字&#xff0c;我放了一些图片&#xff0c;文章里不是所有的图主要是放不下...&…

【指针面试题】-还在为学过指针的理论只是不会实践吗??赶紧来看看这里讲解了许多指针的面试题,带你更好掌握指针!!!

&#x1f387;作者&#xff1a;小树苗渴望变成参天大树 &#x1f9e8;作者宣言&#xff1a;认真写好每一篇博客 &#x1f38a;作者gitee:link &#x1f389;指针笔试题详解&#x1f4a5;前言&#x1f4a6;一、指针和数组面试题解析&#x1f4a8;1.1一维数组&#x1f4a2;1.2字…

Episode 01 密码技术基础

一、加密和解密&&发送者、接收者和窃听者 发送者&#xff1a;发出消息的人 接收者&#xff1a;接收消息的人 窃听者&#xff1a;恶意获取消息的第三方 加密&#xff1a;发送者将明文转换为密文 解密&#xff1a;接收者将密文还原为明文 破译&#xff1a;接收者以外…

客流统计系统为什么深受门店商户的喜爱?

人数或人群流动信息进行精确的统计分析&#xff0c;而且还可以利用这些高精度的数据&#xff0c;还能获得该区域精确的客流量信息&#xff0c;实现有效合理的组织运营工作&#xff0c;提升工作效益&#xff0c;帮助使用者开展更有成效的组织工作。客流统计系统内置高性能CMOS图…

同源跨域的概念与实现跨域的几种方案

本文将结合周老师的讲义对同源与跨域这一前端经典问题进行系统的总结、整理。一起来坐牢&#xff0c;快&#xff01; 1. 同源限制 1.1 历史背景 - 含义的转变 1995年&#xff0c;同源政策由 Netscape 公司引入浏览器。目前&#xff0c;所有浏览器都实行这个政策。 最初&…

【无标题】javaEE初阶---多线程(面试常用)

这篇文章 , 我将主要介绍多线程进阶部分的内容 . 主要涉及到一些在面试中常考的内容。 一:常见的锁策略 1.1乐观锁和悲观锁 乐观锁 : 预测接下来发生锁冲突的可能性不大 , 而进行的一类操作; 悲观锁 : 预测接下来发生锁冲突的可能性很大 , 而进行的一类操作.乐观锁 : 假设数…

ClassIn:如何打造更稳定的Zabbix监控系统

作者简介&#xff1a;罗呈祥。现就职于北京翼鸥教育科技有限公司&#xff0c;负责数据库相关的运维管理和技术支持工作&#xff0c;擅长故障处理和性能优化&#xff0c;对分布式数据库也有深入研究。 近期&#xff0c;OceanBase 社区发布了一篇关于我们公司选型分布式数据库的文…

bilateral_filter 双边滤波器详细用法

一、双边滤波&#xff08;Bilateral filter&#xff09;是一种可以保边去噪的滤波器。其输出像素的值依赖于邻域像素的值的加权组合。 从效果来说&#xff0c;双边滤波可产生类似美肤的效果。皮肤上的皱纹和斑&#xff0c;与正常皮肤的差异&#xff0c;远小于黑白眼珠之间的差异…

13种Shell逻辑与算术,能写出5种算你赢!

相较于最初的 Bourne shell&#xff0c;现代 bash 版本的最大改进之一体现在算术方面。早期的 shell 版本没有内建的算术功能&#xff0c;哪怕是给变量加1&#xff0c;也得调用单独的程序来完成。 1、算术方法一&#xff1a; $(( )) 只要都是整数运算&#xff0c;就可以在 $(…

DHT11温湿度传感器初识

目录 一、产品概述 1、接线方式 2、特点 3、数据传送逻辑 二、发送时序检测模块是否存在 1、C51单片机&#xff08;主机&#xff09;时序分析 2、编写代码检测模块是否存在 3、读取DHT11数据的时序分析 三、温湿度通过串口传到PC显示 四、温湿度检测小系统——使数据…

Discrete Opinion Tree Induction for Aspect-based Sentiment Analysis 论文阅读笔记

一、作者 Chenhua Chen、Zhiyang Teng、Zhongqing Wang、Yue Zhang School of Engineering, Westlake University, China Institute of Advanced Technology, Westlake Institute for Advanced Study Soochow University 二、背景 如何为每一个方面词定位相应的意见上下文…