Table of Contents
- aws (Study Notes, Lesson 33) Going deeper with cdk
- Learning content:
- 1. Using `aws athena`
- 1.1 What is `aws athena`
- 1.2 What is `aws glue`
- 1.3 Why use `aws athena` and `aws glue` together
- 2. Practicing `aws athena`
- 2.1 Code link
- 2.2 Overall architecture
- 2.3 Code walkthrough
- 2.3.1 Creating the `S3 bucket` for the test data
- 2.3.2 Creating the `S3 bucket` for the query results
- 2.3.3 Syncing the sample `json` data files to the `S3 bucket`
- 2.3.4 Creating the `aws glue` `CfnDatabase`
- 2.3.5 Creating the `Role` needed by the `aws glue crawler`
- 2.3.6 Creating the `aws glue crawler`
- 2.3.7 Creating the `aws athena work group`
- 2.3.8 Creating the `aws athena query`
- 2.3.9 Adjusting the creation order
- 2.4 Running `aws cdk for athena`
- 2.4.1 Deploying
- 2.4.2 Running the `crawler`
- 2.4.3 Viewing the `aws athena` `queries`
- 2.4.4 Executing the `aws athena` `queries`
- 2.4.5 Viewing the `aws athena` `queries` results
aws (Study Notes, Lesson 33) Going deeper with cdk
- Use cdk to create athena and the aws glue crawler

Learning content:
- Use aws athena + aws glue crawler
1. Using aws athena
1.1 What is aws athena
aws athena is a data-analysis service from aws that lets you run SQL against data stored on S3.
- It is a managed service, so there is nothing to maintain.
- It is built on open-source frameworks.
- It is billed by the amount of data processed.
- It provides encryption for the data.

Note: athena cannot JOIN against an RDB, so it only offers queries over data files such as csv and json.
1.2 What is aws glue
aws glue is a managed ETL service from aws. It makes preparing and loading data for analysis straightforward, and the metadata that ties tables to schemas can be stored as the aws glue data catalog.
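To see what the data catalog holds, the metadata can be read back with boto3. A minimal sketch, assuming default AWS credentials and the log-database catalog database that this article creates later:

import boto3

glue = boto3.client('glue')

# list the tables registered in the catalog database and their columns
response = glue.get_tables(DatabaseName='log-database')
for table in response['TableList']:
    columns = table['StorageDescriptor']['Columns']
    print(table['Name'], [column['Name'] for column in columns])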
1.3 Why use aws athena and aws glue together
Combining the two lets aws athena query the databases and schemas that aws glue has created.
2. Practicing aws athena
2.1 Code link
Code link: aws-cdk-examples
2.2 Overall architecture

2.3 Code walkthrough
2.3.1 Creating the S3 bucket for the test data
# creating the buckets where the logs will be placed
logs_bucket = s3.Bucket(self, 'logs-bucket',
                        bucket_name=f"auditing-logs-{self.account}",
                        removal_policy=RemovalPolicy.DESTROY,
                        auto_delete_objects=True
                        )
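Setting removal_policy=RemovalPolicy.DESTROY together with auto_delete_objects=True lets cdk destroy remove the bucket even when it still contains objects, which is convenient for a tutorial stack but worth reconsidering for production data.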

2.3.2 Creating the S3 bucket for the query results
# creating the bucket where the queries output will be placed
query_output_bucket = s3.Bucket(self, 'query-output-bucket',
                                bucket_name=f"auditing-analysis-output-{self.account}",
                                removal_policy=RemovalPolicy.DESTROY,
                                auto_delete_objects=True
                                )

2.3.3 Syncing the sample json data files to the S3 bucket
# uploading the log files to the bucket as examples
s3_deployment.BucketDeployment(self, 'sample-files',
                               destination_bucket=logs_bucket,
                               sources=[s3_deployment.Source.asset('./log-samples')],
                               content_type='application/json',
                               retain_on_delete=False
                               )
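BucketDeployment uploads the local ./log-samples directory at deploy time through a Lambda-backed custom resource, and retain_on_delete=False removes the uploaded files again when the stack is deleted.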

2.3.4 Creating the aws glue CfnDatabase
# creating the Glue Database to serve as our Data Catalog
glue_database = glue.CfnDatabase(self, 'log-database',
                                 catalog_id=self.account,
                                 database_input=glue.CfnDatabase.DatabaseInputProperty(
                                     name="log-database"))
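CfnDatabase is an L1 construct that maps directly to the AWS::Glue::Database CloudFormation resource; the name set here, log-database, is what the crawler and the queries below refer to.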

2.3.5 Creating the Role needed by the aws glue crawler
# creating the permissions for the crawler to enrich our Data Catalog
glue_crawler_role = iam.Role(self, 'glue-crawler-role',
                             role_name='glue-crawler-role',
                             assumed_by=iam.ServicePrincipal(service='glue.amazonaws.com'),
                             managed_policies=[
                                 # Remember to apply the Least Privilege Principle and provide only the permissions needed to the crawler
                                 iam.ManagedPolicy.from_managed_policy_arn(self, 'AmazonS3FullAccess',
                                                                           'arn:aws:iam::aws:policy/AmazonS3FullAccess'),
                                 iam.ManagedPolicy.from_managed_policy_arn(self, 'AWSGlueServiceRole',
                                                                           'arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole')
                             ])
Two policies are needed here: AmazonS3FullAccess and AWSGlueServiceRole.
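The comment in the code already points at the Least Privilege Principle. As a hedged sketch of a narrower grant inside the same stack, AmazonS3FullAccess could be replaced with an inline statement scoped to the logs bucket (the exact set of actions a crawler needs may vary):

# instead of AmazonS3FullAccess, grant read access to the logs bucket only
glue_crawler_role.add_to_policy(iam.PolicyStatement(
    actions=['s3:GetObject', 's3:ListBucket'],
    resources=[
        logs_bucket.bucket_arn,           # for s3:ListBucket
        logs_bucket.arn_for_objects('*')  # for s3:GetObject
    ]))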

2.3.6 Creating the aws glue crawler
# creating the Glue Crawler that will automatically populate our Data Catalog. Don't forget to run the crawler
# as soon as the deployment finishes, otherwise our Data Catalog will be empty. Check out the README for more instructions
glue.CfnCrawler(self, 'logs-crawler',
                name='logs-crawler',
                database_name=glue_database.database_input.name,
                role=glue_crawler_role.role_name,
                targets={
                    "s3Targets": [
                        {"path": f's3://{logs_bucket.bucket_name}/products'},
                        {"path": f's3://{logs_bucket.bucket_name}/users'}
                    ]
                })
Here the aws glue crawler performs ETL (Extract, Transform, Load): it takes the products and users data files in the S3 bucket and, after conversion, loads the json data into the glue database.
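The crawler typically creates one table per s3Targets path, named after the last path component, which is why the queries below reference "log-database"."products" and "log-database"."users".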

2.3.7 Creating the aws athena work group
# creating the Athena Workgroup to store our queries
work_group = athena.CfnWorkGroup(self, 'log-auditing-work-group',
                                 name='log-auditing',
                                 work_group_configuration=athena.CfnWorkGroup.WorkGroupConfigurationProperty(
                                     result_configuration=athena.CfnWorkGroup.ResultConfigurationProperty(
                                         output_location=f"s3://{query_output_bucket.bucket_name}",
                                         encryption_configuration=athena.CfnWorkGroup.EncryptionConfigurationProperty(
                                             encryption_option="SSE_S3"
                                         ))))

aws athena is managed through work groups: first the workgroup is created, then the queries are created inside it.
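A workgroup scopes the saved queries and fixes their output location and encryption. A minimal boto3 sketch, assuming the stack has been deployed, that lists the queries saved in the log-auditing workgroup:

import boto3

athena = boto3.client('athena')

# list the named queries stored in the workgroup and show their SQL
query_ids = athena.list_named_queries(WorkGroup='log-auditing')['NamedQueryIds']
for query_id in query_ids:
    named_query = athena.get_named_query(NamedQueryId=query_id)['NamedQuery']
    print(named_query['Name'], '->', named_query['QueryString'])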
2.3.8 Creating the aws athena query
# creating an example query to fetch all product events by date
product_events_by_date_query = athena.CfnNamedQuery(self, 'product-events-by-date-query',
                                                    database=glue_database.database_input.name,
                                                    work_group=work_group.name,
                                                    name="product-events-by-date",
                                                    query_string="SELECT * FROM \"log-database\".\"products\" WHERE \"date\" = '2024-01-19'")

# creating an example query to fetch all user events by date
user_events_by_date_query = athena.CfnNamedQuery(self, 'user-events-by-date-query',
                                                 database=glue_database.database_input.name,
                                                 work_group=work_group.name,
                                                 name="user-events-by-date",
                                                 query_string="SELECT * FROM \"log-database\".\"users\" WHERE \"date\" = '2024-01-22'")

# creating an example query to fetch all events by the user ID
all_events_by_userid_query = athena.CfnNamedQuery(self, 'all-events-by-userId-query',
                                                  database=glue_database.database_input.name,
                                                  work_group=work_group.name,
                                                  name="all-events-by-userId",
                                                  query_string="SELECT * FROM (\n"
                                                               " SELECT transactionid, userid, username, domain, datetime, action FROM \"log-database\".\"products\" \n"
                                                               "UNION \n"
                                                               " SELECT transactionid, userid, username, domain, datetime, action FROM \"log-database\".\"users\" \n"
                                                               ") WHERE \"userid\" = '123'")
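A side note on the last query: UNION deduplicates the combined rows, so when the two sources cannot contain identical rows, UNION ALL would skip that extra step and usually scan cheaper.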
2.3.9 Adjusting the creation order
# adjusting the resource creation order
product_events_by_date_query.add_dependency(work_group)
user_events_by_date_query.add_dependency(work_group)
all_events_by_userid_query.add_dependency(work_group)
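This is needed because work_group=work_group.name passes a plain string rather than a CloudFormation reference, so CloudFormation cannot infer that the queries depend on the workgroup; add_dependency makes the ordering explicit.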
2.4 Running aws cdk for athena
2.4.1 Deploying
python -m venv .venv
source .venv/Scripts/activate   # Windows; on Linux/macOS: source .venv/bin/activate
pip install -r requirements.txt
cdk synth
cdk --require-approval never deploy
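Note that BucketDeployment publishes the sample files as CDK assets, so the target account and region must already be bootstrapped (cdk bootstrap) before cdk deploy.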
2.4.2 Running the crawler

By default the crawler does not start on its own; it has to be run manually.

Once it finishes successfully, the json files in the S3 bucket have all gone through ETL into the aws glue database.
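The crawler can also be started from code instead of the console. A minimal boto3 sketch, assuming default AWS credentials, that starts logs-crawler and waits until it is ready again:

import time
import boto3

glue = boto3.client('glue')

# kick off the crawler created by the stack
glue.start_crawler(Name='logs-crawler')
time.sleep(5)  # give the crawler a moment to enter RUNNING

# poll until the crawler has finished and is READY again
while True:
    state = glue.get_crawler(Name='logs-crawler')['Crawler']['State']
    if state == 'READY':
        break
    print('crawler state:', state)
    time.sleep(15)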

2.4.3 Viewing the aws athena queries
AWS Athena > Query editor > Saved queries > Workgroup > log-auditing

2.4.4 Executing the aws athena queries
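Besides the console, the queries can also be run programmatically. A minimal boto3 sketch, assuming the crawler has already populated the catalog, that executes the first example query in the log-auditing workgroup and prints the rows:

import time
import boto3

athena = boto3.client('athena')

# start the query in the workgroup; the workgroup already fixes the output location
execution_id = athena.start_query_execution(
    QueryString='SELECT * FROM "log-database"."products" WHERE "date" = \'2024-01-19\'',
    WorkGroup='log-auditing'
)['QueryExecutionId']

# wait until the query reaches a terminal state
while True:
    state = athena.get_query_execution(
        QueryExecutionId=execution_id)['QueryExecution']['Status']['State']
    if state in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
        break
    time.sleep(2)

# print the result rows (the first row is the header)
if state == 'SUCCEEDED':
    rows = athena.get_query_results(QueryExecutionId=execution_id)['ResultSet']['Rows']
    for row in rows:
        print([column.get('VarCharValue') for column in row['Data']])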

2.4.5 Viewing the aws athena query results