结合 AWS Glue 和 Amazon MWAA 构建先进的 VPC 选择和故障切换策略 大数据博
结合 AWS Glue 和 Amazon MWAA 构建先进的 VPC 选择和故障切换策略 大数据博
2026-01-27 14:38:00

使用 AWS Glue 和 Amazon MWAA 构建高级 VPC 选择与故障切换策略

作者:Michael Greenshtein 和 Gonzalo Herreros,发布时间:2024 年 2 月 21 日

关键要点

利用 AWS Glue 进行数据集成,同时使用 Amazon MWAA 管理 Airflow 工作流。可根据运行时条件动态选择 VPC 子网,优化资源利用率。提升工作安全与连接性,减少故障风险。

AWS Glue 是一项无服务器数据集成服务,旨在简化从多个来源发现、准备、移动和整合数据,以便用于分析、机器学习ML和应用开发。许多 AWS Glue 用户需满足严格的安全要求,这可能涉及限制可用于作业的网络连接,或在特定 VPC 内运行以访问其他服务。为了在 VPC 内运行,任务需要分配到一个单一子网,但最合适的子网可能随着时间变化例如,根据使用情况和可用性,因此可能希望根据自己的策略在运行时进行决策。

Amazon Managed Workflows for Apache AirflowAmazon MWAA是一项 AWS 服务,用于运行管理的 Airflow 工作流,它允许编写自定义逻辑,以协调 AWS Glue 作业等任务的运行。

在本博文中,我们将展示如何在 Airflow 工作流中作为一个环节运行 AWS Glue 作业,并在运行时动态配置分配给作业的 VPC 子网。

解决方案概述

为了在 VPC 内运行,AWS Glue 作业至少需要分配一个带网络配置的连接。任何连接都允许指定 VPC、子网和安全组,但为简化起见,本博文使用类型为NETWORK的连接,仅定义网络配置,而不涉及外部系统。如果作业通过单一连接分配了固定子网,则在 可用区服务发生故障或子网因其他原因不可用的情况下,作业无法运行。此外,AWS Glue 作业中的每个节点驱动程序或工作节点都需要从子网中分配一个 IP 地址。并发运行多个大型作业时,可能会导致 IP 地址短缺,进而导致作业运行的节点少于预期,甚至根本无法运行。

AWS Glue 的提取、转换和加载ETL作业允许指定多个连接,具备多种网络配置。然而,作业总是按列出的顺序尝试使用连接的网络配置,并选择第一个通过 健康检查 的,且至少有两个 IP 地址的连接来启动作业,这可能不是最佳选择。

通过此解决方案,您可以动态重新排序连接并定义选择优先级,从而增强和自定义这一行为。如果需要重试,连接将根据策略再次重新排序,因为自上次运行以来条件可能发生了变化。

有效地防止作业因子网 IP 地址短缺或任何故障而无法运行或运行不足,同时满足网络安全和连接性要求。

以下图示展示了解决方案架构。

前提条件

要按照博文步骤操作,您需要能够登录 AWS 管理控制台 的用户,并具备访问 Amazon MWAA、Amazon 虚拟私有云Amazon VPC和 AWS Glue 的权限。您选择部署解决方案的 AWS 区域需要有能力创建一个 VPC 和两个弹性 IP 地址。两个资源的区域默认配额为 5,因此您可能需要通过控制台请求增加配额。

您需要一个适合运行 AWS Glue 作业的 AWS 身份与访问管理IAM角色,如果还没有,可以参考 为 AWS Glue 创建 IAM 角色 的说明。

部署 Airflow 环境和 VPC

首先,您将部署一个新的 Airflow 环境,包括创建一个新的 VPC,包含两个公共子网和两个私有子网。这是因为 Amazon MWAA 要求具有可用区故障容忍能力,因此必须在该区域的两个不同可用区的两个子网上运行。公共子网用于让 NAT 网关提供私有子网的互联网访问。

请完成以下步骤:

在您的计算机上创建一个 AWS CloudFormation 模板,将以下 快速入门指南 中的模板复制到本地文本文件。在 AWS CloudFormation 控制台的导航面板中,选择 Stacks。选择 Create stack,并选择 With new resources (standard)。选择 Upload a template file 并选择本地模板文件。选择 Next。

飞兔加速器vnp完成设置步骤,为环境输入一个名称,其余参数保持默认。在最后一步,确认将创建资源并选择 Submit。

创建过程可能需要 2030 分钟,直到堆栈的状态更改为 CREATECOMPLETE。

最耗时的资源是 Airflow 环境。在创建过程中,您可以继续执行后面的步骤,直到需要打开 Airflow UI。

在堆栈的 Resources 选项卡上,记下 VPC 和两个私有子网PrivateSubnet1 和 PrivateSubnet2的 ID,以便在下一步中使用。

创建 AWS Glue 连接

CloudFormation 模板部署了两个私有子网。在此步骤中,您将为每个私有子网创建一个 AWS Glue 连接,以便 AWS Glue 作业可以在其内运行。Amazon MWAA 最近新增了在共享 VPC 上运行 Airflow 集群的能力,这减少了成本并简化了网络管理。有关更多信息,请参考 在 Amazon MWAA 上引入共享 VPC 支持。

请完成以下步骤以创建连接:

在 AWS Glue 控制台中,选择导航面板中的 Data connections。选择 Create connection。选择 Network 作为数据源。选择 CloudFormation 堆栈创建的 VPC 和私有子网PrivateSubnet1。使用默认安全组。选择 Next。

对于连接名称,输入 MWAAGlueBlogSubnet1。审查细节并完成创建。重复这些步骤,使用 PrivateSubnet2 并将连接命名为 MWAAGlueBlogSubnet2。

创建 AWS Glue 作业

现在您创建将由 Airflow 工作流触发的 AWS Glue 作业。此作业使用在上一部分创建的连接,但在此情景下,您将作业连接列表留空,让工作流在运行时决定使用哪个连接。

在此情况下的作业脚本并不重要,仅旨在演示作业在其中一个子网中运行的情况,具体取决于连接。

在 AWS Glue 控制台中,选择导航面板中的 ETL jobs,然后选择 Script editor。保留默认选项Spark 引擎和 Start fresh,然后选择 Create script。

将占位符脚本替换为以下 Python 代码:

pythonimport ipaddressimport socket

subnets = { PrivateSubnet1 10192200/24 PrivateSubnet2 10192210/24}

ip = socketgethostbyname(socketgethostname())subnetname = unknownfor subnet cidr in subnetsitems() if ipaddressipaddress(ip) in ipaddressipnetwork(cidr) subnetname = subnet

print(fThe driver node has been assigned the ip {ip} which belongs to the subnet {subnetname})

将作业重命名为 AirflowBlogJob。

在 Job details 选项卡中,对于 IAM Role,选择任何角色,并输入2作为工作节点数节约成本。保存这些更改以创建作业。

授予 AWS Glue 权限给 Airflow 环境角色

CloudFormation 模板为 Airflow 创建的角色提供了运行工作流的基本权限,但不包括与其他服务如 AWS Glue 的交互权限。在生产项目中,您会定义自己的模板,以包含这些额外的权限,但在本博文中,为简便起见,您将额外权限作为内联策略添加。完成以下步骤:

在 IAM 控制台中,选择导航面板中的 Roles。找到由模板创建的角色;其名称将以您分配给 CloudFormation 堆栈的名称开头,接着是 MwaaExecutionRole。在角色详细信息页面,点击 Add permissions 菜单,选择 Create inline policy。

从可视化模式切换到 JSON 模式,并在文本框中输入以下 JSON。假设您有的 AWS Glue 角色以 AWSGlueServiceRole 开头。为了增强安全性,您可以用 CloudFormation 堆栈中的两个私有子网的 ARN 替换 ec2DescribeSubnets 权限中的通配符资源。

json{ Version 20121017 Statement [ { Effect Allow Action [ glueGetConnection ] Resource [ arnawsglueconnection/MWAAGlueBlogSubnet arnawsgluecatalog ] } { Effect Allow Action [ glueUpdateJob glueGetJob glueStartJobRun glueGetJobRun ] Resource [ arnawsgluejob/AirflowBlogJob arnawsgluejob/BlogAirflow ] } { Effect Allow Action [ ec2DescribeSubnets ] Resource } { Effect Allow Action [ iamGetRole iamPassRole ] Resource arnawsiamrole/servicerole/AWSGlueServiceRole } ]}

选择 Next。

输入策略名称 GlueRelatedPermissions 并完成创建。

在此示例中,我们使用 ETL 脚本作业;对于可视化作业,由于在保存时自动生成脚本,因此 Airflow 角色需要拥有写入配置脚本路径的权限,路径指向 Amazon Simple Storage ServiceAmazon S3。

创建 Airflow DAG

Airflow 工作流基于有向无环图DAG,由一个 Python 文件定义,程序性地指定所涉及的不同任务及其相互依赖关系。请完成以下步骤以创建 DAG:

使用文本编辑器创建一个名为 gluejobdagpy 的本地文件。

在接下来的每一步中,我们提供一个代码片段以输入到文件中,并解释其功能。

以下代码片段添加所需的 Python 模块导入。这些模块已经在 Airflow 中安装;如果不是,您需要使用 requirementstxt 文件来指示 Airflow 安装哪些模块。同时定义了代码后面将使用的 Boto3 客户端。默认情况下,它们将使用与 Airflow 相同的角色和区域,这就是您之前设置的具有额外权限的角色。

结合 AWS Glue 和 Amazon MWAA 构建先进的 VPC 选择和故障切换策略 大数据博

pythonimport boto3from pendulum import datetime durationfrom random import shufflefrom airflow import DAGfrom airflowdecorators import dag taskfrom airflowmodels import Variablefrom airflowprovidersamazonawsoperatorsglue import GlueJobOperator

glueclient = boto3client(glue)ec2 = boto3client(ec2)

以下代码片段添加了三个函数,以实现连接顺序策略,定义如何重排给定连接以建立优先级。这只是一个示例;您可以根据需要构建自定义代码,以实现自己的逻辑。代码首先检查每个连接子网中可用的 IP,并将具有足够可用 IP 以全面运行作业的连接与仅具有至少两个 IP这是作业启动的最低要求的连接分开。如果策略设置为 random,它将随机化先前描述的每个连接组中的顺序并添加其他连接。如果策略为 capacity,则按可用 IP 的数量从高到低进行排序。

pythondef getavailableipsfromconnection(glueconnectionname) connresponse = glueclientgetconnection(Name=glueconnectionname) connectionproperties = connresponse[Connection][PhysicalConnectionRequirements] subnetid = connectionproperties[SubnetId] subnetresponse = ec2describesubnets(SubnetIds=[subnetid]) return subnetresponse[Subnets][0][AvailableIpAddressCount]

def getconnectionsfreeips(glueconnectionnames numworkers) goodconnections = [] usableconnections = [] for connectionname in glueconnectionnames try availableips = getavailableipsfromconnection(connectionname) # 优先考虑可以容纳完整集群的连接,且我们没有刚刚尝试过 if availableips gt= numworkers goodconnectionsappend((connectionname availableips)) elif availableips gt= 2 # 启动 Glue 作业的最低要求 usableconnectionsappend((connectionname availableips)) except Exception as e print(f[WARNING] Failed to check the free ips for{connectionname} will skip Exception {e}) return goodconnections usableconnections

def prioritizeconnections(connectionlist numworkers strategy) (goodconnections usableconnections) = getconnectionsfreeips(connectionlist numworkers) print(fGood connections {goodconnections}) print(fUsable connections {usableconnections}) allconn = [] if strategy==random shuffle(goodconnections) shuffle(usableconnections) # 好连接具有优先权 allconn = goodconnections usableconnections elif strategy==capacity # 我们可以同时对两者进行排序 allconn = goodconnections usableconnections allconnsort(key=lambda x x[1]) else raise ValueError(fUnknown strategy specified {strategy}) result = [c[0] for c in allconn] # 只需名称 # 将所有未检查 IP 的其他连接保留在末尾 result = [c for c in connectionlist if c not in result] return result

以下代码创建了 DAG 本身与运行作业的任务,该任务根据定义的策略更新连接顺序,运行作业并等待结果。作业名称、连接和策略来自 Airflow 变量,因此可以轻松配置和更新。它配置了两次带有指数退避的重试,因此如果任务失败,它将重复整个任务,包括连接选择。现在最佳选择可能是另一个连接,或者先前随机挑选的子网在当前遭遇故障,通过选择不同的子网,可以实现恢复。

pythonwith DAG( dagid=gluejobdag scheduleinterval=None # 仅按需运行 startdate=datetime(2000 1 1) # 需要一个开始日期 maxactiveruns=1 catchup=False) as gluedag

@task(    taskid=gluetask     retries=2    retrydelay=duration(seconds = 30)    retryexponentialbackoff=True)def runjobtask(ctx)        glueconnections = Variableget(gluejobdagglueconnections)strip()split()    gluejobname = Variableget(gluejobdaggluejobname)strip()    strategy= Variableget(gluejobdagstrategy random) # random or capacity    print(fConnections available {glueconnections})    print(fGlue job name {gluejobname})    print(fStrategy to use {strategy})    jobprops = glueclientgetjob(JobName=gluejobname)[Job]                numworkers =