双11订单实时大屏 - Flink实战洞见

news2024/9/25 3:27:06

        每年的双11购物狂欢节不仅是消费者的狂欢,更是技术人员展示实力的舞台。在这个全民关注的时刻,如何实现实时、准确、鲜活的订单数据展示,成为了各大电商平台技术竞技的焦点。本文将深入探讨如何利用Apache Flink构建一个面向双11的订单实时大屏,展现数据实时处理的魅力。

1、前言

        实时大屏的核心在于“实时”,而Apache Flink作为一款高吞吐、低延迟、精确状态管理的流处理框架,是构建实时数据处理系统的不二之选。在高并发的双11场景下,Flink的稳定性和可伸缩性显得尤为关键。

2、架构设计

在设计双11订单实时大屏时,我们要考虑以下几个关键点:

  1. **数据源接入**:订单数据通常来源于用户的点击、购买等行为,这些数据需要通过消息队列(如Kafka)实时接入Flink。
  2. **实时计算逻辑**:处理数据的逻辑要尽可能简洁高效,包括订单量统计、金额汇总、实时热销榜单等。
  3. **状态管理**:在大数据量下,精确控制状态,保证计算的正确性和一致性。
  4. **可视化展示**:将处理后的数据实时推送到前端大屏,以图表或其他形式直观展示。

2.1 架构流程

  1. **数据采集**:从各个业务系统采集订单数据,通过Logstash等工具推送到Kafka。
  2. **Flink处理**:
    - **数据清洗**:对接入的数据进行格式化、过滤不必要的信息。
    - **实时计算**:进行订单统计、金额计算、用户行为分析等。
    - **窗口聚合**:利用Flink的时间窗口功能,对数据进行实时聚合。
  3. **结果存储与推送**:将计算结果存储到Redis或其他NoSQL数据库,并通过WebSocket等方式实时推送到大屏前端。
  4. **前端展示**:使用ECharts、D3.js等图表库实现动态的数据可视化。

3、关键技术点解析

3.1 Flink的时间窗口

在实时计算中,时间窗口是一个非常重要的概念。Flink提供了多种时间窗口,如滚动窗口、滑动窗口和会话窗口,可以根据业务需求选择合适的窗口类型进行数据聚合。

3.2 状态管理与容错

Flink的状态管理能力保证了在高并发的情况下数据的一致性和准确性。同时,Flink提供了保存点(Savepoint)和检查点(Checkpoint)机制,确保系统具备良好的容错性。

3.3 可视化技术

实时大屏的另一个关键是前端的数据可视化技术。前端不仅要实现数据的动态展示,还要保证用户交互的流畅性和视觉的吸引力。

4、实战案例

接下来,我们将通过一个简化的实战案例,演示如何使用Flink构建双11订单实时大屏。

4.1 数据模拟

我们可以通过Flink自带的数据源API模拟实时订单数据,数据字段包括订单ID、用户ID、商品ID、订单金额和下单时间。

4.2 Flink处理逻辑

4.2.1 模拟订单数据生成

首先,我们需要模拟一些订单数据。可以使用Python脚本来生成模拟数据,并将其发送到Kafka主题中。

import json
import random
import time
from kafka import KafkaProducer

# Kafka配置
producer = KafkaProducer(bootstrap_servers='localhost:9092')

# 模拟订单数据
def generate_order():
    order = {
        "order_id": random.randint(1000, 9999),
        "user_id": random.randint(100, 999),
        "product_id": random.randint(10, 99),
        "amount": round(random.uniform(10.0, 1000.0), 2),
        "timestamp": int(time.time() * 1000)
    }
    return order

# 发送订单数据到Kafka
while True:
    order = generate_order()
    producer.send('orders', json.dumps(order).encode('utf-8'))
    time.sleep(1)

4.2.2 Flink作业处理订单数据

接下来,我们使用Flink来处理这些订单数据,计算实时的订单总金额,并将结果输出到控制台。

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Properties;

public class RealTimeOrderProcessing {
    public static void main(String[] args) throws Exception {
        // 设置Flink执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka消费者配置
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-group");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // 创建Kafka消费者
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("orders", new SimpleStringSchema(), properties);

        // 从Kafka中读取数据
        DataStream<String> orderStream = env.addSource(consumer);

        // 处理订单数据
        orderStream
            .map(order -> {
                // 解析JSON数据
                return new Order(order);
            })
            .keyBy(Order::getUserId)
            .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
            .aggregate(new AggregateFunction<Order, Double, Double>() {
                @Override
                public Double createAccumulator() {
                    return 0.0;
                }

                @Override
                public Double add(Order value, Double accumulator) {
                    return accumulator + value.getAmount();
                }

                @Override
                public Double getResult(Double accumulator) {
                    return accumulator;
                }

                @Override
                public Double merge(Double a, Double b) {
                    return a + b;
                }
            })
            .addSink(new SinkFunction<Double>() {
                @Override
                public void invoke(Double value, Context context) {
                    System.out.println("当前窗口订单总金额: " + value);
                }
            });

        // 执行Flink作业
        env.execute("Real-Time Order Processing");
    }
}

class Order {
    private int orderId;
    private int userId;
    private int productId;
    private double amount;
    private long timestamp;

    public Order(String json) {
        // 解析JSON字符串
        // 这里假设使用了某种JSON库进行解析
        // 例如:JSONObject obj = new JSONObject(json);
        //       this.orderId = obj.getInt("order_id");
        //       this.userId = obj.getInt("user_id");
        //       this.productId = obj.getInt("product_id");
        //       this.amount = obj.getDouble("amount");
        //       this.timestamp = obj.getLong("timestamp");
    }

    public int getOrderId() {
        return orderId;
    }

    public int getUserId() {
        return userId;
    }

    public int getProductId() {
        return productId;
    }

    public double getAmount() {
        return amount;
    }

    public long getTimestamp() {
        return timestamp;
    }
}

4.2.3 运行并观察结果

  1. 启动Kafka并创建一个名为`orders`的主题。
  2. 运行Python脚本生成并发送订单数据到Kafka。
  3. 运行Flink作业,观察控制台输出的订单总金额。

通过上述步骤,就实现了一个简化版的双11订单实时大屏,展示了订单总金额的实时计算过程。

4.3 前端展示

前端通过WebSocket实时接收后端推送的数据,并利用ECharts等库渲染成图表,实现动态更新的效果。

5、结论

        双11订单实时大屏是一个复杂而具有挑战性的项目,它不仅考验了Flink在实时数据处理领域的强大能力,也展示了前端可视化技术的重要性。通过本文的实战洞见,相信大家对于如何构建一个高性能、可靠的实时大屏系统有了更深的理解。在未来的双11中,让我们期待更多技术的精彩应用吧!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1993121.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

软件设计原则 KISS、YAGNI、DRY

软件设计原则 KISS、YAGNI、DRY flyfish 设计模式、设计原则与编程惯用法综述 图片来源SpaceX 简约至极的猛禽3(Raptor 3)火箭发动机 一、KISS (Keep It Simple, Stupid!, 保持简单) KISS原则&#xff0c;即“Keep it simple, stupid!”&#xff08;保持简单&#xff0c;傻…

鸿蒙(API 12 Beta3版)【AVCodec Kit简介】 音视频编码服务

AVCodec kit&#xff08;Audio & Video Codec Kit&#xff0c;音视频编解码&#xff0c;封装解封装原子能力&#xff09;是媒体系统中的音视频的编解码、媒体文件的解析、封装、媒体数据输入等原子能力。 能力范围 媒体数据输入&#xff1a;媒体应用可以传入文件fd、或者…

力扣面试150 反转链表 II 三指针

Problem: 92. 反转链表 II &#x1f468;‍&#x1f3eb; 参考题解 特殊情况 /*** Definition for singly-linked list.* public class ListNode {* int val;* ListNode next;* ListNode() {}* ListNode(int val) { this.val val; }* ListNode(int val…

【区块链+医疗健康】健康管理平台 | FISCO BCOS应用案例

如今&#xff0c;医疗数据的共通共享依然存在一些难点&#xff1a; 1. 数据碎片化&#xff0c;分散在各个机构和公司&#xff0c;难以整合和共享。 2. 数据不完整&#xff0c;缺乏全面的患者信息&#xff0c;导致决策质量下降。数据的可扩展性不足&#xff0c;难以长期跟踪患…

“Mutation Observer:让DOM变化尽在掌握

Mutation Observer&#xff08;变动观察者&#xff09; 定义 Mutation Observer是一种JavaScript API&#xff0c;用于异步监测DOM树的变动&#xff0c;包括元素的添加、删除、属性变化等。当DOM发生变动时&#xff0c;它可以触发回调函数&#xff0c;允许你对变动作出响应。 …

Ubuntu14.04安装网卡驱动

1&#xff0c;lspci 查看网卡型号 2,到官网下载解压&#xff0c;进入文件目录 3,新装的服务器会报错&#xff0c;提示我们没有安装make&#xff0c;下载缺少的gcc和make依赖&#xff0c;记得先执执行一下系统更新 sudo apt update && sudo apt upgrade -y sudo apt …

【css】使用CSS绘制奥运五环--巴黎奥运

使用CSS绘制奥运五环 在2024年巴黎奥运会期间&#xff0c;本文来使用 CSS 来画一个奥运五环。奥运五环由五个相互交叠的圆环组成&#xff0c;分别代表五大洲。 奥运五环是相互连接的&#xff0c;因此在视觉上会产生重叠效果&#xff0c;这也是实现五环最有挑战性的部分 HTML结…

Linux -软件包管理 下载与安装方式

1. wget下载 先下载再安装 wget命令是Linux系统用于从Web下载文件的命令行工具&#xff0c;支持 HTTP、HTTPS及FTP协议下载文件 当然现在更多支持用yum工具的&#xff0c;不过有的时候一些镜像站点不再维护一些旧版本的软件时&#xff0c;这时候就可以用wget可以先获取&…

Unity补完计划 之 SpriteEditer Multiple

本文仅作笔记学习和分享&#xff0c;不用做任何商业用途 本文包括但不限于unity官方手册&#xff0c;unity唐老狮等教程知识&#xff0c;如有不足还请斧正 1. SpriteEditer Multiple Automatic slicing - Unity 手册 这是用于裁剪图集的模式 应用之后精灵编辑器会看到Slice亮…

CSP-J 复赛 模拟题7 and 解析

1.超级素数&#xff1a; 题目描述 素数&#xff0c;又称质数&#xff0c;是指除 11 和其自身之外&#xff0c;没有其他约数的正整数。例如 2,3,5,132,3,5,13 都是素数&#xff0c;而 4,9,12,184,9,12,18 则不是。特别地&#xff0c;规定 11 不是素数&#xff08;因此自然数的…

【C++进阶学习】第十二弹——C++ 异常处理:深入解析与实践应用

前言&#xff1a; 在C编程语言中&#xff0c;异常处理是一种重要的机制&#xff0c;它允许程序员在运行时捕获和处理错误或异常情况。本文将详细介绍C异常处理的相关知识点&#xff0c;包括异常的定义、抛出与捕获、异常处理的原则、以及在实际编程中的应用。 目录 1. 异常处理…

算法力扣刷题记录 六十九【动态规划基础及509. 斐波那契数】

前言 调整一下做题顺序&#xff0c;多个章节同步进行&#xff0c;穿插练习。可以在各章节的专栏中找同一类。 记录 六十九【动态规划基础】。 一、动态规划理论基础学习 参考学习链接 二、509. 斐波那契数 2.1 题目阅读 斐波那契数 &#xff08;通常用 F(n) 表示&#x…

屏蔽浏览器搜索出csdn相关内容的方法

屏蔽csdn搜索结果的方法 前言 鉴于你对知识质量的渴望&#xff0c;以及对挖掘知识金子的欲求&#xff0c;你一定想在浏览器结果中去除有关Csdn的全部内容&#x1f608;**(确信)**&#xff0c;但是当你在用bing或者google搜索有没有可以屏蔽CSDN搜索结果的方法时&#xff0c;通…

一套基于tailwindcss的后台管理系统模板Chakra UI + React + TS

下载地址给你们&#xff1a; https://horizon-ui.com/#version

算法混合杂项

基础类型 可用template 投影 是有方向的 求俩直线交点 推公式 q我们不知道&#xff0c;已知p1 p2&#xff0c;正弦定理&#xff0c;α可以用叉积表示出来 β同理 所以我们能求出p1q 已知piq 回归到我们上一个问题&#xff0c;已知方向和长度&#xff0c;我们就能够求出Voq …

24/8/8算法笔记 不同分类算法的差异

import numpy as np from sklearn.tree import DecisionTreeClassifier from sklearn.linear_model import LogisticRegression from sklearn.svm import SVCfrom sklearn import datasets 加载数据 我们加载的是啤酒的数据 wine datasets.load_wine() wine LR逻辑斯蒂回归…

【数据结构】数组复习-二分查找法

写这篇博客的起因&#xff1a; 刚开始刷力扣&#xff0c;发现没有一个很好的做题方法&#xff0c;在网络上发现了这个博主的评论&#xff0c;如下。感觉挺适合我&#xff0c;所以开始复习一下数据结构。 c基础主要是看&#xff1a; 1.bilibili上青岛大学王卓第02周03--2.3线…

算法 二

求中点 LR&#xff0c;可能溢出 除以2&#xff0c;等同于右移一位 递归、递归的时间复杂度 母问题的规模 子问题的规模&#xff0c;且都相等 调用次数 不用展开看&#xff0c;就看一层。 归并排序 时间复杂度降低的原因&#xff1a;没有浪费比较。比如选择排序&#xff…

48天笔试训练错题——day44

目录 选择题 1. 2. 3. 4. 5. 6. 7. 8. 9. 10. 编程题 1. 单词倒排 选择题 1. A 类 IP 地址&#xff1a;0.0.0.0 ~ 127.255.255.255 1 字节网络号&#xff0c;3 字节主机号 B 类 IP 地址&#xff1a;128.0.0.0 ~ 191.255.255.255 2…

服务器网络磁盘挂载

一、Ping测试 先测试磁盘网络的连通性 例如&#xff1a;这里申请的网络磁盘是&#xff1a; 127.0.0.1:/shareData ping 127.0.0.1二、挂载 确认连通后&#xff0c;确定需要挂载的目录&#xff0c;这里服务器的挂载目录为&#xff1a;/data/share &#xff08;自主选择创建目录…