使用AWS Glue与AWS Kinesis构建的流式ETL作业(二)——数据处理

news2025/1/16 7:55:25

大纲

  • 2 数据处理
    • 2.1 架构
    • 2.2 AWS Glue连接和创建
      • 2.2.1 创建AWS RedShift连接
      • 2.2.2 创建AWS RDS连接(以PG为例)
    • 2.3 创建AWS Glue Job
    • 2.4 编写脚本
        • 2.4.1 以AWS RedShift为例
        • 2.4.2 以PG为例
    • 2.5 运行脚本

2 数据处理

2.1 架构

在这里插入图片描述

2.2 AWS Glue连接和创建

下文中提供了AWS RedShift和PG数据库的连接创建过程,在实际使用中我们可以二选一。

2.2.1 创建AWS RedShift连接

前置条件:需要有一个AWS RedShift。
AWS RedShift具体的创建过程本文不表。需要注意的是:RedShift需要创建一个终端节点,具体的方法请看《Glue连接RedShift的前置条件:创建终端节点》
由于Glue Job 在运行的时候,是在独立的服务器上,因此不能直接访问到私有子网中的服务。于是借助Glue连接,可以使得Job在运行时连接AWS服务。

步骤图例
1、创建连接在这里插入图片描述
2、输入连接名称,连接类型选择“Amazon Redshift”在这里插入图片描述
3、选择RedShift集群,输入配置的用户名密码在这里插入图片描述
4、审核在这里插入图片描述
5、测试连接在这里插入图片描述

2.2.2 创建AWS RDS连接(以PG为例)

步骤图例
1、入口在这里插入图片描述
2 、输入连接名称,连接类型选择“JDBC”在这里插入图片描述
3、输入连接PG的JDBC,语法请看使用连接,输入用户名密码,VPC和子网需要选择PG的VPC和子网。并且至少一个选定的安全组必须为所有 TCP 端口指定自引用入站规则在这里插入图片描述
4、检查无误后完成在这里插入图片描述

2.3 创建AWS Glue Job

步骤图例
1、入口在这里插入图片描述
2、若要在Job中引入其他python包,请在安全配置里面添加作业参数:–additional-python-modules:SQLAlchemy== 1.3.16,psycopg2-binary==2.8.5(值请自定义)在这里插入图片描述
3在这里插入图片描述

2.4 编写脚本

2.4.1 以AWS RedShift为例
import sys
import datetime
import boto3
import base64
from pyspark.sql import DataFrame, Row
from pyspark.context import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue import *

import json
import datetime
import time

args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
sourceData = glueContext.create_data_frame.from_catalog( \
    database = "【你的数据库名称】", \ 
    table_name = "【表的名称】", \
    transformation_ctx = "datasource0", \
    additional_options = {"startingPosition": "TRIM_HORIZON", "inferSchema": "true"})
    
LINCAN_SHARE_PAGE = "lincan_share_page"


share_page_dot = [
    ("url","string","url","string"),
    ("ip","string","ip","string"),
    ("country","string","country","string"),
    ("city","string","city","string"),
    ("user_id","string","user_id","string"),  
    ("dot_type","string","dot_type","string"), 
    ("header","string","header","string"),  
    ("header_appname","string","header_appname","string"),  
    ("header_ismobile","string","header_ismobile","string"),  
    ("header_appversion","string","header_appversion","string"),  
    ("header_useragent","string","header_useragent","string"),
    ("content","string","content","string"),  
    ("content_url","string","content_url","string"),  
    ("content_index","string","content_index","string"),  
    ("content_title","string","content_title","string"),  
    ("create_time","string","create_time","timestamp")
    ]

def get_connect(table):
    connection_options = {
        "url": "jdbc:redshift://【RedShift终端节点】:5439/dev",
        "user": "【RedShift用户名】",
        'database': '【数据库】', 
        "password": "【密码】",
        "dbtable":table,
        "redshiftTmpDir":  args["TempDir"]
        } 
    return connection_options

class Handle:
    def __init__(self,dynamic_frame):
        self._time_now = datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
        self.dynamic_frame = dynamic_frame
        
    def run(self):
        raise


class SharePageDot(Handle):
    def run(self):
        print("start share_page dot")
        def handle(rec):
            message = eval(rec["logEvents.val.message"])
            message["header_appname"] = message["header"]["appName"]
            message["header_ismobile"] = message["header"]["isMobile"]
            message["header_appversion"] = message["header"]["appVersion"]
            message["header_useragent"] = message["header"]["userAgent"]
            message["header"] = json.dumps(message["header"])
            
            message["content_url"] = message["content"]["url"]
            message["content_index"] = message["content"]["index"]
            message["content_title"] = message["content"]["title"]
            message["content"] = json.dumps(message["content"])
            message["create_time"] = datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
            # message["create_time"] = time.time()
            return message
        self.dynamic_frame.printSchema()
        mapped_dyF =  Map.apply(frame = self.dynamic_frame, f = handle)
        mapped_dyF.printSchema()
        if not mapped_dyF:
            return
        applymapping0 = ApplyMapping.apply(frame = mapped_dyF, mappings = share_page_dot, transformation_ctx = "applymapping0")
        datasink1 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = applymapping0, catalog_connection = "redshift", connection_options = get_connect("share_page_dot"),redshift_tmp_dir= args["TempDir"], transformation_ctx="datasink1")
        print("end share_page dot")

def processBatch(data_frame, batchId):
    if (data_frame.count() > 0):
        print("start")
        logEvents = DynamicFrame.fromDF(data_frame, glueContext, "from_data_frame").select_fields("logEvents")
        dyf_relationize  = logEvents.relationalize("logEvents",args["TempDir"]+"/relationalize")
        
        dyf_selectFromCollection = SelectFromCollection.apply(dyf_relationize, 'logEvents_logEvents')
        
        # 筛选
        sac_or_mon_dyF = Filter.apply(frame = dyf_selectFromCollection, f = lambda x: x["logEvents.val.id"]!="")
        # page dot
        page_dot = Filter.apply(frame = sac_or_mon_dyF, f = lambda x: eval(x["logEvents.val.message"])["dot_type"]==LINCAN_PAGE_DOT)
        # active dot
        active_dot = Filter.apply(frame = sac_or_mon_dyF, f = lambda x: eval(x["logEvents.val.message"])["dot_type"]==LINCAN_ACTIVE_DOT)
        # share page dot
        share_page = Filter.apply(frame = sac_or_mon_dyF, f = lambda x: eval(x["logEvents.val.message"])["dot_type"]==LINCAN_SHARE_PAGE)
        
        #if page_dot.count()>0 :
        #    PageDot(page_dot).run()
        #if active_dot.count()>0:
         #   ActiveDot(active_dot).run()
        if share_page.count()>0:
            SharePageDot(share_page).run()
        print("end")


glueContext.forEachBatch(frame = sourceData, batch_function = processBatch, options = {"windowSize": "100 seconds",  "checkpointLocation": args["TempDir"] + "/checkpoint/"})
job.commit()
2.4.2 以PG为例

说明:在此脚本中,引入了python其他的包。写入PG使用的是sqlalchemy,是为了实现有则更新,无则写入的操作。若无特殊要求,可参考 “2.4.1”

import sys
import datetime
import boto3
import base64
from pyspark.sql import DataFrame, Row
from pyspark.context import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue import *
import boto3
import json
import datetime
import time
import sqlalchemy
from sqlalchemy.ext.automap import automap_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.dialects import postgresql
import threading
import copy
import datetime
args = getResolvedOptions(sys.argv, ['TempDir', 'JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

sourceData = glueContext.create_data_frame.from_catalog( \
    database = "【你的数据库名称】", \ 
    table_name = "【表的名称】", \
    transformation_ctx = "datasource0", \
    additional_options = {"startingPosition": "TRIM_HORIZON", "inferSchema": "true"})


class DBWrite:
    url = 'postgresql+psycopg2://【user+passsword@PG终端节点】:5432/crowd'
    base = None
    session = None
    engine = None
    @classmethod
    def execute(cls):

        cls.engine = sqlalchemy.create_engine(cls.url)
        metadata = sqlalchemy.schema.MetaData(bind=cls.engine)
        metadata.reflect(cls.engine, schema = "public")
        cls.base = automap_base(metadata = metadata)
        cls.base.prepare()
        cls.session = sessionmaker(bind = cls.engine)
    

DBWrite.execute()
def processBatch(data_frame, batchId):
    if (data_frame.count() > 0):
        print("start",str(datetime.datetime.now()))
        session = DBWrite.session()

        def save_to_pg(row):
            row = json.loads(row)
            insert_data = {
                "create_time": row["create_time"],
                "update_time": row["update_time"],
                "valid": True,
                "crowd_type": row["crowd_type"],
                "name": row["name"],
                "support_num": row["support_num"],
                "target_amount": row["target_amount"],
                "status": row["status"],
                "surplus_day": row["surplus_day"],
                "crowd_category": row["crowd_category"],
                "current_amount": row["current_amount"],
                "address": row["address"],
                "author": row["author"],
                "image": row["image"],
                "comment_num": row["comment_num"],
                "create_data": row["create_time"][:-9],
                "unique_key":row["unique_key"]
            }
            update_data = copy.deepcopy(insert_data)
            del update_data["create_time"]
            del update_data["create_data"]
            
            Crowd = DBWrite.base.classes.crowd_crowd
            insert_stmt = insert(Crowd).values(**insert_data)
            insert_stmt = insert_stmt.on_conflict_do_update(
                index_elements = ["unique_key"],
                set_ = update_data
            )
            session.execute(insert_stmt)

        def handle(rec):
            message = rec["logEvents.val.message"]
            index = message.find("|312F14DS|")
            message = message[index+10:]
            message = message.replace(
                ": true", ": True").replace(": false", ": False")
            message = eval(message)
            return message

        logEvents = DynamicFrame.fromDF(
            data_frame, glueContext, "from_data_frame").select_fields("logEvents")
        dyf_relationize = logEvents.relationalize(
            "logEvents", args["TempDir"]+"/relationalize")

        dyf_selectFromCollection = SelectFromCollection.apply(
            dyf_relationize, 'logEvents_logEvents')

        # 筛选
        sac_or_mon_dyF = Filter.apply(
            frame=dyf_selectFromCollection, f=lambda x: x["logEvents.val.id"] != "")

        mapped_dyF = Map.apply(frame=sac_or_mon_dyF, f=handle)
        

        mapped_dyF = mapped_dyF.toDF()
        for info in (mapped_dyF.toJSON().take(mapped_dyF.toJSON().count()+1)):
            save_to_pg(info)
        session.commit()
        session.close()
        print("end",str(datetime.datetime.now()))

glueContext.forEachBatch(frame=sourceData, batch_function=processBatch, options={
                         "windowSize": "100 seconds",  "checkpointLocation": args["TempDir"] + "/checkpoint/"})
job.commit()

2.5 运行脚本

我们创建的是Spark Stream 类型的Job,因此Job会一直运行。定时的从AWS Kinesis Data Stream中获取数据进行微批量处理。
在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1287574.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

DSSS技术和OFDM技术

本内容为学习笔记,内容不一定正确,请多处参考进行理解 https://zhuanlan.zhihu.com/p/636853588 https://baike.baidu.com/item/OFDM/5790826?frge_ala https://zhuanlan.zhihu.com/p/515701960?utm_id0 一、 DSSS技术 信号替代:DSSS技术为…

谈一谈内存池

文章目录 一,什么是内存池二,进程地址空间中是如何解决内存碎片问题的三,malloc的实现原理四,STL中空间配置器的实现原理五,高并发内存池该内存池的优势在哪里内存池的设计框架内存申请流程ThreadCache层CentreCache层…

论文阅读:一种通过降低噪声和增强判别信息实现细粒度分类的视觉转换器

论文标题: A vision transformer for fine-grained classification by reducing noise and enhancing discriminative information 翻译: 一种通过降低噪声和增强判别信息实现细粒度分类的视觉转换器 摘要 最近,已经提出了几种基于Vision T…

Linux 中用户与权限

1.添加用户 useradd 1)创建用户 useradd 用户名 2)设置用户密码 passwd 用户名 设置密码是便于连接用户时使用到,如我使用物理机链接该用户 ssh 用户名 ip 用户需要更改密码的话,使用 passwd 指令即可 3)查看用户信息 id 用…

【数据结构(六)】希尔排序、快速排序、归并排序、基数排序的代码实现(3)

文章目录 1. 希尔排序1.1. 简单插入排序存在的问题1.2. 相关概念1.3. 应用实例1.3.1. 交换法1.3.1.1. 逐步推导实现方式1.3.1.2. 通用实现方式1.3.1.3. 计算时间复杂度 1.3.2. 移动法 2. 快速排序2.1. 相关概念2.2. 实例应用2.2.1. 思路分析2.2.2. 代码实现 2.3. 计算快速排序的…

Spatial Data Analysis(三):点模式分析

Spatial Data Analysis(三):点模式分析 ---- 1853年伦敦霍乱爆发 在此示例中,我将演示如何使用 John Snow 博士的经典霍乱地图在 Python 中执行 KDE 分析和距离函数。 感谢 Robin Wilson 将所有数据数字化并将其转换为友好的 G…

(04730)电路分析基础之电阻、电容及电感元件

04730电子技术基础 语雀(完全笔记) 电阻元件、电感元件和电容元件的概念、伏安关系,以及功率分析是我们以后分析电 路的基础知识。 电阻元件 电阻及其与温度的关系 电阻 电阻元件是对电流呈现阻碍作用的耗能元件,例如灯泡、…

基于STM32驱动的压力传感器实时监测系统

本文介绍了如何使用STM32驱动压力传感器进行实时监测。首先,我们会介绍压力传感器的工作原理和常见类型。然后,我们将介绍如何选择合适的STM32单片机和压力传感器组合。接下来,我们会详细讲解如何使用STM32驱动压力传感器进行数据采集和实时监…

根文件系统软件运行测试

一. 简介 前面几篇文章学习了制作一个可以在开发板上运行的,简单的根文件系统。 本文在上一篇文章学习的基础上进行的,文章地址如下: 完善根文件系统-CSDN博客 本文对根文件系统软件运行进行测试。 我们使用 Linux 的目的就是运行我们自…

vue3 setup语法糖 多条件搜索(带时间范围)

目录 前言: setup介绍: setup用法: 介绍: 前言: 不管哪个后台管理中都会用到对条件搜索带有时间范围的也不少见接下来就跟着我步入vue的多条件搜索(带时间范围) 在 Vue 3 中,你…

[JavaScript前端开发及实例教程]计算器井字棋游戏的实现

计算器&#xff08;网页内实现效果&#xff09; HTML部分 <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>My Calculator&l…

Unity3D对CSV文件操作(创建、读取、写入、修改)

系列文章目录 Unity工具 文章目录 系列文章目录前言一、Csv是什么&#xff1f;二、创建csv文件2-1、构建表数据2-2、创建表方法2-3、完整的脚本&#xff08;第一种方式&#xff09;2-4、运行结果2-5、完整的脚本&#xff08;第二种方式&#xff09;2-6、运行结果2-7、想用哪种…

基于STM32 HAL库的光电传感器驱动程序实例

本文将使用STM32 HAL库编写一个光电传感器的驱动程序示例。首先&#xff0c;我们会介绍光电传感器的工作原理和应用场景。然后&#xff0c;我们将讲解如何选择合适的STM32芯片和光电传感器组合。接下来&#xff0c;我们会详细介绍使用STM32 HAL库编写光电传感器驱动程序的基本步…

AVFormatContext封装层:理论与实战

文章目录 前言一、封装格式简介1、FFmpeg 中的封装格式2、查看 FFmpeg 支持的封装格式 二、API 介绍三、 实战 1&#xff1a;解封装1、原理讲解2、示例源码 13、运行结果 14、示例源码 25、运行结果 2 三、 实战 2&#xff1a;转封装1、原理讲解2、示例源码3、运行结果 前言 A…

Docker中部署ElasticSearch 和Kibana,用脚本实现对数据库资源的未授权访问

图未保存&#xff0c;不过文章当中的某一步骤可能会帮助到您&#xff0c;那么&#xff1a;感恩&#xff01; 1、docker中拉取镜像 #拉取镜像 docker pull elasticsearch:7.7.0#启动镜像 docker run --name elasticsearch -d -e ES_JAVA_OPTS"-Xms512m -Xmx512m" -e…

删除误提交的 git commit

背景描述 某次的意外 commit 中误将密码写到代码中并且 push 到了 remote repo 里面, 本文将围绕这个场景讨论如何弥补. 模拟误提交操作 在 Gitee 创建一个新的 Repo, clone 到本地 git clone https://gitee.com/lpwm/myrepo.git创建两个文件, commit 后 push 到 remote 作…

JSON 语法详解:轻松掌握数据结构(下)

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…

CSS中 设置文字下划线 的几种方法

在网页设计和开发中&#xff0c;我们经常需要对文字进行样式设置&#xff0c;包括字体,颜色&#xff0c;大小等&#xff0c;其中&#xff0c;设置文字下划线是一种常见需求 一 、CSS种使用 text-decoration 属性来设置文字的装饰效果&#xff0c;包括下划线。 常用的取值&…

JFrog----基于Docker方式部署JFrog

文章目录 1 下载镜像2 创建数据挂载目录3 启动 JFrog服务4 浏览器登录5 重置密码6 设置 license7 设置 Base URL8 设置代理9 选择仓库类型10 预览11 查看结果 1 下载镜像 免费版 docker pull docker.bintray.io/jfrog/artifactory-oss体验版&#xff1a; docker pull releas…

【网络奇缘】- 如何自己动手做一个五类|以太网|RJ45|网络电缆

​ ​ &#x1f308;个人主页: Aileen_0v0&#x1f525;系列专栏: 一见倾心,再见倾城 --- 计算机网络~&#x1f4ab;个人格言:"没有罗马,那就自己创造罗马~" 本篇文章关于计算机网络的动手小实验---如何自己动手做一个网线&#xff0c; 也是为后面的物理层学习进…