使用 Kafka 和 CDC 将数据从 MongoDB Atlas 流式传输到 SingleStore Kai

news2025/1/22 22:50:31

SingleStore 提供了变更数据捕获 (CDC) 解决方案,可将数据从 MongoDB 流式传输到 SingleStore Kai。在本文中,我们将了解如何将 Apache Kafka 代理连接到 MongoDB Atlas,然后使用 CDC 解决方案将数据从 MongoDB Atlas 流式传输到 SingleStore Kai。我们还将使用 Metabase 为 SingleStore Kai 创建一个简单的分析仪表板。

介绍

CDC 是一种跟踪数据库或系统中发生的更改的方法。SingleStore 现在提供了与 MongoDB 配合使用的 CDC 解决方案。

为了演示 CDC 解决方案,我们将使用Kafka 代理将数据流式传输到 MongoDB Atlas 集群,然后使用 CDC 管道将数据从 MongoDB Atlas 传播到 SingleStore Kai。我们还将使用 Metabase 创建一个简单的分析仪表板。

图 1 显示了我们系统的高级架构。

高层架构

图 1. 高级架构(来源:SingleStore)。

我们将在以后的文章中重点介绍使用 CDC 解决方案的其他场景。

MongoDB Atlas

我们将在 M0 沙箱中使用 MongoDB Atlas。我们将在Database Access下配置具有atlasAdmin权限的管理员用户。我们将暂时允许从网络访问下的任何地方(IP 地址 0.0.0.0/0)进行访问。我们将记下用户名密码主机

Apache Kafka

我们将配置 Kafka 代理将数据流式传输到MongoDB Atlas中。我们将使用 Jupyter Notebook 来实现此目的。

首先,我们将安装一些库:

!pip install pymongo kafka-python --quiet

接下来,我们将连接到 MongoDB Atlas 和 Kafka 代理:

from kafka import KafkaConsumer
from pymongo import MongoClient

try:
    client = MongoClient("mongodb+srv://<username>:<password>@<host>/?retryWrites=true&w=majority")
    db = client.adtech
    print("Connected successfully")
except:
    print("Could not connect")

consumer = KafkaConsumer(
    "ad_events",
    bootstrap_servers = ["public-kafka.memcompute.com:9092"]

我们将用我们之前从 MongoDB Atlas 保存的值替换<username>,<password>和。<host>

最初,我们将 100 条记录加载到 MongoDB Atlas 中,如下所示:

MAX_ITERATIONS = 100

for iteration, message in enumerate(consumer, start = 1):
    if iteration > MAX_ITERATIONS:
        break

    try:
        record = message.value.decode("utf-8")
        user_id, event_name, advertiser, campaign, gender, income, page_url, region, country = map(str.strip, record.split("\t"))

        events_record = {
            "user_id": int(user_id),
            "event_name": event_name,
            "advertiser": advertiser,
            "campaign": int(campaign.split()[0]),
            "gender": gender,
            "income": income,
            "page_url": page_url,
            "region": region,
            "country": country
        }

        db.events.insert_one(events_record)
    except Exception as e:
        print(f"Iteration {iteration}: Could not insert data - {str(e)}")

数据应该成功加载,我们应该看到一个名为 的数据库,adtech其中包含一个名为 的集合events。集合中的文档在结构上应类似于以下示例:

_id: ObjectId('64ec906d0e8c0f7bcf72a8ed')
user_id: 3857963415
event_name: "Impression"
advertiser: "Sherwin-Williams"
campaign: 13
gender: "Female"
income: "25k and below",
page_url: "/2013/02/how-to-make-glitter-valentines-heart-boxes.html/"
region: "Michigan"
country: "US"
这些文档代表广告活动事件。该events集合存储 的详细信息advertiser以及campaign有关用户的各种人口统计信息,例如genderincome

SingleStore Kai

上一篇文章介绍了创建免费 SingleStoreDB 云帐户的步骤。我们将使用以下设置:

  • 工作区组名称: CDC 演示组
  • 云提供商: AWS
  • 区域:美国东部 1(弗吉尼亚北部)
  • 工作区名称: cdc-demo
  • 尺码: S-00
  • 设置:
    - SingleStore Kai 选择

一旦工作区可用,我们将记下密码主机该主机可从CDC Demo Group > Overview > Workspaces > cdc-demo > Connect > Connect Directly > SQL IDE > Host获取。稍后我们将需要元数据库的此信息。我们还将通过在CDC 演示组 > 防火墙下配置防火墙来暂时允许从任何地方进行访问。

从左侧导航窗格中,我们选择DEVELOP > SQL Editor来创建adtech数据库link,如下所示:

CREATE DATABASE IF NOT EXISTS adtech;
USE adtech;

DROP LINK adtech.link;

CREATE LINK adtech.link AS MONGODB
CONFIG '{"mongodb.hosts": "<primary>:27017, <secondary>:27017, <secondary>:27017",
        "collection.include.list": "adtech.*",
        "mongodb.ssl.enabled": "true",
        "mongodb.authsource": "admin",
        "mongodb.members.auto.discover": "false"}'
CREDENTIALS '{"mongodb.user": "<username>",
            "mongodb.password": "<password>"}';

CREATE TABLES AS INFER PIPELINE AS LOAD DATA LINK adtech.link '*' FORMAT AVRO;
我们将用我们之前从 MongoDB Atlas 保存的值替换<username>和。<password>我们还需要将<primary><secondary>和的值替换<secondary>为 MongoDB Atlas 中每个值的完整地址。

我们现在将检查是否有任何表,如下所示:

SHOW TABLES;

这应该显示一张名为events

+------------------+
| Tables_in_adtech |
+------------------+
| events           |
+------------------+

我们将检查表的结构:

DESCRIBE events;

输出应如下所示:

+-------+------+------+------+---------+-------+
| Field | Type | Null | Key  | Default | Extra |
+-------+------+------+------+---------+-------+
| _id   | text | NO   | UNI  | NULL    |       |
| _more | JSON | NO   |      | NULL    |       |
+-------+------+------+------+---------+-------+

接下来,我们将检查是否有pipelines

SHOW PIPELINES;

这将显示events当前调用的一个管道Stopped

+---------------------+---------+-----------+
| Pipelines_in_adtech | State   | Scheduled |
+---------------------+---------+-----------+
| events              | Stopped | False     |
+---------------------+---------+-----------+

现在我们将启动events管道:

START ALL PIPELINES;

并且状态应更改为Running

+---------------------+---------+-----------+
| Pipelines_in_adtech | State   | Scheduled |
+---------------------+---------+-----------+
| events              | Running | False     |
+---------------------+---------+-----------+

如果我们现在运行以下命令:

SELECT COUNT(*) FROM events;

它应该返回 100 作为结果:

+----------+
| COUNT(*) |
+----------+
|      100 |
+----------+

我们将检查表中的一行events,如下所示:

SELECT * FROM events LIMIT 1;

输出应类似于以下内容:

+--------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| _id                                  | _more                                                                                                                                                                                                                                                                   |
+--------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| {"$oid": "64ec906d0e8c0f7bcf72a8f7"} | {"_id":{"$oid":"64ec906d0e8c0f7bcf72a8f7"},"advertiser":"Wendys","campaign":13,"country":"US","event_name":"Click","gender":"Female","income":"75k - 99k","page_url":"/2014/05/flamingo-pop-bridal-shower-collab-with.html","region":"New Mexico","user_id":3857963416} |
+--------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

CDC 解决方案已成功连接到 MongoDB Atlas 并将所有 100 条记录复制到 SingleStore Kai。

现在让我们使用 Metabase 创建一个仪表板。

元数据库

上一篇文章描述了如何安装、配置和创建元数据库连接的详细信息。我们将使用前一篇文章中使用的查询的细微变化来创建可视化。

1. 活动总数

SELECT COUNT(*) FROM events;

2. 各地区活动

SELECT _more::country AS `events.country`, COUNT(_more::country) AS 'events.countofevents'
FROM adtech.events AS events
GROUP BY 1;

3. Top 5 广告商活动

SELECT _more::advertiser AS `events.advertiser`, COUNT(*) AS `events.count`
FROM adtech.events AS events
WHERE (_more::advertiser LIKE '%Subway%' OR _more::advertiser LIKE '%McDonals%' OR _more::advertiser LIKE '%Starbucks%' OR _more::advertiser LIKE '%Dollar General%' OR _more::advertiser LIKE '%YUM! Brands%' OR _more::advertiser LIKE '%Dunkin Brands Group%')
GROUP BY 1
ORDER BY `events.count` DESC;

4. 按性别和收入划分的广告访问者

SELECT *
FROM (SELECT *, DENSE_RANK() OVER (ORDER BY xx.z___min_rank) AS z___pivot_row_rank, RANK() OVER (PARTITION BY xx.z__pivot_col_rank ORDER BY xx.z___min_rank) AS z__pivot_col_ordering, CASE
        WHEN xx.z___min_rank = xx.z___rank THEN 1
        ELSE 0
      END AS z__is_highest_ranked_cell
    FROM (SELECT *, Min(aa.z___rank) OVER (PARTITION BY aa.`events.income`) AS z___min_rank
        FROM (SELECT *, RANK() OVER (ORDER BY CASE
                WHEN bb.z__pivot_col_rank = 1 THEN (CASE
                    WHEN bb.`events.count` IS NOT NULL THEN 0
                    ELSE 1
                  END)
                ELSE 2
              END, CASE
                WHEN bb.z__pivot_col_rank = 1 THEN bb.`events.count`
                ELSE NULL
              END DESC, bb.`events.count` DESC, bb.z__pivot_col_rank, bb.`events.income`) AS z___rank
            FROM (SELECT *, DENSE_RANK() OVER (ORDER BY CASE
                    WHEN ww.`events.gender` IS NULL THEN 1
                    ELSE 0
                  END, ww.`events.gender`) AS z__pivot_col_rank
                FROM (SELECT _more::gender AS `events.gender`, _more::income AS `events.income`, COUNT(*) AS `events.count`
                    FROM adtech.events AS events
                    WHERE (_more::income <> 'unknown' OR _more::income IS NULL)
                    GROUP BY 1, 2) ww) bb
            WHERE bb.z__pivot_col_rank <= 16384) aa) xx) zz
WHERE (zz.z__pivot_col_rank <= 50 OR zz.z__is_highest_ranked_cell = 1) AND (zz.z___pivot_row_rank <= 500 OR zz.z__pivot_col_ordering = 1)
ORDER BY zz.z___pivot_row_rank;

图 2 显示了 AdTech 仪表板上图表大小和位置的示例。我们将自动刷新选项设置为 1 分钟。

图 2.最终仪表板。

图 2.最终仪表板。

如果我们通过更改 使用 Jupyter Notebook 将更多数据加载到 MongoDB Atlas 中  MAX_ITERATIONS,我们将看到数据传播到 SingleStore Kai 以及 AdTech 仪表板中反映的新数据。

总结

在本文中,我们创建了一个 CDC 管道,以使用 SingleStore Kai 增强 MongoDB Atlas。正如多个基准测试所强调的那样,SingleStore Kai 因其卓越的性能而可用于分析。我们还使用 Metabase 创建了一个快速的可视化仪表板,以帮助我们深入了解我们的广告活动。


作者:Akmal Chaudhri ​

更多技术干货请关注公号【云原生数据库

squids.cn,云数据库RDS,迁移工具DBMotion,云备份DBTwin等数据库生态工具。

irds.cn,多数据库管理平台(私有云)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1360246.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SparkStreaming基础解析(四)

1、 Spark Streaming概述 1.1 Spark Streaming是什么 Spark Streaming用于流式数据的处理。Spark Streaming支持的数据输入源很多&#xff0c;例如&#xff1a;Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等等。数据输入后可以用Spark的高度抽象原语如&#xff1a;map、…

网络调试 UDP1,开发板用静态地址-入门5

https://www.bilibili.com/video/BV1zx411d7eC?p11&vd_source109fb20ee1f39e5212cd7a443a0286c5 1, 开发板连接路由器 1.1&#xff0c;烧录无OS UDP例程 1.2&#xff0c;Mini USB连接电脑 1.3&#xff0c;开发板LAN接口连接路由器 2. Ping开发板与电脑之间通信* 2.1 根据…

Redis 教程

Redis 简介 Redis 是完全开源的&#xff0c;遵守 BSD 协议&#xff0c;是一个高性能的 key-value 数据库。 Redis 与其他 key - value 缓存产品有以下三个特点&#xff1a; Redis支持数据的持久化&#xff0c;可以将内存中的数据保存在磁盘中&#xff0c;重启的时候可以再次…

智能分析网关V4太阳能风光互补远程视频智能监控方案

一、背景需求 在一些偏远地区&#xff0c;也具有视频监控的需求。但是这类场景中&#xff0c;一般无法就近获取市电&#xff0c;如果要长距离拉取市电&#xff0c;建设的成本非常高且长距离传输有安全隐患&#xff0c;因此风光互补远程视频监控方案的需求也较多。利用风光电转化…

Reids在Win下无法远程访问

1.将redis在windows上启动主要做了以下配置 1.1.在redis.windows.conf中修改一下 原&#xff1a;bind 127.0.0.1 改&#xff1a;# bind 127.0.0.1 bind 0.0.0.0 原&#xff1a;protected-mode yes 改&#xff1a;protected-mode no去掉了127.0.0.1&#xff0c;加入0.0.0.0后&…

[C#]winform部署PaddleDetection的yolo印章检测模型

【官方框架地址】 https://github.com/PaddlePaddle/PaddleDetection.git 【算法介绍】 PaddleDetection 是一个基于 PaddlePaddle&#xff08;飞桨&#xff09;深度学习框架的开源目标检测工具库。它提供了一系列先进的目标检测算法&#xff0c;包括但不限于 Faster R-CNN, …

C ++类

定义一个Person类&#xff0c;私有成员int age&#xff0c;string &name&#xff0c;定义一个Stu类&#xff0c;包含私有成员double *score&#xff0c;写出两个类的构造函数、析构函数、拷贝构造和拷贝赋值函数&#xff0c;完成对Person的运算符重载(算术运算符、条件运算…

Linux———cat命令详解

目录 cat 命令是 Linux 中用于查看文件的内容或将多个文件合并输出。 基本语法&#xff1a; 常用选项&#xff1a; 示例用法&#xff1a; 查看文件的内容&#xff1a; ​编辑 将多个文件的内容合并输出&#xff1a; ​编辑 显示每一行的行号&#xff1a; ​编辑 显示非…

使用CentOS 7.6搭建HTTP隧道代理服务器

在现代网络环境中&#xff0c;HTTP隧道代理服务器因其灵活性和安全性而受到广泛关注。CentOS 7.6&#xff0c;作为一个稳定且功能强大的Linux发行版&#xff0c;为搭建此类服务器提供了坚实的基础。 首先&#xff0c;我们需要明确HTTP隧道代理的基本原理。HTTP隧道代理允许客户…

为什么云性能监控如此重要?

在当今数字化时代&#xff0c;企业越来越依赖云服务来支持其业务需求。为了确保云服务的可用性、性能和稳定性&#xff0c;云性能监控成为管理和优化云基础架构的关键一环。那么&#xff0c;为什么云性能监控如此重要?下面&#xff0c;就来看看具体介绍吧! 一、实时故障检测 云…

6 网关和配置服务器

文章目录 网关模式Spring Cloud网关Spring Cloud网关微服务其他项目的变更运行和测试小结 运行状况Spring Boot Actuator在微服务中包含Actuator 服务发现和负载均衡ConsulSpring Cloud ConsulSpring Cloud负载均衡器网关中的服务发现和负载均衡使用服务发现和负载均衡 环境配置…

c++ spdlog日志系统

非常好用的日志系统 最近用oatpp写webapi&#xff0c;但他的日志只是显示在控制台&#xff0c;并不记录到文件。 做接口的&#xff0c;肯定要记录错误日志&#xff0c;好查找问题 于是用spdlog&#xff0c;不用编译dll或lib&#xff0c; include 头文件就直接使用了&#x…

【六大排序详解】终篇 :冒泡排序 与 快速排序

终篇 :冒泡排序 与 快速排序 1 冒泡排序1.1 冒泡排序原理1.2 排序步骤1.3 代码实现 2 快速排序2.1 快速排序原理2.1.1 Hoare版本代码实现 2.1.2 hole版本代码实现 2.1.3 前后指针法代码实现 2.1.4 注意取中位数局部优化 2.1.5 非递归版本非递归原理代码实现 2.2 特性总结 谢谢阅…

springboot、spring-kafka、kafka-client的版本对应关系

在使用springboot集成kafka的时候需要注意springboot版本、引用的依赖spring-kafka版本和kafka中间件版本的对应关系&#xff0c;否则可能会因为版本不兼容导致出现错误。 1、含义说明&#xff08;摘自官网&#xff09; Spring Boot&#xff1a;是springboot的版本。Spring fo…

Element ui 改变el-transfer 穿梭框的大小

修改el-transfer 左右两个穿梭框的高度和宽度&#xff0c;具体效果如下正常大小的穿梭框修改之后的&#xff0c;主要在style中加上如下样式即可 /deep/ .el-transfer-panel{ width: 470px; /* 左右两个穿梭框的高度和宽度 */ height: 450px; } /deep/ .el-transfer-panel__li…

C语言编译器(C语言编程软件)完全攻略(第三十部分:Xcode简明教程(使用Xcode编写C语言程序))

介绍常用C语言编译器的安装、配置和使用。 三十、Xcode简明教程&#xff08;使用Xcode编写C语言程序&#xff09; 在 Mac OS X 下学习C语言使用 Xcode。Xcode 是由Apple官方开发的IDE&#xff0c;支持C、C、Objective-C、Swift等&#xff0c;可以用来开发 Mac OS X 和 iOS 上…

微信小程序使用mqtt开发可以,真机不行

以下可以解决我的问题&#xff0c;请一步一步跟着做&#xff0c;有可能版本不一样就失败了 一、下载mqtt.js 前往蓝奏云 https://wwue.lanzouo.com/iQPdc1k50hpe 下载好后将.txt改为.js 然后放入项目里 二、连接mqtt const mqtt require(../../utils/mqtt.min); let cli…

NB三人组(堆排序,归并排序,快速排序)(数据结构课设篇2,python版)(排序综合)

本篇博客主要详细讲解一下NB三人组排序&#xff0c;为什么叫NB三人组呢&#xff1f;因为他们的时间复杂度都为O(n log n)。第一篇博客讲解的是LowB三人组&#xff08;冒泡排序&#xff0c;插入排序&#xff0c;选择排序&#xff09;&#xff0c;第三篇博客会讲解其他排序&#…

Jenkins分布式实现: 构建弹性和可扩展的CI/CD环境!

Jenkins是一个流行的开源持续集成&#xff08;Continuous Integration&#xff0c;CI&#xff09;和持续交付&#xff08;Continuous Delivery&#xff0c;CD&#xff09;工具&#xff0c;它通过自动化构建、测试和部署过程&#xff0c;帮助开发团队更高效地交付软件。Jenkins的…

Linux——系统安全及应用

一、基本安全措施 1、系统账号清理 常见的非登录用户账号包括bin、daemon、 adm、lp、mail等。为了确保系统安全&#xff0c;这些用户账号的登录Shell通常是/ sbin/nologin&#xff0c;表示禁止终端登录&#xff0c;应确保不被人为改动。 //将非登陆用户的Shell设为/sbin/nolo…