作者:Michael Greenshtein 和 Gonzalo Herreros,发布时间:2024 年 2 月 21 日
AWS Glue 是一项无服务器数据集成服务,旨在简化从多个来源发现、准备、移动和整合数据,以便用于分析、机器学习ML和应用开发。许多 AWS Glue 用户需满足严格的安全要求,这可能涉及限制可用于作业的网络连接,或在特定 VPC 内运行以访问其他服务。为了在 VPC 内运行,任务需要分配到一个单一子网,但最合适的子网可能随着时间变化例如,根据使用情况和可用性,因此可能希望根据自己的策略在运行时进行决策。
Amazon Managed Workflows for Apache AirflowAmazon MWAA是一项 AWS 服务,用于运行管理的 Airflow 工作流,它允许编写自定义逻辑,以协调 AWS Glue 作业等任务的运行。
在本博文中,我们将展示如何在 Airflow 工作流中作为一个环节运行 AWS Glue 作业,并在运行时动态配置分配给作业的 VPC 子网。
为了在 VPC 内运行,AWS Glue 作业至少需要分配一个带网络配置的连接。任何连接都允许指定 VPC、子网和安全组,但为简化起见,本博文使用类型为NETWORK的连接,仅定义网络配置,而不涉及外部系统。如果作业通过单一连接分配了固定子网,则在 可用区服务发生故障或子网因其他原因不可用的情况下,作业无法运行。此外,AWS Glue 作业中的每个节点驱动程序或工作节点都需要从子网中分配一个 IP 地址。并发运行多个大型作业时,可能会导致 IP 地址短缺,进而导致作业运行的节点少于预期,甚至根本无法运行。
AWS Glue 的提取、转换和加载ETL作业允许指定多个连接,具备多种网络配置。然而,作业总是按列出的顺序尝试使用连接的网络配置,并选择第一个通过 健康检查 的,且至少有两个 IP 地址的连接来启动作业,这可能不是最佳选择。
通过此解决方案,您可以动态重新排序连接并定义选择优先级,从而增强和自定义这一行为。如果需要重试,连接将根据策略再次重新排序,因为自上次运行以来条件可能发生了变化。
有效地防止作业因子网 IP 地址短缺或任何故障而无法运行或运行不足,同时满足网络安全和连接性要求。
以下图示展示了解决方案架构。
要按照博文步骤操作,您需要能够登录 AWS 管理控制台 的用户,并具备访问 Amazon MWAA、Amazon 虚拟私有云Amazon VPC和 AWS Glue 的权限。您选择部署解决方案的 AWS 区域需要有能力创建一个 VPC 和两个弹性 IP 地址。两个资源的区域默认配额为 5,因此您可能需要通过控制台请求增加配额。
您需要一个适合运行 AWS Glue 作业的 AWS 身份与访问管理IAM角色,如果还没有,可以参考 为 AWS Glue 创建 IAM 角色 的说明。
首先,您将部署一个新的 Airflow 环境,包括创建一个新的 VPC,包含两个公共子网和两个私有子网。这是因为 Amazon MWAA 要求具有可用区故障容忍能力,因此必须在该区域的两个不同可用区的两个子网上运行。公共子网用于让 NAT 网关提供私有子网的互联网访问。
请完成以下步骤:
在您的计算机上创建一个 AWS CloudFormation 模板,将以下 快速入门指南 中的模板复制到本地文本文件。在 AWS CloudFormation 控制台的导航面板中,选择 Stacks。选择 Create stack,并选择 With new resources (standard)。选择 Upload a template file 并选择本地模板文件。选择 Next。 飞兔加速器vnp完成设置步骤,为环境输入一个名称,其余参数保持默认。在最后一步,确认将创建资源并选择 Submit。创建过程可能需要 2030 分钟,直到堆栈的状态更改为 CREATECOMPLETE。
最耗时的资源是 Airflow 环境。在创建过程中,您可以继续执行后面的步骤,直到需要打开 Airflow UI。
在堆栈的 Resources 选项卡上,记下 VPC 和两个私有子网PrivateSubnet1 和 PrivateSubnet2的 ID,以便在下一步中使用。CloudFormation 模板部署了两个私有子网。在此步骤中,您将为每个私有子网创建一个 AWS Glue 连接,以便 AWS Glue 作业可以在其内运行。Amazon MWAA 最近新增了在共享 VPC 上运行 Airflow 集群的能力,这减少了成本并简化了网络管理。有关更多信息,请参考 在 Amazon MWAA 上引入共享 VPC 支持。
请完成以下步骤以创建连接:
在 AWS Glue 控制台中,选择导航面板中的 Data connections。选择 Create connection。选择 Network 作为数据源。选择 CloudFormation 堆栈创建的 VPC 和私有子网PrivateSubnet1。使用默认安全组。选择 Next。 对于连接名称,输入 MWAAGlueBlogSubnet1。审查细节并完成创建。重复这些步骤,使用 PrivateSubnet2 并将连接命名为 MWAAGlueBlogSubnet2。现在您创建将由 Airflow 工作流触发的 AWS Glue 作业。此作业使用在上一部分创建的连接,但在此情景下,您将作业连接列表留空,让工作流在运行时决定使用哪个连接。
在此情况下的作业脚本并不重要,仅旨在演示作业在其中一个子网中运行的情况,具体取决于连接。
在 AWS Glue 控制台中,选择导航面板中的 ETL jobs,然后选择 Script editor。保留默认选项Spark 引擎和 Start fresh,然后选择 Create script。将占位符脚本替换为以下 Python 代码:
pythonimport ipaddressimport socket
subnets = { PrivateSubnet1 10192200/24 PrivateSubnet2 10192210/24}
ip = socketgethostbyname(socketgethostname())subnetname = unknownfor subnet cidr in subnetsitems() if ipaddressipaddress(ip) in ipaddressipnetwork(cidr) subnetname = subnet
print(fThe driver node has been assigned the ip {ip} which belongs to the subnet {subnetname})
将作业重命名为 AirflowBlogJob。
在 Job details 选项卡中,对于 IAM Role,选择任何角色,并输入2作为工作节点数节约成本。保存这些更改以创建作业。CloudFormation 模板为 Airflow 创建的角色提供了运行工作流的基本权限,但不包括与其他服务如 AWS Glue 的交互权限。在生产项目中,您会定义自己的模板,以包含这些额外的权限,但在本博文中,为简便起见,您将额外权限作为内联策略添加。完成以下步骤:
在 IAM 控制台中,选择导航面板中的 Roles。找到由模板创建的角色;其名称将以您分配给 CloudFormation 堆栈的名称开头,接着是 MwaaExecutionRole。在角色详细信息页面,点击 Add permissions 菜单,选择 Create inline policy。从可视化模式切换到 JSON 模式,并在文本框中输入以下 JSON。假设您有的 AWS Glue 角色以 AWSGlueServiceRole 开头。为了增强安全性,您可以用 CloudFormation 堆栈中的两个私有子网的 ARN 替换 ec2DescribeSubnets 权限中的通配符资源。
json{ Version 20121017 Statement [ { Effect Allow Action [ glueGetConnection ] Resource [ arnawsglueconnection/MWAAGlueBlogSubnet arnawsgluecatalog ] } { Effect Allow Action [ glueUpdateJob glueGetJob glueStartJobRun glueGetJobRun ] Resource [ arnawsgluejob/AirflowBlogJob arnawsgluejob/BlogAirflow ] } { Effect Allow Action [ ec2DescribeSubnets ] Resource } { Effect Allow Action [ iamGetRole iamPassRole ] Resource arnawsiamrole/servicerole/AWSGlueServiceRole } ]}
选择 Next。
输入策略名称 GlueRelatedPermissions 并完成创建。在此示例中,我们使用 ETL 脚本作业;对于可视化作业,由于在保存时自动生成脚本,因此 Airflow 角色需要拥有写入配置脚本路径的权限,路径指向 Amazon Simple Storage ServiceAmazon S3。
Airflow 工作流基于有向无环图DAG,由一个 Python 文件定义,程序性地指定所涉及的不同任务及其相互依赖关系。请完成以下步骤以创建 DAG:
使用文本编辑器创建一个名为 gluejobdagpy 的本地文件。在接下来的每一步中,我们提供一个代码片段以输入到文件中,并解释其功能。
以下代码片段添加所需的 Python 模块导入。这些模块已经在 Airflow 中安装;如果不是,您需要使用 requirementstxt 文件来指示 Airflow 安装哪些模块。同时定义了代码后面将使用的 Boto3 客户端。默认情况下,它们将使用与 Airflow 相同的角色和区域,这就是您之前设置的具有额外权限的角色。

pythonimport boto3from pendulum import datetime durationfrom random import shufflefrom airflow import DAGfrom airflowdecorators import dag taskfrom airflowmodels import Variablefrom airflowprovidersamazonawsoperatorsglue import GlueJobOperator
glueclient = boto3client(glue)ec2 = boto3client(ec2)
以下代码片段添加了三个函数,以实现连接顺序策略,定义如何重排给定连接以建立优先级。这只是一个示例;您可以根据需要构建自定义代码,以实现自己的逻辑。代码首先检查每个连接子网中可用的 IP,并将具有足够可用 IP 以全面运行作业的连接与仅具有至少两个 IP这是作业启动的最低要求的连接分开。如果策略设置为 random,它将随机化先前描述的每个连接组中的顺序并添加其他连接。如果策略为 capacity,则按可用 IP 的数量从高到低进行排序。
pythondef getavailableipsfromconnection(glueconnectionname) connresponse = glueclientgetconnection(Name=glueconnectionname) connectionproperties = connresponse[Connection][PhysicalConnectionRequirements] subnetid = connectionproperties[SubnetId] subnetresponse = ec2describesubnets(SubnetIds=[subnetid]) return subnetresponse[Subnets][0][AvailableIpAddressCount]
def getconnectionsfreeips(glueconnectionnames numworkers) goodconnections = [] usableconnections = [] for connectionname in glueconnectionnames try availableips = getavailableipsfromconnection(connectionname) # 优先考虑可以容纳完整集群的连接,且我们没有刚刚尝试过 if availableips gt= numworkers goodconnectionsappend((connectionname availableips)) elif availableips gt= 2 # 启动 Glue 作业的最低要求 usableconnectionsappend((connectionname availableips)) except Exception as e print(f[WARNING] Failed to check the free ips for{connectionname} will skip Exception {e}) return goodconnections usableconnections
def prioritizeconnections(connectionlist numworkers strategy) (goodconnections usableconnections) = getconnectionsfreeips(connectionlist numworkers) print(fGood connections {goodconnections}) print(fUsable connections {usableconnections}) allconn = [] if strategy==random shuffle(goodconnections) shuffle(usableconnections) # 好连接具有优先权 allconn = goodconnections usableconnections elif strategy==capacity # 我们可以同时对两者进行排序 allconn = goodconnections usableconnections allconnsort(key=lambda x x[1]) else raise ValueError(fUnknown strategy specified {strategy}) result = [c[0] for c in allconn] # 只需名称 # 将所有未检查 IP 的其他连接保留在末尾 result = [c for c in connectionlist if c not in result] return result
以下代码创建了 DAG 本身与运行作业的任务,该任务根据定义的策略更新连接顺序,运行作业并等待结果。作业名称、连接和策略来自 Airflow 变量,因此可以轻松配置和更新。它配置了两次带有指数退避的重试,因此如果任务失败,它将重复整个任务,包括连接选择。现在最佳选择可能是另一个连接,或者先前随机挑选的子网在当前遭遇故障,通过选择不同的子网,可以实现恢复。
pythonwith DAG( dagid=gluejobdag scheduleinterval=None # 仅按需运行 startdate=datetime(2000 1 1) # 需要一个开始日期 maxactiveruns=1 catchup=False) as gluedag
@task( taskid=gluetask retries=2 retrydelay=duration(seconds = 30) retryexponentialbackoff=True)def runjobtask(ctx) glueconnections = Variableget(gluejobdagglueconnections)strip()split() gluejobname = Variableget(gluejobdaggluejobname)strip() strategy= Variableget(gluejobdagstrategy random) # random or capacity print(fConnections available {glueconnections}) print(fGlue job name {gluejobname}) print(fStrategy to use {strategy}) jobprops = glueclientgetjob(JobName=gluejobname)[Job] numworkers =