在多云环境中使用 AWS DMS、Amazon MSK 和 Amazon Managed Servi


在多云环境中使用 AWS DMS、Amazon MSK 和 Amazon 管理服务 Apache Flink 流式传输数据变更

作者:Simran Singh Amandeep Bajwa Deevanshu Budhiraja Sasha Luthra日期:2024年8月12日类别: 高级 (300) Amazon 管理服务 Apache Flink Amazon 管理流式服务 Apache Kafka (Amazon MSK) AWS 数据库迁移服务 多云 技术教程永久链接

重点总结

多云环境的挑战:虽然在单一云服务供应商CSP上运行大多数工作负载可以充分发挥云的优势,但一些客户仍在多云环境中运营,这可能会导致使用数据进行实时分析的挑战。数据流架构:本文讨论了如何将其他云供应商的事务性数据库中的数据流更改实时传输到 AWS 的流数据解决方案。及其组成部分:展示了 AWS DMS、Amazon MSK 和 Amazon 管理服务 Apache Flink 如何在多云环境下工作,以从Google Cloud的Cloud SQL for MySQL数据库进行变更数据捕捉CDC。

当大多数工作负载都在单一云服务供应商CSP上时,客户才能充分实现云的好处;然而,有些客户发现自己处于多云环境中。如需了解可能需要多云环境的情况和关键考虑事项,请参阅 开发多云策略的最佳实践。

当工作负载及其相应的事务性数据库分布在多个云提供商之间时,可能会导致在几乎实时的基础上使用数据进行高级分析的挑战。

在本篇文章中,我们将讨论架构、方法和考虑因素,以将其他云提供商部署的事务性数据库中的数据更改流式传输到部署在 AWS 上的流数据解决方案。我们还展示了如何利用 AWS 数据库迁移服务 AWS DMS、Amazon 管理流式服务 Apache Kafka Amazon MSK和 Amazon 管理服务 Apache Flink 从 Google Cloud 的 Cloud SQL for MySQL 数据库 进行数据捕捉流式传输。此外,我们提供了补充的 Google Cloud 文档参考,以提供更多背景知识,并讨论在 Google Cloud 上设置的相关步骤。

选择架构组件的决策模型

假设您有一个在其他 CSP 上部署的电子商务应用程序。如 开发多云策略的最佳实践 中提到的,此场景可能发生在并购中,投资组合公司可能有自己的 CSP 策略。因此,工作负载可能分布在各种 CSP 上。与此同时,母公司或控股集团以及几家投资组合公司可能已经选择 AWS 作为主要的 CSP。

在这种情况下,当您需要将外部数据库中的 CDC 数据流式传输到部署在 AWS 中的流式解决方案时,您的架构可能如下图所示。

该架构具有以下关键组件:

事件源:您可以在应用程序的所有层捕获、丰富和分析实时事件,例如用户点击事件和后端 API 事件。在本篇文章中,我们专注于来自数据库层的变更数据事件。源连接器:如 MySQL 和 PostgreSQL 的数据库支持基于日志的 CDC,某些流数据库还本机支持将事件提交到流平台。根据您的源数据库,您可能需要一个连接器将变更数据流式传输到流式解决方案中。例如, Debezium 是一个开源的分布式 CDC 平台,您可以将其部署为 Kafka Connect 连接器或服务器,从源数据库中流式传输变更事件。或者,您可以使用 AWS DMS 从 Microsoft Azure 和 Google Cloud 中的第三方托管数据库服务流式传输变更数据,而无需部署和管理自定义连接器。请参阅 AWS DMS 的数据源 以获取支持的数据源信息。并非所有第三方托管数据库服务都支持使用 AWS DMS 进行 CDC。请参阅 数据迁移的源 以获取最新信息。数据流:如果您的用例是保持 AWS 数据库与外部数据库同步,AWS DMS 允许您直接创建并使用相应数据存储的目标端点。请参见 数据迁移的目标端点 以获取支持的目标列表。不过,如果您需要在将实时变更数据发送到目标例如事务性数据湖之前进行丰富、过滤或聚合,或者需要重播变更数据,则可以选择数据流服务,例如 Amazon Kinesis 数据流 或 Amazon MSK 作为 AWS DMS 的目标端点。流处理器:借助 Apache Flink 管理服务,您可以使用 Apache Flink 实时转换和分析流数据,并将应用程序与其他 AWS 服务集成。有关数据丰富模式及其实现的概述,请参考 在 Amazon 管理服务 Apache Flink 中常见的流数据丰富模式 和 实现 Apache Flink 实时数据丰富模式。目标:如前图所示的架构解锁了一系列用例,例如:实时分析:流式传输更改以启用实时报告、仪表板和基于实时变化数据的推荐。数据湖:创建低延迟的源到数据湖管道。Strangler Fig 模式:实施基于 CDC 的 Strangler Fig 模式,以迁移和重构应用程序到 AWS。

解决方案概述

在前述部分中,我们介绍了架构的关键组件。在本节中,我们将逐步执行该架构的实施过程。该解决方案仅展示了如何在 Flink Studio 笔记本中探索流数据的步骤;不过,您可以按照 Amazon 管理服务 Apache Flink 中常见的流数据丰富模式 和 实现 Apache Flink 实时数据丰富模式 的讨论进行扩展。我们首先列出先决条件和设置任务。然后,我们将引导您通过使用 AWS CloudFormation 部署解决方案。以下图显示了我们要部署的解决方案架构。

我们部署以下关键资源,如前图中编号所示:

AWS Secrets Manager 机密用于存储 Google Cloud MySQL 数据库的凭证和 SASL/SCRAM 凭证 用于 MSK 集群。一个 Amazon MSK 集群。一个 AWS DMS 复制实例。AWS 身份和访问管理 IAM角色,用于 AWS DMS 和 Apache Flink 管理服务。一个 Amazon 管理服务 Apache Flink Studio 笔记本。

先决条件

完成以下步骤以设置和配置本文描述的解决方案的先决条件:

创建 Cloud SQL for MySQL 实例,选择 MySQL 80 作为数据库版本。下载 SQL 文件 mysqlsampletablessql。从 Cloud Shell 连接到 Cloud SQL,并将下载的 SQL 文件上传到 Cloud Shell。

在 Cloud Shell 中运行以下语句以创建 salesdb 数据库并上传数据: sql source mysqlsampletablessql

选择 Launch Stack 以在 AWS 中创建 VPC、子网、路由表、互联网网关和 NAT 网关。

飞驰vnp加速器

选择 Next。

在多云环境中使用 AWS DMS、Amazon MSK 和 Amazon Managed Servi在 Stack name 中输入一个唯一名称。默认值是 MultiCloudCDCVPC。在 VPCCIDR 中输入 VPC 的起始 IP 和大小,使用 IPv4 CIDR 表示法。默认值为 10000/16。在 PublicSubnetCIDR 中输入子网的起始 IP 和大小,使用 IPv4 CIDR 表示法。默认值为 10000/24。在 PrivateSubnet1CIDR 中输入子网的起始 IP 和大小,使用 CIDR 表示法。默认值为 10010/24。在 PrivateSubnet2CIDR 中输入子网的起始 IP 和大小,使用 CIDR 表示法。默认值为 10020/24。

选择 Create Stack。等待堆栈完全部署后才能进入下一步。堆栈部署需要大约 5 分钟。

堆栈创建完成后,导航到 Outputs 标签,记下 NAT 网关的公有 IPv4 地址,以便在下一部分使用。

AWS 和 Google Cloud 环境之间的连接

您可以使用私有 IP 连接到 Cloud SQL for MySQL 实例。有关如何在 Google Cloud 和 AWS 之间创建 VPN 连接的说明,请参阅 创建 Google Cloud 和 AWS 之间的 VPN 连接。请注意以下配置:

在配置虚拟专用网关和 VPN 连接时使用上一步骤中创建的 VPC 的 VPC ID。更新与公共和私有子网相关联的路由表,以添加路由到您的虚拟专用网关,使用 Google Cloud 环境中使用的私有 IP 地址。

以下屏幕截图显示了私有路由表的详细信息,作为示例,其中 10100/16 是我们 Google Cloud 环境的 IPv4 CIDR 块。

在 Google Cloud 中运行 连接测试 以确认您可以从 AWS 网络访问数据库实例。以下屏幕截图显示了我们的连接测试结果,其中 10008 是来自 VPC 的 IPv4 CIDR 块的私有 IP 地址。

另外,如果您将 Cloud SQL 实例配置为拥有公有 IPv4 地址,且希望直接使用其公有 IP 地址进行连接,请将先前步骤中 NAT 网关的公有 IP 地址配置到数据库实例的 授权网络 设置中。

使用 AWS CloudFormation 部署变更数据流解决方案

要使用 AWS CloudFormation 部署解决方案,请完成以下步骤:

选择 Launch Stack

选择 Next。

在 Stack name 中输入一个唯一名称。默认值为 MultiCloudCDCStreaming。在 ExternalDBPassword 中输入 MySQL 数据库密码。在 ExternalDBPort 中输入数据库端口号。默认值为 3306。在 ExternalDBServerName 中输入数据库服务器名称或 IP 地址。根据您与 Google Cloud 的网络连接,这将是 MySQL 数据库的公有 IPv4 地址或私有 IPv4 地址。在 ExternalDBUsername 中输入 MySQL 数据库用户名。在 PrivateSubnetIds 中输入您创建的私有子网。在 VpcId 中选择您创建的 VPC。对于 CreateDMSVPCRole,如果您在 AWS 账户中尚未设置 dmsvpcrole,请输入 true。默认值为 false。AWS DMS 需要该角色才能使用 AWS DMS API。有关更多信息,请参见 创建与 AWS CLI 和 AWS DMS API 一起使用的 IAM 角色。确保仅在角色尚不存在时使用 true,否则部署将因不可恢复的错误失败。对于 CreateDMSCloudWatchRole,如果您在 AWS 账户中尚未设置 dmscloudwatchlogsrole,请输入 true。默认值为 false。AWS DMS 需要该角色才能使用 AWS DMS API。确保仅在角色尚不存在时使用 true,否则部署将因不可恢复的错误失败。

选中确认框并选择 Create stack。堆栈部署大约需要 30 分钟。

堆栈创建完成后,导航到 Outputs 标签,记下所创建资源的值。

打开 Amazon MSK 控制台,确保 MSK 集群处于 Active 状态,然后再进行下一部分。

设置并启动变更数据流

完成以下步骤以配置变更数据流:

在 Apache Flink 管理服务控制台中,选择导航窗格中的 Studio notebooks。

选择您通过 AWS CloudFormation 部署的笔记本即 MSFStudioAppName 的输出,并选择 Run。

等待状态显示为 Running,然后选择 Open in Apache Zeppelin,这会打开 [Apache Zeppelin UI](https//zeppelinapacheorg/docs/latest/quick