国产开源流批统一的数据同步工具Chunjun入门实战

news2025/1/24 14:54:52

文章目录

  • 概述
    • 定义
    • 特性
  • 部署
    • 安装
    • 版本对应关系
    • 通用配置详解
      • 整体配置
      • Content 配置
      • Setting 配置
    • Local提交
    • Standalone提交
      • Json方式使用
      • SQL方式使用
        • MySQL Sink
        • Kafka Sink

概述

定义

Chunjun 官网 https://dtstack.github.io/chunjun-web/ 源码release最新版本1.12.8

Chunjun 文档地址 https://ververica.github.io/flink-cdc-connectors/master/

Chunjun 源码地址 https://github.com/DTStack/chunjun

Chunjun是一个分布式集成框架,原名是FlinkX,由袋鼠云开源,其基于Flink的批流统一打造的数据同步工具,可以实现各种异构数据源之间的数据同步和计算。

ChunJun是一个基于 Flink 提供易用、稳定、高效的批流统一的数据集成工具,可以采集静态的数据如 MySQL,HDFS 等,也可以采集实时变化的数据如 binlog,Kafka等。

特性

  • 易使用:基于JSON模板和SQL脚本 快速构建数据同步任务,SQL脚本兼容Flink SQL语法;只需要关注数据源的结构信息即可, 节省了时间,专注于数据集成的开发。FlinkX既支持数据同步、实时采集,也支持SQL流与维表的Join,实现了一套插件完成数据的同步、转换与计算。
  • 基于 Flink:基于flink 原生的input,output 相关接口来实现多种数据源之间的数据传输,同时可以基于 flink 扩展插件。易于扩展,高度灵活,新扩展的数据源插件可以瞬间与现有的数据源插件集成,插件开发人员无需关心其他插件的代码逻辑;
  • 多种运行模式:支持分布式算子 支持 flink-standalone、yarn-session、 yarn-per job 及其他提交任务方式。支持Docker一键式部署,支持在k8s上部署和运行,支持使用native kuberentes方式以session和run-application模式提交任务。
  • 关键特性
    • 多种数据源之间数据传输 ,支持MySQL、Oracle、SQLServer、Hive、Kudu等20多种数据源的同步计算
    • 断点续传 :配合flink检查点机制,实现断点恢复、任务容灾。比如针对断点续传主要是设置断点续传字段和断点续传字段在reader里的column的位置,当然前提任务也是得开启checkpoint。
      • 部分插件支持通过Flink的checkpoint机制从失败的位置恢复任务。断点续传对数据源 ️强制要求:
        • 必须包含一个升序的字段,比如主键或者日期类型的字段,同步过程中会使用checkpoint机制记录这个字段的值,任务恢复运行时使用这个字段构造查询条件过滤已经同步过的数据,如果这个字段的值不是升序的,那么任务恢复时过滤的数据就是错误的,最终导致数据的缺失或重复。
        • 数据源必须支持数据过滤,如果不支持的话,任务就无法从断点处恢复运行,会导致数据重复。
        • 目标数据源必须支持事务,比如关系数据库,文件类型的数据源也可以通过临时文件的方式支持。
    • 全量与增量同步:不仅支持同步DML数据,还支持DDL同步,如’CREATE TABLE’, 'ALTER COLUMN’等;比如利用增量键,数据库表中增量递增的字段,比如自增id及其开始位置。
    • 实时采集:既支持离线同步计算,又兼容实时场景;实时数据还原。
    • FlinkX支持二阶段提交,目前FlinkX几乎所有插件都支持二阶段提交。
    • FlinkX支持数据湖 Iceberg,可以流式读取和写入Iceberg数据湖,未来也会加入Hudi支持。
    • 流控管理:大数据同步时在负载高的时候有时候会给系统带来很大的压力,FlinkX使用令牌桶限流方式限速,当源端产生数据的速率达到一定阈值就不会读取数据。
    • 大多数插件支持数据的并发读写,可以大大提高读写速度;
    • 脏数据管理:异构系统执行大数据迁移不可避免的会有脏数据产生,脏数据会影响同步任务的执行,FlinkX的Writer插件在写数据是会把以下几种类型作为脏数据写入脏数据表里:
      • 类型转换错误
      • 空指针
      • 主键冲突
      • 其它错误

部署

安装

  • 部署Flink集群(使用前面)
  • 获取源码编译打包
# 最新release版本源码flink12.7,如果是下载主线master版本,目前源码默认引入flink16.1,可以通过git clone https://github.com/DTStack/chunjun.git也可以直接http下main,由于是学习可使用master版本来踩坑
wget https://github.com/DTStack/chunjun/archive/refs/tags/v1.12.8.tar.gz
tar -xvf v1.12.8.tar.gz
# 进入源码目录
cd chunjun-1.12.8/
# 编译打包执行,下面两种选一
./mvnw clean package
sh build/build.sh

在根目录下生成 chunjun-dist目录,官方提供丰富的示例程序,详细可以查看chunjun-examples目录

image-20230703153401950

版本对应关系

下表显示了ChunJun分支与flink版本的对应关系。如果版本没有对齐,在任务中会出现’Serialization Exceptions’, 'NoSuchMethod Exception’等问题。

image-20230703144939648

通用配置详解

整体配置

一个完整的 ChunJun 任务脚本配置包含 content, setting 两个部分。content 用于配置任务的输入源与输出源,其中包含 reader,writer。而 setting 则配置任务整体的环境设定,其中包含 speed,errorLimit,metricPluginConf,restore,log,dirty。总体结构如下所示:

{
    "job" : {
       "content" :[{
            "reader" : {},
            "writer" : {}
       }],
       "setting" : {
          "speed" : {},
          "errorLimit" : {},
          "metricPluginConf" : {},
          "restore" : {},
          "log" : {},
          "dirty":{}
        }
    }
}

image-20230704143608057

Content 配置

reader 用于配置数据的输入源,即数据从何而来。具体配置如下所示:

"reader" : {
  "name" : "xxreader",
  "parameter" : {
        ......
  }
}

image-20230704143924661

Writer 用于配置数据的输出目的地,即数据写到哪里去。具体配置如下所示:

"writer" : {
  "name" : "xxwriter",
  "parameter" : {
        ......
  }
}

image-20230704143951964

Setting 配置

  • speed 用于配置任务并发数及速率限制。具体配置如下所示
  • errorLimit 用于配置任务运行时数据读取写入的出错控制。
  • metricPluginConf 用于配置 flinkx 指标相关信息。目前只应用于 Jdbc 插件中,在作业结束时将 StartLocation 和 EndLocation 指标发送到指定数据源中。目前支持 Prometheus 和 Mysql。
  • restore 用于配置同步任务类型(离线同步、实时采集)和断点续传功能。
  • log 用于配置 ChunJun 中定义的插件日志的保存与记录。
  • dirty 用于配置脏数据的保存,通常与 ErrorLimit 联合使用。

详细使用查看官方的说明

Local提交

进入Chunjun根目录,测试脚本执行本地环境,查看stream.json

{
  "job": {
    "content": [
      {
        "reader": {
          "parameter": {
            "column": [
              {
                "name": "id",
                "type": "id"
              },
              {
                "name": "name",
                "type": "string"
              },
              {
                "name": "content",
                "type": "string"
              }
            ],
            "sliceRecordCount": [
              "30"
            ],
            "permitsPerSecond": 1
          },
          "table": {
            "tableName": "sourceTable"
          },
          "name": "streamreader"
        },
        "writer": {
          "parameter": {
            "column": [
              {
                "name": "id",
                "type": "id"
              },
              {
                "name": "name",
                "type": "string"
              }
            ],
            "print": true
          },
          "table": {
            "tableName": "sinkTable"
          },
          "name": "streamwriter"
        },
        "transformer": {
          "transformSql": "select id,name from sourceTable where CHAR_LENGTH(name) < 50 and CHAR_LENGTH(content) < 50"
        }
      }
    ],
    "setting": {
      "errorLimit": {
        "record": 100
      },
      "speed": {
        "bytes": 0,
        "channel": 1,
        "readerChannel": 1,
        "writerChannel": 1
      }
    }
  }
}
bash ./bin/chunjun-local.sh  -job chunjun-examples/json/stream/stream.json

Standalone提交

Json方式使用

将依赖文件复制到Flink lib目录下,这个复制操作需要在所有Flink cluster机器上执行

cp -r chunjun-dist $FLINK_HOME/lib

启动Flink Standalone环境

sh $FLINK_HOME/bin/start-cluster.sh

准备mysql的数据,作为读取数据源

image-20230703170916124

准备job文件,创建chunjun-examples/json/mysql/mysql_hdfs_polling_my.json

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column" : [
              {
                "name" : "id",
                "type" : "bigint"
              },{
                  "name" : "name",
                  "type" : "varchar"
              },{
                  "name" : "age",
                  "type" : "bigint"
              }
            ],
            "splitPk": "id",
            "splitStrategy": "mod",
            "increColumn": "id",
            "startLocation": "1",
            "username": "root",
            "password": "123456",
            "queryTimeOut": 2000,
            "connection": [
              {
                "jdbcUrl": [
                  "jdbc:mysql://mysqlserver:3308/my_maxwell_01?useSSL=false"
                ],
                "table": [
                  "account"
                ]
              }
            ],
            "polling": false,
            "pollingInterval": 3000
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "fileType": "text",
            "path": "hdfs://myns/user/hive/warehouse/chunjun.db/kudu_txt",
            "defaultFS": "hdfs://myns",
            "fileName": "pt=1",
            "fieldDelimiter": ",",
            "encoding": "utf-8",
            "writeMode": "overwrite",
            "column": [
              {
                "name": "id",
                "type": "BIGINT"
              },
              {
                "name": "VARCHAR",
                "type": "VARCHAR"
              },
              {
                "name": "age",
                "type": "BIGINT"
              }
            ],
            "hadoopConfig": {
              "hadoop.user.name": "root",
              "dfs.ha.namenodes.ns": "nn1,nn2",
              "fs.defaultFS": "hdfs://myns",
              "dfs.namenode.rpc-address.ns.nn2": "hadoop1:9000",
              "dfs.client.failover.proxy.provider.ns": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
              "dfs.namenode.rpc-address.ns.nn1": "hadoop2:9000",
              "dfs.nameservices": "myns",
              "fs.hdfs.impl.disable.cache": "true",
              "fs.hdfs.impl": "org.apache.hadoop.hdfs.DistributedFileSystem"
            }
          }
        }
      }
    ],
    "setting" : {
      "restore" : {
        "restoreColumnName" : "id",
        "restoreColumnIndex" : 0
      },
      "speed" : {
        "bytes" : 0,
        "readerChannel" : 3,
        "writerChannel" : 3
      }
    }
  }
}

启动同步任务

bash ./bin/chunjun-standalone.sh  -job chunjun-examples/json/mysql/mysql_hdfs_polling_my.json

任务执行完后通过web控制台可以看到执行成功信息,查看HDFS路径数据也可以看到刚刚成功写入的数据

image-20230703171408397

SQL方式使用

MySQL Sink

创建一个个Kafka的topic用于数据源读取

kafka-topics.sh --create --zookeeper zk1:2181,zk2:2181,zk3:2181 --replication-factor 3 --partitions 3 --topic my_test1

ClickHouse创建testdb数据库和sql_side_table表

CREATE DATABASE IF NOT EXISTS testdb;

CREATE TABLE if not exists sql_side_table
(
    id Int64,
    test1 Int64,
    test2 Int64
) ENGINE = MergeTree()
PRIMARY KEY (id);
insert into sql_side_table values(1,11,101),(2,12,102),(3,13,103);

MySQL创建sql_sink_table表

CREATE TABLE `sql_sink_table` (
  `id` bigint NOT NULL,
  `name` varchar(100) COLLATE utf8mb4_general_ci DEFAULT NULL,
  `test1` bigint DEFAULT NULL,
  `test2` bigint DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci

创建sql文件chunjun-examples/sql/clickhouse/kafka_clickhouse_my.sql

CREATE TABLE source (
  id   BIGINT,
  name STRING
) WITH (
  'connector' = 'kafka-x',
  'topic' = 'my_test1',
  'properties.bootstrap.servers' = 'kafka1:9092',
  'properties.group.id' = 'dodge',
  'format' = 'json'
);

CREATE TABLE side (
  id BIGINT,
  test1 BIGINT,
  test2 BIGINT
) WITH (
  'connector' = 'clickhouse-x',
  'url' = 'jdbc:clickhouse://ck1:8123/testdb',
  'table-name' = 'sql_side_table',
  'username' = 'default',
  'lookup.cache-type' = 'lru'
);

CREATE TABLE sink (
  id BIGINT,
  name VARCHAR,
  test1 BIGINT,
  test2 BIGINT
)WITH (
      'connector' = 'mysql-x',
      'url' = 'jdbc:mysql://mysqlserver:3306/test',
      'table-name' = 'sql_sink_table',
      'username' = 'root',
      'password' = '123456',
      'sink.buffer-flush.max-rows' = '1024',
      'sink.buffer-flush.interval' = '10000',
      'sink.all-replace' = 'true'
      );

INSERT INTO sink
  SELECT
    s1.id AS id,
    s1.name AS name,
    s2.test1 AS test1,
    s2.test2 AS test2
  FROM source s1
  JOIN side s2
  ON s1.id = s2.id

启动同步任务

bash ./bin/chunjun-standalone.sh  -job chunjun-examples/sql/clickhouse/kafka_clickhouse_my.sql

往kafka的my_test1这个topic写入数据

./kafka-console-producer.sh --broker-list cdh1:9092 --topic my_test1
{"id":1,"name":"sunhaiyang"}
{"id":2,"name":"gulili"}

查看MySQL的sql_sink_table表已经有刚才写入消息并关联出结果的数据

image-20230704133902927

Kafka Sink

创建两个Kafka的topic,一个用于数据源读取,一个用于数据源写入

kafka-topics.sh --create --zookeeper zk1:2181,zk2:2181,zk3:2181 --replication-factor 3 --partitions 3 --topic my_test3
kafka-topics.sh --create --zookeeper zk1:2181,zk2:2181,zk3:2181 --replication-factor 3 --partitions 3 --topic my_test4

创建sql文件chunjun-examples/sql/kafka/kafka_kafka_my.sql

CREATE TABLE source_test (
    id INT
    , name STRING
    , money decimal
    , datethree timestamp
    , `partition` BIGINT METADATA VIRTUAL -- from Kafka connector
    , `topic` STRING METADATA VIRTUAL -- from Kafka connector
    , `leader-epoch` int METADATA VIRTUAL -- from Kafka connector
    , `offset` BIGINT METADATA VIRTUAL  -- from Kafka connector
    , ts TIMESTAMP(3) METADATA FROM 'timestamp' -- from Kafka connector
    , `timestamp-type` STRING METADATA VIRTUAL  -- from Kafka connector
    , partition_id BIGINT METADATA FROM 'partition' VIRTUAL   -- from Kafka connector
    , WATERMARK FOR datethree AS datethree - INTERVAL '5' SECOND
) WITH (
      'connector' = 'kafka-x'
      ,'topic' = 'my_test3'
      ,'properties.bootstrap.servers' = 'kafka1:9092'
      ,'properties.group.id' = 'test1'
      ,'scan.startup.mode' = 'earliest-offset'
      ,'format' = 'json'
      ,'json.timestamp-format.standard' = 'SQL'
      ,'scan.parallelism' = '2'
      );

CREATE TABLE sink_test
(
    id INT
    , name STRING
    , money decimal
    , datethree timestamp
    , `partition` BIGINT
    , `topic` STRING
    , `leader-epoch` int
    , `offset` BIGINT
    , ts TIMESTAMP(3)
    , `timestamp-type` STRING
    , partition_id BIGINT
) WITH (
      'connector' = 'kafka-x'
      ,'topic' = 'my_test4'
      ,'properties.bootstrap.servers' = 'kafka1:9092'
      ,'format' = 'json'
      ,'sink.parallelism' = '2'
      ,'json.timestamp-format.standard' = 'SQL'
      );

INSERT INTO sink_test
SELECT *
from source_test;

往kafka的my_test3这个topic写入数据

kafka-console-producer.sh --broker-list cdh1:9092 --topic my_test3
{"id":100,"name":"guocai","money":243.18,"datethree":"2023-07-03 22:00:00.000"}
{"id":101,"name":"hanmeimei","money":137.32,"datethree":"2023-07-03 22:00:01.000"}

启动同步任务

bash ./bin/chunjun-standalone.sh  -job chunjun-examples/sql/kafka/kafka_kafka_my.sql

查看kafka的my_test4的数据,已经收到相应数据并打上kafka元数据信息

kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic my_test4 --from-beginning

image-20230704104110791

  • 本人博客网站IT小神 www.itxiaoshen.com

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/718031.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

代码源 扫描线权值线段树 板子

矩形面积并&#xff08;存档&#xff09; 矩形面积并 - 题目 - Daimayuan Online Judge 题意&#xff1a; Code&#xff08;存档&#xff0c;还没写完&#xff09;&#xff1a; #include <bits/stdc.h>#define y1 Y1 #define int long longusing namespace std;const…

k8s的资源配置清单的管理操作

目录 一、资源配置清单1.1 初步认识资源清单中svc的重要配置项1.2 手动编写 svc资源配置1.3 手动生成模板&#xff0c;再编写资源清单1.4 写yaml太累怎么办&#xff1f;1.5 官网下载资源模板 一、资源配置清单 Kubernetes 支持 YAML 和 JSON 格式管理资源对象 JSON 格式&#…

HOT45-二叉树的右视图

leetcode原题链接&#xff1a;二叉树的右视图 题目描述 给定一个二叉树的 根节点 root&#xff0c;想象自己站在它的右侧&#xff0c;按照从顶部到底部的顺序&#xff0c;返回从右侧所能看到的节点值。 示例 1: 输入: [1,2,3,null,5,null,4] 输出: [1,3,4]示例 2: 输入: [1,n…

django celery简单 例子

django celery简单 例子 https://docs.celeryq.dev/en/latest/django/first-steps-with-django.html pip list pip install Django4.2.3 pip install redis4.6.0 pip install celery5.3.1 pip install SQLAlchemy2.0.17 source demo1_venv/bin/activate django-admin start…

激光雷达在ADAS测试中的应用与方案

在科技高速发展的今天&#xff0c;汽车智能化已是必然的趋势&#xff0c;且自动驾驶汽车的研究也在世界范围内进行得如火如荼。而在ADAS测试与开发中&#xff0c;激光雷达以其高性能和高精度占据着非常重要的地位&#xff0c;它是ADAS测试与开发中不可缺少的组成。 一 激光雷达…

sendRedirect进行页面重定向无反应

问题 sendRedirect进行页面重定向无反应 详细问题 笔者使用ServletJSP作为技术框架&#xff0c;使用AJAX进行数据请求&#xff0c;后程序运行完成 response.sendRedirect("请求链接");并没有按照笔者预期&#xff0c;进行页面重定向 请求端核心代码 $.ajax({url…

nginx报403 Forbidden错误

nginx是以root启动的&#xff0c;将 "user nobody" 改为 "user root" 刷新一下配置&#xff1a; /usr/local/nginx/sbin/nginx -t /usr/local/nginx/sbin/nginx -s reload

PCL点云处理之细小空洞填补 (一百九十八)

PCL点云处理之细小空洞填补 (一百九十八) 一、算法介绍二、具体实现1.代码2.结果一、算法介绍 点云扫描过程中,由于遮挡或其他原因,可能存在一些细小空洞,有可能造成数据处理上一些问题,这里介绍一种填补细小空洞的方法。具体方法和效果如下所示 二、具体实现 1.代码…

SpringBoot3【⑤ 核心原理】

1. 事件和监听器 1. 生命周期监听 场景&#xff1a;监听应用的生命周期 1. 监听器-SpringApplicationRunListener 自定义SpringApplicationRunListener来监听事件&#xff1b; 1.1. 编写SpringApplicationRunListener 这个接口的实现类 1.2. 在 META-INF/spring.factories …

前端Vue组件Mixin技术

前端vue组件开发的一大优势在于可以提高代码的复用性。极大的提升开发效率&#xff0c;通过Mixin技术&#xff0c;我们可以实现类似继承的效果&#xff0c;组件的复用性可以得到加强。 当我们开发前端项目时&#xff0c;可能会定义非常多的组件&#xff0c;这些组件中可能有部…

SpringBoot3【⑥ 场景整合:①NoSQL:Redis】

0. Docker安装 输入如下参数 sudo yum install -y yum-utilssudo yum-config-manager \--add-repo \https://download.docker.com/linux/centos/docker-ce.reposudo yum install docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-pluginsudo syst…

Jupyter Notebook的内核添加新的虚拟环境

最近&#xff0c;在搭建环境的时候发现 Jupyter Notebook 的内核只有基础的python和pytorch&#xff0c;现在我想要在 Jupyter Notebook 中使用新的虚拟环境。 下面是解决的方法&#xff1a; &#xff08;1&#xff09;首先在Anaconda Prompt中激活虚拟环境&#xff0c;比如我…

Android TV:自定义Leanback的VideoDetailsFragment

在Android studio新建TV项目的demo上做修改,实现一下需求: 1、去掉顶部背景区域 2、修改中间详情区域高度 3、修改整体背景界面 效果如图: 搜遍全网,没有找到一个解决方案。只能考自己看代码来自定义实现了。 1、去掉顶部背景区域: VideoDetailsFragment中重写setupD…

SpringBoot 的 概念、创建和运行

目录 1.什么是Spring Boot&#xff1f; 为什么要学Spring Boot&#xff1f; SpringBoot的优点 Spring Boot 项目创建 前置工作&#xff1a;配置国内源 使用 idea 创建 Spring Boot 项目 网页版创建&#xff08;了解&#xff09; 拓展&#xff1a;删除 项目中无用的目录和…

Matlab学习-轨迹热力图绘制

Matlab学习-轨迹热力图绘制 参考链接&#xff1a; MathWork-scatter函数使用 问题需求&#xff1a; 需要将轨迹上的点另一维信息同时显示在图上&#xff0c;比如横纵向误差等&#xff0c;这个时候画轨迹与误差的热力图就能很好同时反应位置和定位误差之间的关系&#xff1b;…

缓冲流~~

1&#xff1a;概述 缓冲流也称高效流&#xff0c;或者高级流。之前学习的字节流可以称为原始流。作用&#xff1a;缓冲流自带缓冲区&#xff0c;可以提高原始字节流&#xff0c;字符流读写数据的性能。 可以提高读写数据的效率。它通过在内存中创建缓冲区来减少对底层数据源的…

MATLAB图像处理实现高光抑制

下面是的几个用MATLAB进行高光抑制的处理例子。 1. 基于最大值滤波的亮光抑制方法 原理是用某像素周围一定大小的邻域中的最大值减去该像素值&#xff0c;可达到亮光抑制的效果。在MATLAB中&#xff0c;可以使用mat2gray函数将图像归一化后&#xff0c;再使用imextendedmax函…

Android Stuido Proguard Retrace Unscrambler直接reProguard反混淆retrace日志

Android Stuido Proguard Retrace Unscrambler直接reProguard反混淆retrace日志 &#xff08;1&#xff09;如果Android Studio里面没有安装下列插件之一的&#xff0c;在Settings的Plugins里面安装其中一个&#xff1a; &#xff08;2&#xff09;菜单栏中的code里面找到反混…

sphinx pdoc 生成API文档

文章目录 sphinxinstall pdoc sphinx install pip install sphinx sphinx_rtd_theme sphinx-autobuild pip install recommonmark sphinx_markdown_tables sphinx-quickstart 选 y Project language [en]: zh_CN conf.py: import sys sys.path.append(..)extensions [sph…

Word公式大括号左对齐

1、大括号公式如下&#xff1a; 2、依次选中每一行&#xff0c;然后在开头输入一个&&#xff0c;然后回车&#xff1a; 3、当最后一行输入完立马可以发现左对齐了&#xff1a; The higher I got, the more amazed I was by the view.