当速度很重要时:使用 Hazelcast 和 Redpanda 进行实时流处理

news2025/1/15 1:04:10

        在本教程中,了解如何构建安全、可扩展、高性能的应用程序,以释放实时数据的全部潜力。

        在本教程中,我们将探索 Hazelcast 和 Redpanda 的强大组合,以构建对实时数据做出反应的高性能、可扩展和容错的应用程序。

        Redpanda 是一个流数据平台,旨在处理高吞吐量、实时数据流。Redpanda 与 Kafka API 兼容,为 Apache Kafka 提供了高性能且可扩展的替代方案。Redpanda 独特的架构使其能够每秒处理数百万条消息,同时确保低延迟、容错和无缝可扩展性。

        Hazelcast 是一个统一的实时流数据平台,通过独特地将流处理和快速数据存储相结合,实现对事件流和传统数据源的低延迟查询、聚合和状态计算,从而对动态数据进行即时操作。它允许您快速构建资源高效的实时应用程序。您可以以任何规模部署它,从小型边缘设备到大型云实例集群。

在这篇文章中,我们将指导您设置和集成这两种技术,以实现实时数据摄取、处理和分析,从而实现强大的流分析。最后,您将深入了解如何利用 Hazelcast 和 Redpanda 的组合功能来释放应用程序的流分析和即时操作的潜力。

那么,让我们开始吧!

Pizza in Motion:披萨外卖服务的解决方案架构

        首先,让我们了解我们要构建什么。我们大多数人都喜欢披萨,所以让我们以披萨送货服务为例。我们的披萨外卖服务实时接收来自多个用户的订单。这些订单包含时间戳、user_id、pizza_type 和数量。我们将使用 Python 生成订单,将它们引入 Redpanda,然后使用 Hazelcast 处理它们。

        但是,如果您想通过上下文数据丰富披萨订单怎么办?例如,为特定类型的披萨推荐特定的开胃菜。如何实时做到这一点?

        实际上有多种选择,但在这篇博文中,我们将向您展示如何使用 Hazelcast 通过存储在 Hazelcast 的 iMap 中的开胃菜丰富来自 Redpanda 的披萨订单。

下面是该解决方案的简要示意图。

教程:使用 Redpanda 和 Hazelcast 进行实时流处理

设置 Redpanda

在本教程的范围内,我们将使用 Docker Compose 设置 Redpanda 集群。因此,请确保本地安装了 Docker Compose。
docker-compose.yml在您选择的位置创建文件并向其中添加以下内容。

version: "3.7"
name: redpanda-quickstart
networks:
redpanda_network:
driver: bridge
volumes:
redpanda-0: null
services:
redpanda-0:
command:
- redpanda
- start
- --kafka-addr internal://0.0.0.0:9092,external://0.0.0.0:19092
# Address the broker advertises to clients that connect to the Kafka API.
# Use the internal addresses to connect to the Redpanda brokers'
# from inside the same Docker network.
# Use the external addresses to connect to the Redpanda brokers'
# from outside the Docker network.
- --advertise-kafka-addr internal://redpanda-0:9092,external://localhost:19092
- --pandaproxy-addr internal://0.0.0.0:8082,external://0.0.0.0:18082
# Address the broker advertises to clients that connect to the HTTP Proxy.
- --advertise-pandaproxy-addr internal://redpanda-0:8082,external://localhost:18082
- --schema-registry-addr internal://0.0.0.0:8081,external://0.0.0.0:18081
# Redpanda brokers use the RPC API to communicate with eachother internally.
- --rpc-addr redpanda-0:33145
- --advertise-rpc-addr redpanda-0:33145
# Tells Seastar (the framework Redpanda uses under the hood) to use 1 core on the system.
- --smp 1
# The amount of memory to make available to Redpanda.
- --memory 1G
# Mode dev-container uses well-known configuration properties for development in containers.
- --mode dev-container
# enable logs for debugging.
- --default-log-level=debug
image: docker.redpanda.com/redpandadata/redpanda:v23.1.11
container_name: redpanda-0
volumes:
- redpanda-0:/var/lib/redpanda/data
networks:
- redpanda_network
ports:
- 18081:18081
- 18082:18082
- 19092:19092
- 19644:9644
console:
container_name: redpanda-console
image: docker.redpanda.com/redpandadata/console:v2.2.4
networks:
- redpanda_network
entrypoint: /bin/sh
command: -c 'echo "$$CONSOLE_CONFIG_FILE" > /tmp/config.yml; /app/console'
environment:
CONFIG_FILEPATH: /tmp/config.yml
CONSOLE_CONFIG_FILE: |
kafka:
brokers: ["redpanda-0:9092"]
schemaRegistry:
enabled: true
urls: ["http://redpanda-0:8081"]
redpanda:
adminApi:
enabled: true
urls: ["http://redpanda-0:9644"]
ports:
- 8080:8080
depends_on:
      - redpanda-0

上面的文件包含使用单个代理启动 Redpanda 集群所需的配置。如果需要,您可以使用三代理集群。但是,对于我们的用例来说,单个经纪人就足够了。

请注意,仅建议在 Docker 上使用 Redpanda 进行开发和测试。对于其他部署选项,请考虑Linux或Kubernetes。

为了生成数据,我们使用 Python 脚本:

import asyncio
import json
import os
import random
from datetime import datetime

from kafka import KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic

BOOTSTRAP_SERVERS = (
    "localhost:19092"
    if os.getenv("RUNTIME_ENVIRONMENT") == "DOCKER"
    else "localhost:19092"
)

PIZZASTREAM_TOPIC = "pizzastream"
PIZZASTREAM_TYPES = [
    "Margherita",
    "Hawaiian",
    "Veggie",
    "Meat",
    "Pepperoni", 
    "Buffalo",
    "Supreme",
    "Chicken",
]

async def generate_pizza(user_id):

    producer = KafkaProducer(bootstrap_servers=BOOTSTRAP_SERVERS)
    while True:
        data = {
            "timestamp_": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "pizza": random.choice(PIZZASTREAM_TYPES),
            "user_id": user_id,
            "quantity": random.randint(1, 10),
        }
        producer.send(
            PIZZASTREAM_TOPIC,
            key=user_id.encode("utf-8"),
            value=json.dumps(data).encode("utf-8"),
        )
        print(
            f"Sent a pizza stream event data to Redpanda: {data}"
        )
        await asyncio.sleep(random.randint(1, 5))

async def main():
    tasks = [
        generate_pizza(user_id)
        for user_id in [f"user_{i}" for i in range(10)]
    ]
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    # Create kafka topics if running in Docker.
    if os.getenv("RUNTIME_ENVIRONMENT") == "DOCKER":
        admin_client = KafkaAdminClient(
            bootstrap_servers=BOOTSTRAP_SERVERS, client_id="pizzastream-producer"
        )
        # Check if topics already exist first
        existing_topics = admin_client.list_topics()
        for topic in [PIZZASTREAM_TOPIC]:
            if topic not in existing_topics:
                admin_client.create_topics(
                    [NewTopic(topic, num_partitions=1, replication_factor=1)]
                )
    asyncio.run(main())

设置 Hazelcast

启动 Hazelcast 本地集群。这将以客户端/服务器模式运行 Hazelcast 集群以及在本地网络上运行的管理中心实例。

brew tap hazelcast/hz
brew install hazelcast@5.3.1
hz -V

现在我们了解了要构建的内容并设置了先决条件,让我们直接进入解决方案。

步骤1:启动Redpanda集群

让我们通过在终端中运行以下命令来启动Redpanda 集群。确保您位于保存文件的同一位置docker-compose.yml

docker compose up -d

类似于以下内容的输出确认 Redpanda 集群已启动并正在运行。

[+] Running 4/4
⠿ Network redpanda_network                 Created  0.0s
⠿ Volume "redpanda-quickstart_redpanda-0"  Created  0.0s
⠿ Container redpanda-0                     Started  0.3s
⠿ Container redpanda-console               Started  0.6s

第 2 步:运行 Hazelcast

您可以运行以下命令来启动具有一个节点的 Hazelcast 集群。

hz start

要将更多成员添加到集群,请打开另一个终端窗口并重新运行启动命令。

第 3 步:在 Hazelcast 上运行 SQL

我们将使用 SQL shell——在集群上运行 SQL 查询的最简单方法。您可以使用SQL查询地图和Kafka主题中的数据。结果可以直接发送到客户端或插入到地图或 Kafka 主题中。您还可以使用 Kafka Connector,它允许您在 Hazelcast 集群和 Kafka 之间流式传输、过滤和转换事件。您可以通过运行以下命令来执行此操作:

bin/hz-cli sql

第 4 步:摄取 Hazelcast iMap (pizzastream)

使用 SQL 命令,我们创建pizzastreamMap:

CREATE OR REPLACE MAPPING pizzastream(
timestamp_ TIMESTAMP,
pizza VARCHAR,
user_id VARCHAR,
quantity DOUBLE
)
TYPE Kafka
OPTIONS (
'keyFormat' = 'varchar',
'valueFormat' = 'json-flat',
'auto.offset.reset' = 'earliest',
'bootstrap.servers' = 'localhost:19092');

步骤 5:用推荐数据丰富流(推荐器)

对于这一步,我们创建另一个地图:

CREATE or REPLACE MAPPING recommender (
__key BIGINT,
user_id VARCHAR,
extra1 VARCHAR,
extra2 VARCHAR,
extra3 VARCHAR )
TYPE IMap
OPTIONS (
'keyFormat'='bigint',
'valueFormat'='json-flat');

我们在 Map 中添加一些值:

INSERT INTO recommender VALUES
(1, 'user_1', 'Soup','Onion_rings','Coleslaw'),
(2, 'user_2', 'Salad', 'Coleslaw', 'Soup'),
(3, 'user_3', 'Zucchini_fries','Salad', 'Coleslaw'),
(4, 'user_4', 'Onion_rings','Soup', 'Jalapeno_poppers'),
(5, 'user_5', 'Zucchini_fries', 'Salad', 'Coleslaw'),
(6, 'user_6', 'Soup', 'Zucchini_fries', 'Coleslaw'),
(7, 'user_7', 'Onion_rings', 'Soup', 'Jalapeno_poppers'),
(8, 'user_8', 'Jalapeno_poppers', 'Coleslaw', 'Zucchini_fries'),
(9, 'user_9', 'Onion_rings','Jalapeno_poppers','Soup');

第 6 步:使用 SQL 合并两个映射

基于上面两个Map,我们发送以下SQL查询:

SELECT
    pizzastream.user_id AS user_id,
    recommender.extra1 as extra1,
    recommender.extra2 as extra2,
    recommender.extra3 as extra3,
     pizzastream.pizza AS pizza
FROM pizzastream
JOIN recommender
ON recommender.user_id = recommender.user_id 
AND recommender.extra2 = 'Soup';

第7步:将组合数据流发送到Redpanda

为了将结果发送回 Redpanda,我们在 Hazelcast 中创建一个 Jet 作业,将 SQL 查询结果存储到一个新的 Map 中,然后存储到 Redpanda 中:

CREATE OR REPLACE MAPPING recommender_pizzastream(
timestamp_ TIMESTAMP,
user_id VARCHAR,
extra1 VARCHAR,
extra2 VARCHAR,
extra3 VARCHAR,
pizza VARCHAR
)
TYPE Kafka
OPTIONS (
'keyFormat' = 'int',
'valueFormat' = 'json-flat',
'auto.offset.rest' = 'earliest',
'bootstrap.servers' = 'localhost:19092'
);

CREATE JOB recommender_job AS SINK INTO recommender_pizzastream SELECT
    pizzastream.timestamp_ as timestamp_,
    pizzastream.user_id AS user_id,
    recommender.extra1 as extra1,
    recommender.extra2 as extra2,
    recommender.extra3 as extra3,
    pizzastream.pizza AS pizza
FROM pizzastream
JOIN recommender
ON recommender.user_id = recommender.user_id 
AND recommender.extra2 = 'Soup';

结论

在这篇文章中,我们解释了如何使用 Redpanda 和 Hazelcast 构建披萨外卖服务。

Redpanda 通过将披萨订单作为高吞吐量流摄取、可靠地存储它们并允许 Hazelcast 以可扩展的方式使用它们来增加价值。一旦食用,Hazelcast 就会利用上下文数据丰富披萨订单(立即向用户推荐开胃菜),并将丰富的数据发送回 Redpanda。

Hazelcast 允许您快速构建资源高效的实时应用程序。您可以以任何规模部署它,从小型边缘设备到大型云实例集群。Hazelcast 节点集群共享数据存储和计算负载,可以动态扩展和缩减。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/880305.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SOLIDWORKS PDM—文件版本的管控

SOLIDWORKS产品数据管理 (PDM) 解决方案可帮助您控制设计数据,并且从本质上改进您的团队就产品开发进行管理和协作的方式。使用 SOLIDWORKS PDM Professional,您的团队能够:1. 安全地存储和索引设计数据以实现快速检索;2. 打消关于…

4WRZ25E3-220-5X/6A24NZ4/D3M不带位移反馈比例阀放大器

该先导阀是一个由比例电磁铁控制的三通减压阀,它的作用是将一个输入信号转化为一个与其成比例的压力输出信号,可用于所有的4WRZ...和5WRZ...型比例阀的控制。比例电磁铁是可调试,湿式直流电磁铁结构,带中心螺纹,线圈可…

淘宝搜索店铺列表API:关键字搜索店铺信息 获取店铺主页 店铺所在地 服务评级

接口名称:item_search_seller 基本功能介绍 该API可以通过传入关键字,获取到淘宝商城的店铺列表,支持翻页显示。指定参数page获取到指定页的数据。返回的店铺信息包括:店铺名、店铺ID、店铺主页、宝贝图片、掌柜名字、店铺所在地…

照明灯具哪个品牌好?护眼台灯该怎么选

现在儿童近视率越来越高了,用眼过度疲劳是导致近视的主要因素,学习环境的光线是否合适,都会直接影响用眼的疲劳程度。所以给孩子营造一个良好的学习环境非常重要!一款护眼台灯可以很好的预防近视,为大家推荐五款护眼台…

探索数字孪生的数据之美:实时、多源、多维的未来

在数字孪生的世界里,数据不再是孤立的数字,而是构成了一个真实、动态的虚拟映像,其独特的特点为现代社会带来了前所未有的机遇。 首先,数字孪生的数据特点之一是实时性。在制造业中,数字孪生可以通过实时传感器数据&am…

4WRAP6W7-08-30=G24K4/M=00比例先导阀控制放大器

先导控制阀是直动式比例阀。控制边的尺寸经过优化,可用作比例方向阀型号 4WRKE 的先导控制阀。 比例电磁铁为带可拆卸线圈的耐压密闭型湿式插脚交流线圈。 它们可将电流按比例转换为机械力。电流强度的增加会导致磁力相应增加。设定的磁力会在整个控制行程中保持不…

华为AI战略的CANN

基于TVM的华为昇腾体系中—— 异构计算架构(CANN)是对标英伟达的CUDA CuDNN的核心软件层,向上支持多种AI框架,向下服务AI处理器,发挥承上启下的关键作用,是提升昇腾AI处理器计算效率的关键平台 主要包括有…

Java SpringBoot Vue ERP系统

系统介绍 该ERP系统基于SpringBoot框架和SaaS模式,支持多租户,专注进销存财务生产功能。主要模块有零售管理、采购管理、销售管理、仓库管理、财务管理、报表查询、系统管理等。支持预付款、收入支出、仓库调拨、组装拆卸、订单等特色功能。拥有商品库存…

【网络基础】应用层协议

【网络基础】应用层协议 文章目录 【网络基础】应用层协议1、协议作用1.1 应用层需求1.2 协议分类 2、HTTP & HTTPS2.1 HTTP/HTTPS 简介2.2 HTTP工作原理2.3 HTTPS工作原理2.4 区别 3、URL3.1 编码解码3.2 URI & URL 4、HTTP 消息结构4.1 HTTP请求方法4.2 HTTP请求头信…

虹科干货 | 化身向量数据库的Redis Enterprise——快速、准确、高效的非结构化数据解决方案!

用户期望在他们遇到的每一个应用程序和网站都有搜索功能。然而,超过80%的商业数据是非结构化的,以文本、图像、音频、视频或其他格式存储。Redis Enterprise如何实现矢量相似性搜索呢?答案是,将AI驱动的搜索功能集成到Redis Enter…

聊聊计算机技术

目录 1.计算机的概念 2.计算机的发展过程 3.计算机的作用 4.计算机给人类带来的福利 1.计算机的概念 计算机是一种用于处理和存储数据的电子设备。它能够执行各种操作,比如计算、逻辑操作、数据存储和检索等。计算机由硬件和软件两部分组成。 计算机的硬件包括中…

SAP ABAP 直接把内表转换成PDF格式(smartform的打印函数输出OTF格式数据)

直接上代码: REPORT zcycle055.DATA: lt_tab TYPE TABLE OF zpps001. DATA: ls_tab TYPE zpps001.ls_tab-werks 1001. ls_tab-gamng 150.00. ls_tab-gstrp 20201202. ls_tab-aufnr 000010000246. ls_tab-auart 标准生产. ls_tab-gltrp 20201205. ls_tab-matn…

【史上最全】计算机的编年史

前几天我写算力简史的时候,顺便整理了一份计算机技术的编年史(将近一万字)。今天发给大家,以供参考。 1614年苏格兰人约翰纳皮尔(John Napier)发表了一篇论文,其中提到他发明了一种可以计算四则…

【JavaEE进阶】SpringBoot 日志

文章目录 一. 日志有什么用?二. 自定义日志打印1. 日志的使用与打印 三. 日志级别1. 日志级别有什么用?2. 日志级别的分类及使用 四. 日志持久化五. 更简单的日志输出---Lombok1. Lombok的使用2. lombok原理解释2.1 Lombok更多注解说明 一. 日志有什么用? 在Java中&#xf…

企业做直播时如何选择适合自己的直播平台?

企业做直播时如何选择适合自己的直播平台? 可以通过对比不同直播平台的技术能力、服务质量、安全性等方面的内容,选择最适合自己的直播平台。 企业做直播如何选择直播平台 我的文章推荐: [视频图文] 线上研讨会是什么,企业对内对…

python3装饰器理解与实战

前言 装饰器本质上是一个Python函数,它可以让其他函数在不需要做任务代码变动的前提下增加额外功能,装饰器的返回值也是一个函数对象。它经常用于有切面需求的场景,比如:插入日志、性能测试、事务处理、缓存、权限校验等场景。装…

Kubuesphere部署Ruoyi:持久化存储配置

按照如下教程配置NFS 先服务器&#xff1a;搭建 NFS 服务器 后客户端&#xff1a;安装 NFS Client 按照链接操作以后&#xff0c;在客户端上面把目录挂载到服务端 rootclient_banana:/# mount 172.25.110.41:/mnt/nfs_share /mnt/client_floder 客户端: mount <server-ip…

微服务系列文章之 Springboot+Vue实现登录注册

一、springBoot 创建springBoot项目 分为三个包&#xff0c;分别为controller&#xff0c;service&#xff0c; dao以及resource目录下的xml文件。 UserController.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 …

代理模式概述

1.代理模式概述 学习内容 1&#xff09;概述 为什么要有 “代理” &#xff1f; 生活中就有很多例子&#xff0c;比如委托业务&#xff0c;黄牛&#xff08;票贩子&#xff09;等等代理就是被代理者没有能力或者不愿意去完成某件事情&#xff0c;需要找个人代替自己去完成这…

地理数据的双重呈现:GIS与数据可视化

前一篇文章带大家了解了GIS与三维GIS的关系&#xff0c;本文就GIS话题带大家一起探讨一下GIS和数据可视化之间的关系。 GIS&#xff08;地理信息系统&#xff09;和数据可视化在地理信息科学领域扮演着重要的角色&#xff0c;它们之间密切相关且相互增强。GIS是一种用于采集、…