【Flink】处理迟到元素(续)、自定义水位线和多流的合并与合流

news2025/1/16 5:15:40

文章目录

  • 一 处理迟到元素
    • 1 处理策略
      • (3)使用迟到元素更新窗口计算结果
        • a 代码编写
        • b 测试
  • 二 自定义水位线
    • 1 产生水位线的接口
    • 2 自定义水位线的产生逻辑
  • 三 多流的合流与分流
    • 1 union算子
    • 2 水位线传递规则
      • (1) 分流
        • a 代码编写
        • b 测试
      • (2) 合流
        • a 代码编写
        • b 测试
    • 3 connect算子

一 处理迟到元素

1 处理策略

(3)使用迟到元素更新窗口计算结果

由于存在迟到的元素,所以已经计算出的窗口结果是不准确和不完全的。我们可以使用迟到元素更新已经计算完的窗口结果。

如果我们要求一个 operator 支持重新计算和更新已经发出的结果,就需要在第一次发出结果以后也要保存之前所有的状态。但显然我们不能一直保存所有的状态,肯定会在某一个时间点将状态清空,而一旦状态被清空,结果就再也不能重新计算或者更新了。而迟到的元素只能被抛弃或者发送到旁路输出流。

window operator API 提供了方法来明确声明我们要等待迟到元素。当使用 event-timewindow,我们可以指定一个时间段叫做 allowed lateness(允许等待时间)。window operator 如果设置了allowed lateness,这个 window operator 在水位线没过窗口结束时间时也将不会删除窗口和窗口中的状态。窗口会在一段时间内 (allowed lateness 设置的) 保留所有的元素。

当迟到元素在 allowed lateness 时间内到达时,这个迟到元素会被实时处理并发送到触发器 (trigger)。当水位线超过了窗口结束时间 + allowed lateness 时间时,窗口会被删除,并且所有后来的迟到的元素都会被丢弃。

使用迟到数据更新窗口的计算结果:

a 代码编写

public static void main(String[] args) throws Exception{
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    SingleOutputStreamOperator<String> result = env
            .socketTextStream("localhost", 9999)
            .map(new MapFunction<String, Tuple2<String, Long>>() {
                @Override
                public Tuple2<String, Long> map(String value) throws Exception {
                    String[] arr = value.split(" ");
                    return Tuple2.of(arr[0], Long.parseLong(arr[1]) * 1000L);
                }
            })
            .assignTimestampsAndWatermarks(
                    WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                            .withTimestampAssigner(
                                    new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                                        @Override
                                        public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                                            return element.f1;
                                        }
                                    }
                            )
            )
            .keyBy(r -> r.f0)
            .window(TumblingEventTimeWindows.of(Time.seconds(5)))
            // 允许等待时间为5S,5S后窗口被销毁
            .allowedLateness(Time.seconds(5))
            // 等待5S后到来的数据被输出到侧输出流中
            .sideOutputLateData(new OutputTag<Tuple2<String, Long>>("late") {
            })
            .process(new ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
                // 在窗口闭合的时候调用
                @Override
                public void process(String s, Context context, Iterable<Tuple2<String, Long>> elements, Collector<String> out) throws Exception {
                    // 初始化一个窗口状态变量,注意,窗口状态变量的可见范围是当前窗口
                    // 是否为第一次触发计算
                    ValueState<Boolean> firstCalcaulate =
                            context.windowState().getState(new ValueStateDescriptor<Boolean>("first", Types.BOOLEAN));
                    // 闭合窗口,产生第一次计算
                    if (firstCalcaulate.value() == null) {
                        out.collect("窗口第一次触发了计算!水位线是【" + context.currentWatermark()
                                + "】窗口中共有【" + elements.spliterator().getExactSizeIfKnown() + "】条元素");
                        // 第一次触发process计算,将状态更新为true
                        firstCalcaulate.update(true);
                    } else {
                        out.collect("迟到数据到了,更新以后窗口中的元素数量是【" + elements.spliterator().getExactSizeIfKnown() + "】");
                    }
                }
            });

    result.print("正常数据:");

    result.getSideOutput(new OutputTag<Tuple2<String,Long>>("late"){}).print("迟到数据:");

    env.execute();

}

b 测试

输入数据输出结果如下:

a 1
a 2
a 10	输出:正常数据:> 窗口第一次触发了计算!水位线是【4999】窗口中共有【2】条元素
	    注意:此时【0-5】窗口不会被销毁,窗口销毁时间 = 5 + 5 - 1ms = 9999ms
a 1		输出:正常数据:> 迟到数据到了,更新以后窗口中的元素数量是【3】
a 1		输出:正常数据:> 迟到数据到了,更新以后窗口中的元素数量是【4】
a 7		注意:属于【5-10】窗口,不会触发【5-10】窗口的第一次计算
a 8		
	    注意:输入a 15,关闭【0-5】窗口,且触发【5-10】窗口的计算
a 15	输出:正常数据:> 窗口第一次触发了计算!水位线是【9999】窗口中共有【2】条元素
	    注意:再输入【0-5】窗口中的数据,被输出到侧输出流中
a 1		输出:迟到数据:> (a,1000)

所以从本质上看,此种处理迟到数据的策略是【上文(2)】的扩展,那么为什么不直接将延迟时间设置为10S呢?因为

  • 有些窗口不存在迟到数据
  • 能够更快的看到计算结果

所以对于迟到数据的处理方法,要根据实际情况进行权衡。

二 自定义水位线

1 产生水位线的接口

@Public
public interface WatermarkGenerator<T> {
    /**
    * 每来一个事件都会调用, 允许水位线产生器记忆和检查事件的时间戳。
    * 允许水位线产生器基于事件本身发射水位线。
    */
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
    /**
    * 周期性的调用(默认 200ms 调用一次), 可能会产生新的水位线,也可能不会。
    *
    * 调用周期通过 ExecutionConfig#getAutoWatermarkInterval() 方法来配置。
    */
    void onPeriodicEmit(WatermarkOutput output);
}

2 自定义水位线的产生逻辑

public static void main(String[] args) throws Exception{
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    env
            .socketTextStream("localhost",9999)
            .map(new MapFunction<String, Tuple2<String,Long>>() {
                @Override
                public Tuple2<String, Long> map(String value) throws Exception {
                    String[] arr = value.split(" ");
                    return Tuple2.of(arr[0],Long.parseLong(arr[1]) * 1000L);
                }
            })
            .assignTimestampsAndWatermarks(new CustomerWatermarkGenerator())
            .print();

    env.execute();
}
public static class CustomerWatermarkGenerator implements WatermarkStrategy<Tuple2<String,Long>> {

    @Override
    public TimestampAssigner<Tuple2<String, Long>> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
        return new SerializableTimestampAssigner<Tuple2<String, Long>>() {
            @Override
            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                return element.f1;
            }
        };
    }

    @Override
    public WatermarkGenerator<Tuple2<String, Long>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
        return new WatermarkGenerator<Tuple2<String, Long>>() {
            // 延迟时间为5s
            private long bound = 5000L;
            // 防止溢出,最后计算水位线时需要- bound - 1L
            private long maxTs = -Long.MAX_VALUE + bound + 1L;
            // 来一个事件,执行一次
            @Override
            public void onEvent(Tuple2<String, Long> event, long eventTimestamp, WatermarkOutput output) {
                // 更新观察到的最大事件事件
                maxTs = Math.max(maxTs,event.f1);
            }
            
            // 周期型计算水位线
            @Override
            public void onPeriodicEmit(WatermarkOutput output) {
                // 发送水位线,注意水位线的计算公式
                output.emitWatermark(new Watermark(maxTs - bound - 1L));
            }
        };
    }
}

三 多流的合流与分流

1 union算子

union算子用于多条流的合并,所有流中的元素类型必须相同。

多流按照先进先出(FIFO)的原则进行合并。

public static void main(String[] args) throws Exception{
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    //DataStreamSource<T> extends SingleOutputStreamOperator<T>
    //SingleOutputStreamOperator<T> extends DataStream<T>
    DataStreamSource<Integer> stream1 = env.fromElements(1, 2);
    DataStreamSource<Integer> stream2 = env.fromElements(3, 4);
    DataStreamSource<Integer> stream3 = env.fromElements(5, 6);


    DataStream<Integer> result = stream2.union(stream1, stream3);

    result.print();

    env.execute();
}

水位线在单条流进行分流(找最小的时钟)和多条流进行合流(复制,向下广播)的时候,可以从union中看出来,如下:

当三条流进行合流时,会更新事件时钟算子,选择最小的(详情见2(2)合流的测试);向下传播的时候,会进行广播(详情见2(1)分流的测试),此为水位线传播规则。

在这里插入图片描述

2 水位线传递规则

(1) 分流

a 代码编写

public static void main(String[] args) throws Exception{
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    env
            .socketTextStream("localhost",9999)
            .map(new MapFunction<String, Tuple2<String,Long>>() {
                @Override
                public Tuple2<String, Long> map(String value) throws Exception {
                    String[] arr = value.split(" ");
                    return Tuple2.of(arr[0],Long.parseLong(arr[1]) * 1000L);
                }
            })
            .assignTimestampsAndWatermarks(
                    WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
                    .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                        @Override
                        public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                            return element.f1;
                        }
                    })
            )
            .keyBy(r -> r.f0)
            .window(TumblingEventTimeWindows.of(Time.seconds(5)))
            .process(new ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
                @Override
                public void process(String s, Context context, Iterable<Tuple2<String, Long>> elements, Collector<String> out) throws Exception {
                    out.collect("key:" + s + " 的窗口触发了");
                }
            })
            .print();

    env.execute();
}

b 测试

输入输出结果如下:

由于分流开窗,每个key的每个窗口都会有一个ProcessWindowFunction,水位线也会进入process算子里面

初始	负无穷大的水位线
a 1   后面跟了一个999ms的水位线
b 5   后面跟了一个4999ms的水位线,当b的4999进入process,就会关闭a和b的窗口
	  输出:key:a 的窗口触发了
a 6
b 6
a 10  输出:key:b 的窗口触发了
	       key:a 的窗口触发了

分流会向不同的分区广播水位线。

(2) 合流

a 代码编写

public static void main(String[] args) throws Exception{
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    SingleOutputStreamOperator<Tuple2<String, Long>> stream1 = env
            .socketTextStream("localhost", 9999)
            .map(new MapFunction<String, Tuple2<String, Long>>() {
                @Override
                public Tuple2<String, Long> map(String value) throws Exception {
                    String[] arr = value.split(" ");
                    return Tuple2.of(arr[0], Long.parseLong(arr[1]) * 1000L);
                }
            })
            .assignTimestampsAndWatermarks(
                    WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
                            .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                                @Override
                                public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                                    return element.f1;
                                }
                            })
            );

    SingleOutputStreamOperator<Tuple2<String, Long>> stream2 = env
            .socketTextStream("localhost", 9998)
            .map(new MapFunction<String, Tuple2<String, Long>>() {
                @Override
                public Tuple2<String, Long> map(String value) throws Exception {
                    String[] arr = value.split(" ");
                    return Tuple2.of(arr[0], Long.parseLong(arr[1]) * 1000L);
                }
            })
            .assignTimestampsAndWatermarks(
                    WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
                            .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                                @Override
                                public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                                    return element.f1;
                                }
                            })
            );

    stream1
            .union(stream2)
            .process(new ProcessFunction<Tuple2<String, Long>, String>() {
                @Override
                public void processElement(Tuple2<String, Long> value, Context ctx, Collector<String> out) throws Exception {
                    out.collect("当前水位线是:" + ctx.timerService().currentWatermark());
                }
            })
            .print();

    env.execute();
}

b 测试

在流一种输入a 1,流二中输入a 2、a 3,流一中输入a 4、a 5。

监控9999和9998端口
9999:
负无穷大水位线
a 1 到达后200ms,后面会跟随着一个999ms的水位线,首先输出当前水位线
    输出:当前水位线是:-9223372036854775808
    并覆盖掉自己的负无穷大时钟,更新算子时钟min(-MAX,999),仍然为负无穷大
    
a 4 到达后200ms,后面会跟随着一个3999的水位线,首先选择两条流中较小的水位线输出,min(999,3999)
    输出:当前水位线是:999
    并覆盖掉自己的999时钟,更新算子时钟min(2999,3999)为2999
a 5 到达后200ms,后面会跟随着一个4999的水位线,首先选择两条流中较小的水位线输出,min(2999,3999)
	输出:当前水位线是:2999
	并覆盖掉自己的3999时钟,更新算子时钟min(2999,4999)为2999

9998:
负无穷大水位线
a 2 到达后200ms,后面会跟随着一个1999ms的水位线,首先输出当前水位线
    输出:当前水位线是:-9223372036854775808
    并覆盖掉自己的负无穷大时钟,更新算子时钟min(1999,999),为999
    
a 3 到达后200ms,后面会跟随着一个2999ms的水位线,首先选择两条流中较小的水位线,min(999,1999)输出
    输出:当前水位线是:999
    并覆盖自己的1999ms时钟,更新算子时钟min(999,2999)为999

元素到来首先调用processElement,水位线到来,更新时钟。

分析过程如下图:

在这里插入图片描述

3 connect算子

union算子用于两条流的合并,两条流中的元素类型可以不同。

public static void main(String[] args) throws Exception{
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    DataStreamSource<Example1.Event> clickStream = env.addSource(new Example1.ClickSource());

    DataStreamSource<String> queryStream = env.socketTextStream("localhost", 9999).setParallelism(1);

    clickStream
            .keyBy(r -> r.user)
            // 和查询流的广播流进行连接,即对点击流的每一个分区都广播一份
            // 实现在查询流中输入./home可以将用户访问./home的数据过滤出来
            // 这也是为什么要连接查询流的广播流
            .connect(queryStream.broadcast())
            // 依次为第一条流、第二条流、输出的泛型
            // CoFlatMapFunction是和FlatMapFunction对应的一种接口,不是复函数
            // 功能相对受限,没有办法注册定时器、状态变量
            .flatMap(new CoFlatMapFunction<Example1.Event, String, Example1.Event>() {
                private String query = "";
                // 不同元素类型的两条流合并到了一起
                // 第一条流中的元素调用flatMap1方法,反之调用flatMap2方法
                @Override
                public void flatMap1(Example1.Event value, Collector<Example1.Event> out) throws Exception {
                    if(value.url.equals(query)) out.collect(value);
                }

                @Override
                public void flatMap2(String value, Collector<Example1.Event> out) throws Exception {
                    query = value;
                }
            })
            .print();

    env.execute();
}

可以随时在查询流中改变想要查找的内容,查询流和点击流合并,不满足条件的数据会被放弃掉,无法查找以前的数据。

connect一般用于两条流都进行keyBy,即将两条数据相同分区的数据进行合流处理;要么一条流进行keyBy,另一条流进行广播,类似于spark中的广播变量(使用不同key将数据shuffle到了不同的分区,针对每一个分区广播一个变量),只不过这里针对的对象是流,流中的每一条元素进行复制,广播到所有分区。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/49470.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

virtio-net 实现机制【二】(图文并茂)

4. virio-net前端驱动分析 4.1 重要数据结构 4.1.1 send_queue结构 send_queue结构是对virtqueue的封装&#xff0c;是virtio-net的发送队列&#xff0c;即数据流向从前端驱动&#xff08;Guest&#xff09;到后端驱动&#xff08;Host&#xff09; 4.1.2 receive_queue结构…

【PlasticSCM Could Edition】新版本云托管浅试2

【PlasticSCM Could Edition】新版本云托管浅试2首先修复更改托管提一嘴首先 建议还是使用 PlasticHub&#xff0c;不要去用 PlasticSCM Cloud 原因&#xff1a; 由于比较新&#xff0c;伴随着的就是 —— 太多 bigs 了&#xff01;而且不知道怎么去改。 当时我创建了一个…

数据结构复习

期末的一套模拟题&#xff0c;仅供参考&#xff0c;切莫作为期末考试依据&#xff01;&#xff01;&#xff01; 选择题 数组A[1..5,1..6]每个元素占5个单元&#xff0c;将其按行优先次序存储在起始地址为1000的连续的内存单元中&#xff0c;则元素A[5,5]的地址为&#xff1a;…

轻量应用服务器部署vue项目

首先我已经拥有一个轻量云服务器了.windows2012的版本&#xff1a; 1.搭建 FTP 服务 为了将我们打包好的vue项目传到服务器&#xff0c;我们要先在服务器搭建FTP服务。 具体步骤可以参考官方文档&#xff0c;官方文档的教程十分清楚详细&#xff0c;按照步骤来就不会出错&am…

传奇服务端服务端运行7个窗口的各窗口功能讲解

大家都知道打开传奇版本里的游戏引擎后&#xff0c;就会弹出7各窗口&#xff0c;下面给大家讲解下各窗口的功能定义 GameCenter是游戏服务器启动程序&#xff0c;负责将DBServer.exe处转来的客户 端转发到相应的M2Server.exe。 DBServer.exe是用户数据库服务器&#xff0c;负责…

Allure与Jenkins持续集成

目标&#xff1a;每次提交代码到代码托管平台&#xff08;gitee&#xff09;&#xff0c;自动触发jenkins项目构建&#xff0c;生成allure测试报告&#xff0c;并发送邮件通知。 1、部署jenkins Jenkins — 快速入门 2、配置Gitee &#xff08;1&#xff09;Plugin Manager&…

01_openstack概述

一、openstack起源 Openstack是一个由NASA(美国国家航空航天局)和Rackspace合作研发并发起的项目Openstack是一套IaaS解决方案Openstack是一个开源的云计算管理平台以Apache许可证为授权二、Openstack七大组件 1、Horizaon组件 Horizon为Openstack服务的Web控制面板&#xff…

leecode#用Read4读取n个字符#相交链表

题目描述&#xff1a; 给你一个文件&#xff0c;并且该文件只能通过给定的 read4 方法来读取&#xff0c;请实现一个方法使其能够读取 n 个字符。 分析&#xff1a; read4 方法&#xff1a;API read4 可以从文件中读取 4 个连续的字符&#xff0c;并且将它们写入缓存数组 buf…

为什么电脑运行越来越慢?解决方法又是什么呢?

文章目录为什么电脑运行越来越慢&#xff1f;解决方法又是什么呢&#xff1f;一&#xff0c;电脑运行慢的原因二&#xff0c;提高电脑运行速度的方法1&#xff0c;重启电脑2&#xff0c;还原&#xff08;重置&#xff09;操作系统3&#xff0c;关闭不使用的应用程序4&#xff0…

CrossOver2023Win电脑软件操作在Mac、Linux系统运行教程

在Mac系统中一直存在一个比较令用户们头疼的问题&#xff0c;那就是安装不了想要的Windows软件。如果使用的第一台电脑就是MacBook那接触到的Windows软件想必并不是很多。但我们中的大多数人都是从小先学习了Windows的操作系统&#xff0c;再过渡到Mac系统上的。 那有小伙伴会…

面试:插件化相关---service

插件service启动分析 同样的&#xff0c;先来看看service的常规启动流程 调用contextimpl.startService/bindService/stopService -> AMS&#xff0c;AMS对应创建ServiceRecord和token后&#xff0c;通知ActivityThreadActivityThread收到startService后&#xff0c;会创建…

少儿编程 电子学会图形化编程等级考试Scratch一级真题解析(选择题)2022年9月

2022年9月scratch一级真题解析 选择题(共25题,每题2分,共50分) 1、点击绿旗,下列哪个选项可以实现播放马叫声并在声音全部播放完后,马向右移动 A、 B、 C、 D、

[附源码]计算机毕业设计JAVA小说网站的设计与实现1

[附源码]计算机毕业设计JAVA小说网站的设计与实现1 项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM m…

基于RV1126 Video分析-----链接 isp 与mipi csi 的media模块

从前文&#xff1a;<<图像处理模块所代表的V4L2设备注册>> 中了解到。rkcif_mipi设备注册的过程就是以设备通知器为线索&#xff0c;从顶向下&#xff0c;依次找到下一级设备&#xff0c;添加到V4L2设备种&#xff0c;循环处理。将全部的子设备整理到 V4L2设备中&a…

《InnoDB引擎五》Checkpoint技术

Checkpoint技术 缓冲池的设计目的是为了协调CPU速度与磁盘速度的鸿沟。因此页的操作首先都是在缓冲池中完成的。如果一条DML语句&#xff0c;比如Update或Delete改变了页中的记录&#xff0c;那么此时页是脏的&#xff0c;即缓冲池中的页要比磁盘的新&#xff0c;那么数据库就需…

网络编程简单学习

目录 一、 概述 1. 什么是计算机网络&#xff1f; 2. 网络编程的目的 3. 需要达到什么样的效果&#xff1f; 二、 网络通信的两个要素 1. 通信双方地址 2. 规则&#xff1a;网络通信协议 三、IP地址 1. IP地址概述 2. IP地址分类 3. 域名 四、端口 五、通信协议 …

Python破解WIFI密码完整源代码,实测可成功破解

目录 # 修正记录&#xff1a;2022-12-01 1&#xff0c;先安装Python环境(这个不用多说了吧) 2&#xff0c;安装PyWifi 3,自己手工整理高频弱口令&#xff0c;不建议程序生成的字典&#xff0c;生成的字典成功率实在太低。 4&#xff0c;自己生成字典的算法&#xff1a; 5…

九联UNT413A_S905L3A__AI语音_默认打开ADB_完美线刷固件包【可救砖】

UNT413A_S905L3A__AI语音_默认打开ADB_完美线刷固件包【可救砖】_基于云南原机制作_端口5555 固件特点&#xff1a; 1、修改dns&#xff0c;三网通用&#xff1b; 2、开放原厂固件屏蔽的市场安装和u盘安装apk&#xff1b; 3、无开机广告&#xff0c;无系统更新&#xff0c;…

PyQt5 设置窗口背景

PyQt5 设置窗口背景使用setStyleSheet设置窗口背景图片使用setStyleSheet设置窗口背景颜色使用QPalette设置窗口背景颜色使用QPalette设置窗口背景图片使用paintEvent设置窗口背景颜色使用paintEvent设置窗口背景图片窗口背景主要包括&#xff1a;背景色和背景图片。设置窗口背…

跨境电商必知的交叉销售和追加销售:2022终极指南

关键词&#xff1a;跨境电商、交叉销售、追加销售 跨境电商可以从客户那里获得的潜在收入不会在销售点结束。 交叉销售和追加销售通过吸引客户增加支出来增加您的收入。这是一个双赢的局面&#xff0c;消费者获得了卓越的体验&#xff0c;而你赚到了更多的钱。 但是&#xff0c…