Table of Contents
- aws (Study Notes, Lesson 33) Using cdk in depth
- What this lesson covers:
- 1. Using `aws athena`
- 1.1 What is `aws athena`
- 1.2 What is `aws glue`
- 1.3 Why use `aws athena` and `aws glue` together
- 2. Hands-on with `aws athena`
- 2.1 Code link
- 2.2 Overall architecture
- 2.3 Code walkthrough
- 2.3.1 Create the `S3 bucket` for the test data
- 2.3.2 Create the `S3 bucket` for the query results
- 2.3.3 Sync the sample `json` data files to the `S3 bucket`
- 2.3.4 Create the `aws glue` `cfnDatabase`
- 2.3.5 Create the permission `Role` needed by the `aws glue crawler`
- 2.3.6 Create the `aws glue crawler`
- 2.3.7 Create the `aws athena work group`
- 2.3.8 Create the `aws athena query`
- 2.3.9 Adjust the creation order
- 2.4 Running `aws cdk for athena`
- 2.4.1 Deploy
- 2.4.2 Run the `crawler`
- 2.4.3 View the `aws athena` `queries`
- 2.4.4 Execute the `aws athena` `queries`
- 2.4.5 View the execution results of the `aws athena` `queries`
aws (Study Notes, Lesson 33) Using cdk in depth
- Using `cdk` to generate `athena` and an `aws glue crawler`
What this lesson covers:
- Using `aws athena` + `aws glue crawler`
1. Using `aws athena`
1.1 What is `aws athena`
`aws athena` is a data analysis service provided by `aws`: it lets you analyze data stored on `S3` with `SQL`.
- It is a managed service, so there is nothing to maintain
- Built on open-source frameworks
- Billed by the amount of data processed
- Provides encryption for the data

Note: it cannot `JOIN` against an `RDB`, so it only offers queries over data files such as `csv` and `json`.
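To make this concrete, here is a minimal `boto3` sketch of submitting a `SQL` query to `aws athena` (not part of this lesson's code; the database, table, and output bucket names are placeholders):
```python
import boto3

# minimal sketch: run SQL against data stored on S3 through athena
# ('my-database', 'my-table' and the output bucket are placeholders)
athena = boto3.client('athena')

execution = athena.start_query_execution(
    QueryString='SELECT * FROM "my-database"."my-table" LIMIT 10',
    QueryExecutionContext={'Database': 'my-database'},
    ResultConfiguration={'OutputLocation': 's3://my-query-output-bucket/'}
)
print(execution['QueryExecutionId'])  # results land in the output bucket
```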
1.2 What is `aws glue`
`aws glue` is a managed `ETL` service provided by `aws`. It makes preparing and loading data for analysis straightforward. The `metadata` that links each `table` to its `schema` is stored as `aws glue catalog data`.
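For instance, once the catalog is populated, that stored `table`/`schema` `metadata` can be read back with `boto3`; a minimal sketch, with a placeholder database name:
```python
import boto3

# minimal sketch: read table/schema metadata out of the glue data catalog
# ('my-database' is a placeholder database name)
glue = boto3.client('glue')

for table in glue.get_tables(DatabaseName='my-database')['TableList']:
    columns = table['StorageDescriptor']['Columns']
    print(table['Name'], [(col['Name'], col['Type']) for col in columns])
```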
1.3 Why use `aws athena` and `aws glue` together
Combining the two, the `database`s and `schema`s built by `aws glue` can be queried directly with `aws athena`.
2. Hands-on with `aws athena`
2.1 Code link
Code link: aws-cdk-examples
2.2 Overall architecture
2.3 Code walkthrough
2.3.1 Create the `S3 bucket` for the test data
```python
# creating the buckets where the logs will be placed
logs_bucket = s3.Bucket(self, 'logs-bucket',
    bucket_name=f"auditing-logs-{self.account}",
    removal_policy=RemovalPolicy.DESTROY,
    auto_delete_objects=True
)
```
2.3.2 Create the `S3 bucket` for the query results
```python
# creating the bucket where the queries output will be placed
query_output_bucket = s3.Bucket(self, 'query-output-bucket',
    bucket_name=f"auditing-analysis-output-{self.account}",
    removal_policy=RemovalPolicy.DESTROY,
    auto_delete_objects=True
)
```
2.3.3 Sync the sample `json` data files to the `S3 bucket`
```python
# uploading the log files to the bucket as examples
s3_deployment.BucketDeployment(self, 'sample-files',
    destination_bucket=logs_bucket,
    sources=[s3_deployment.Source.asset('./log-samples')],
    content_type='application/json',
    retain_on_delete=False
)
```
2.3.4 Create the `aws glue` `cfnDatabase`
```python
# creating the Glue Database to serve as our Data Catalog
glue_database = glue.CfnDatabase(self, 'log-database',
    catalog_id=self.account,
    database_input=glue.CfnDatabase.DatabaseInputProperty(
        name="log-database"))
```
2.3.5 Create the permission `Role` needed by the `aws glue crawler`
```python
# creating the permissions for the crawler to enrich our Data Catalog
glue_crawler_role = iam.Role(self, 'glue-crawler-role',
    role_name='glue-crawler-role',
    assumed_by=iam.ServicePrincipal(service='glue.amazonaws.com'),
    managed_policies=[
        # Remember to apply the Least Privilege Principle and provide only the permissions needed to the crawler
        iam.ManagedPolicy.from_managed_policy_arn(self, 'AmazonS3FullAccess',
            'arn:aws:iam::aws:policy/AmazonS3FullAccess'),
        iam.ManagedPolicy.from_managed_policy_arn(self, 'AWSGlueServiceRole',
            'arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole')
    ])
```
Two managed policies are needed here: `AmazonS3FullAccess` and `AWSGlueServiceRole`.
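As the comment in the code warns, `AmazonS3FullAccess` grants far more than the crawler needs. A minimal sketch of a tighter alternative, granting read-only access to just the `logs_bucket` created in 2.3.1:
```python
# sketch: instead of attaching AmazonS3FullAccess, grant the crawler
# read-only access to the single bucket it actually scans
glue_crawler_role.add_to_policy(iam.PolicyStatement(
    actions=['s3:GetObject', 's3:ListBucket'],
    resources=[logs_bucket.bucket_arn, f'{logs_bucket.bucket_arn}/*']
))
```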
2.3.6 Create the `aws glue crawler`
```python
# creating the Glue Crawler that will automatically populate our Data Catalog. Don't forget to run the crawler
# as soon as the deployment finishes, otherwise our Data Catalog will be empty. Check out the README for more instructions
glue.CfnCrawler(self, 'logs-crawler',
    name='logs-crawler',
    database_name=glue_database.database_input.name,
    role=glue_crawler_role.role_name,
    targets={
        "s3Targets": [
            {"path": f's3://{logs_bucket.bucket_name}/products'},
            {"path": f's3://{logs_bucket.bucket_name}/users'}
        ]
    })
```
Here the `aws glue crawler` handles the catalog side of `ETL` (Extract, Transform, Load): it crawls the `products` and `users` `json` data files in the `S3 bucket`, infers their schemas, and registers the resulting tables in the `glue database` (the data itself stays on `S3`).
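Note that the crawler defined above only runs on demand (see 2.4.2). If a periodic re-crawl were wanted instead, `CfnCrawler` also accepts a `schedule`; a hedged sketch:
```python
# sketch: a variant of the crawler that re-crawls daily instead of on demand
glue.CfnCrawler(self, 'scheduled-logs-crawler',
    name='scheduled-logs-crawler',
    database_name=glue_database.database_input.name,
    role=glue_crawler_role.role_name,
    schedule=glue.CfnCrawler.ScheduleProperty(
        schedule_expression='cron(0 0 * * ? *)'  # every day at 00:00 UTC
    ),
    targets={
        "s3Targets": [
            {"path": f's3://{logs_bucket.bucket_name}/products'},
            {"path": f's3://{logs_bucket.bucket_name}/users'}
        ]
    })
```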
2.3.7 Create the `aws athena work group`
```python
# creating the Athena Workgroup to store our queries
work_group = athena.CfnWorkGroup(self, 'log-auditing-work-group',
    name='log-auditing',
    work_group_configuration=athena.CfnWorkGroup.WorkGroupConfigurationProperty(
        result_configuration=athena.CfnWorkGroup.ResultConfigurationProperty(
            output_location=f"s3://{query_output_bucket.bucket_name}",
            encryption_configuration=athena.CfnWorkGroup.EncryptionConfigurationProperty(
                encryption_option="SSE_S3"
            ))))
```
`aws athena` is managed through `work group`s: the `workgroup` is created first, and the `query` objects are then created inside it.
2.3.8 Create the `aws athena query`
```python
# creating an example query to fetch all product events by date
product_events_by_date_query = athena.CfnNamedQuery(self, 'product-events-by-date-query',
    database=glue_database.database_input.name,
    work_group=work_group.name,
    name="product-events-by-date",
    query_string="SELECT * FROM \"log-database\".\"products\" WHERE \"date\" = '2024-01-19'")

# creating an example query to fetch all user events by date
user_events_by_date_query = athena.CfnNamedQuery(self, 'user-events-by-date-query',
    database=glue_database.database_input.name,
    work_group=work_group.name,
    name="user-events-by-date",
    query_string="SELECT * FROM \"log-database\".\"users\" WHERE \"date\" = '2024-01-22'")

# creating an example query to fetch all events by the user ID
all_events_by_userid_query = athena.CfnNamedQuery(self, 'all-events-by-userId-query',
    database=glue_database.database_input.name,
    work_group=work_group.name,
    name="all-events-by-userId",
    query_string="SELECT * FROM (\n"
                 "  SELECT transactionid, userid, username, domain, datetime, action FROM \"log-database\".\"products\" \n"
                 "UNION \n"
                 "  SELECT transactionid, userid, username, domain, datetime, action FROM \"log-database\".\"users\" \n"
                 ") WHERE \"userid\" = '123'")
```
2.3.9 Adjust the creation order
```python
# adjusting the resource creation order
product_events_by_date_query.add_dependency(work_group)
user_events_by_date_query.add_dependency(work_group)
all_events_by_userid_query.add_dependency(work_group)
```
Because `CfnNamedQuery` refers to the `work group` only by its name (a plain string), `cdk` cannot infer the dependency on its own; `add_dependency` guarantees the `work group` exists before the queries are created.
2.4 Running `aws cdk for athena`
2.4.1 Deploy
```bash
python -m venv .venv
source .venv/Scripts/activate # windows platform
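# on linux/macos use: source .venv/bin/activate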
pip install -r requirements.txt
cdk synth
cdk --require-approval never deploy
```
2.4.2 Run the `crawler`
The `crawler` does not start by default; it has to be run manually.
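It can be started from the console; the same thing as a minimal `boto3` sketch (using the `logs-crawler` name from 2.3.6):
```python
import boto3
import time

# minimal sketch: start the crawler and wait until the catalog is populated
glue = boto3.client('glue')
glue.start_crawler(Name='logs-crawler')

while glue.get_crawler(Name='logs-crawler')['Crawler']['State'] != 'READY':
    time.sleep(10)  # state goes RUNNING -> STOPPING -> READY
```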
Once the run completes successfully, the `json` files in the `S3 bucket` have been through this `ETL` step and the resulting tables are available in the `aws glue database`.
2.4.3 View the `aws athena` `queries`
In the console, navigate: AWS Athena > Query editor > Saved queries > Workgroup > log-auditing