今の急速に進化するデータ主導の世界では、リアルタイムのデータ処理はさまざまなビジネス分野で不可欠です。
システムパフォーマンスの監視から顧客行動の分析まで、リアルタイムでデータを処理する能力は、タイムリーな意思決定のための貴重な洞察を提供します。
ただし、見落とされがちな重要な側面の一つは、リアルタイムのデータ処理パイプラインで障害を効果的に管理することです。
この記事では、堅牢な再試行戦略を実装することの重要性について説明したいと思います。AWS ECS App Mesh を活用し、障害を効果的に処理する方法を示したいと思います。
AWS ECS App Mesh リトライ戦略
Amazon Elastic Container Service (ECS) は、完全に管理されたコンテナオーケストレーションサービスであり、クラスター上で Docker コンテナを実行、停止、および管理することを可能にします。これにより、クラスター管理インフラストラクチャをインストール、運用、およびスケールする必要がなくなります。
AWS App Mesh は、分散アプリケーションアーキテクチャ内のサービス間通信を管理するための専門的なインフラストラクチャレイヤーであるサービスメッシュのカテゴリに属します。
基本的に、AWS App Mesh は、サービスディスカバリー、負荷分散、暗号化、認証、および可観測性などの機能を提供することにより、アプリケーションのネットワーキング面を簡素化します。
要するに、AWS App Mesh はマイクロサービス間の通信を合理化し、開発者がネットワーク設定を心配することなくアプリケーションロジックの構築に集中できるようにします。
このアプローチは、最新の分散システムの信頼性、セキュリティ、および可観測性を向上させます。他の一般的なサービスメッシュの実装には、Istio や Linkerd などがあります。
App Mesh は、サービスディスカバリー、トラフィック管理、可観測性などの機能を提供する一方で、再試行ポリシーによる障害対応の能力がデータの整合性とシステムの信頼性を確保するために重要です。
リトライ戦略の重要性
リアルタイムデータ処理では、ネットワークの問題、サービスの中断、または一時的なエラーにより障害が避けられないことがあります。
そのため、効果的なリトライ戦略を実装することが重要です。以下の点を考慮してください:
-
リトライ回数の決定: リトライの回数は、データの重要性、一時的な障害の発生確率、下流プロセスへの影響などに基づいて決定します。データの確実な配信を確保しつつ、無限のリトライによるリソースの浪費を防ぐためのバランスを見つけることが重要です。
-
指数バックオフ: 指数バックオフ戦略を採用することで、高負荷時に下流システムの過負荷を防ぎます。リトライ間隔を徐々に増やすことで、さらなる障害の発生を減少させることができます。
-
デッドレターキュー(DLQ): DLQを実装することで、失敗したメッセージを捕捉し、さらなる分析や手動介入が可能になります。DLQで特定された問題に迅速に対応するため、堅牢な監視とアラート機構を設定することが重要です。
App Mesh リトライポリシーのテスト
リアルタイムデータ処理パイプラインで障害シナリオをシミュレートして、App Meshのリトライポリシーをテストしてみましょう。
Terraformを使用して、データインジェスチョンサービスとデータプロセッシングサービスの2つのサービスを設定します。
-
データインジェスチョンサービス: このサービスは外部からデータを受信し、それを処理パイプラインに転送します。
-
データプロセッシングサービス: このサービスは、リアルタイムで受信したデータを分析し、異常検出や集約などのタスクを実行します。
Terraform の設定
Terraform を使用して、データインジェスチョンサービスとデータプロセッシングサービスのインフラストラクチャを定義し、App Mesh の設定を行います。
詳細な Terraform 構成については省略しますが、以下に主要なコンポーネントの概要を示します。
- App Mesh
App Meshの核心コンポーネントから始めましょう。
- サービスメッシュ: App Mesh全体を構成するサービスの論理的な境界です。
resource "aws_appmesh_mesh" "app-mesh" {
name = "${var.env}-${var.project}-app-mesh"
spec {
egress_filter {
type = "DROP_ALL" # メッシュ内の他のリソースへの仮想ノードからの出口のみを許可
}
}
}
- バーチャルサービス: メッシュ内で実行されているデータインジェスチョンサービスとデータプロセッシングサービスの抽象的な表現です。
# data-processing バーチャルサービス
resource "aws_appmesh_virtual_service" "data-processing-service" {
name = "${local.services.data-processing}.${var.env}.${var.internal_domain}"
mesh_name = aws_appmesh_mesh.app-mesh.id
spec {
provider {
virtual_router {
virtual_router_name = aws_appmesh_virtual_router.data-processing-service.name
}
}
}
}
# data-ingestion バーチャルサービス
# ...
バーチャルルーターは、バーチャルサービスのトラフィックを処理する責任があります。後で設定で定義され、バーチャルサービスのルーティングルールやリトライポリシーを持ちます。
- バーチャルノード: 抽象化されたバーチャルサービスの背後にある具体的な実装です。それぞれのバーチャルノードは特定のECSサービスを指し示し、実際のコードが実行されます。
# data-ingestion バーチャルノード
resource "aws_appmesh_virtual_node" "data-ingestion-service" {
name = "${var.env}-${var.project}-data-ingestion-service"
mesh_name = aws_appmesh_mesh.app-mesh.id
spec {
backend {
virtual_service {
virtual_service_name = aws_appmesh_virtual_service.data-processing-service.name
}
}
listener {
port_mapping {
port = var.ecs_services["data-ingestion"].app_port
protocol = "http"
}
timeout {
http {
per_request {
value = var.ecs_services["data-ingestion"].app_mesh_timeout.value
unit = var.ecs_services["data-ingestion"].app_mesh_timeout.unit
}
}
}
}
service_discovery {
aws_cloud_map {
service_name = aws_appmesh_virtual_service.data-processing-service.name
namespace_name = aws_service_discovery_private_dns_namespace.internal.name
}
}
logging {
access_log {
file {
path = "/dev/stdout"
}
}
}
}
}
# data-processing バーチャルノード
# ...
この例では、データプロセッシングサービスとデータインジェスチョンサービスのためのバーチャルノードを定義しました。
データインジェスチョンのバーチャルノードでは、データプロセッシングのバーチャルサービスをバックエンドとして指定し、そのバーチャルノードが送信する出口トラフィックの先としています。
また、バーチャルノードのリスナーも指定しました。これにより、受信トラフィックのポートとプロトコルが定義されています。
さらに、このバーチャルノードのサービスディスカバリーメカニズムも指定しました。この場合、AWS Cloud Map が使用されています。
- ルート: サービス間のトラフィックの流れ方を指定します。サービス名、属性、または重み付け分散などの基準に基づいてルーティングルールを定義できます。
# data-processing サービス App Mesh バーチャルルータ
resource "aws_appmesh_virtual_router" "data-processing-service" {
name = "${var.env}-${var.project}-data-processing-service-virtual-router"
mesh_name = aws_appmesh_mesh.app-mesh.id
spec {
listener {
port_mapping {
port = var.ecs_services["data-processing"].app_port
protocol = "http"
}
}
}
}
# data-processing サービス App Mesh ルート
resource "aws_appmesh_route" "data-processing-service" {
name = "${var.env}-${var.project}-data-processing-service"
mesh_name = aws_appmesh_mesh.app-mesh.id
virtual_router_name = aws_appmesh_virtual_router.data-processing-service.name
spec {
http_route {
match {
prefix = "/"
}
retry_policy {
http_retry_events = [
"gateway-error",
]
max_retries = 12
per_retry_timeout {
unit = "s"
value = 5
}
tcp_retry_events = [
"connection-error",
]
}
action {
weighted_target {
virtual_node = aws_appmesh_virtual_node.data-processing-service.name
weight = 1
}
}
}
priority = 1
}
}
# data-ingestion サービス App Mesh バーチャルルータ
# ...
# data-ingestion サービス App Mesh ルート
# ...
上記のルートは、すべての受信 HTTP トラフィックを一致させ、リトライを行いながらデータプロセッシングサービスのバーチャルノードにルーティングします。
リトライポリシーでは、ゲートウェイエラーと接続エラー時に最大12回のリトライを行い、1回のリトライのタイムアウトを5秒に設定します。
アクションで指定された重み付きターゲットは、トラフィックをルーティングするバーチャルノードを示します。ここでは、重みを1に設定しており、すべてのトラフィックをデータプロセッシングサービスのバーチャルノードにルーティングすることを意味します。
バーチャルサービスを定義する際に、バーチャルノードをバーチャルサービスのプロバイダとして定義し、バーチャルルーターを指定しない場合、ルーティングルールを細かく制御することができないことに注意してください。
# data-processing バーチャルサービス
resource "aws_appmesh_virtual_service" "data-processing-service" {
name = "${local.services.data-processing}.${var.env}.${var.internal_domain}"
mesh_name = aws_appmesh_mesh.app-mesh.id
spec {
provider {
virtual_node {
virtual_node_name = aws_appmesh_virtual_node.data-processing-service.name
}
}
}
}
- Cloud Map サービスディスカバリー: これにより、サービスはカスタム DNS名を使用してお互いを発見し、通信することができます。
# Private DNS Namespace
resource "aws_service_discovery_private_dns_namespace" "internal" {
name = "${var.env}.${var.internal_domain}"
description = "${var.env}-${var.project}-private-dns-namespace"
vpc = var.aws_vpc-vpc-id
}
# data-processing
resource "aws_service_discovery_service" "data-processing" {
name = local.services.data-processing
dns_config {
namespace_id = aws_service_discovery_private_dns_namespace.internal.id
dns_records {
ttl = 10
type = "A"
}
routing_policy = "MULTIVALUE"
}
health_check_custom_config {
failure_threshold = 1
}
lifecycle {
create_before_destroy = true
}
}
- AWS ECS
これからデータインジェスチョンサービスとデータプロセッシングサービスのためのECSクラスター構成を定義します。
- ECS Cluster
# ECS クラスター
resource "aws_ecs_cluster" "ecs-cluster" {
name = "${var.env}-${var.project}-ecs-cluster"
setting {
name = "containerInsights"
value = "enabled"
}
}
# ECS クラスターキャパシティプロバイダー
resource "aws_ecs_cluster_capacity_providers" "ecs-cluster" {
cluster_name = aws_ecs_cluster.ecs-cluster.name
capacity_providers = ["FARGATE"]
default_capacity_provider_strategy {
weight = 1
capacity_provider = "FARGATE"
}
}
- ECS Services
# data-processing サービス
resource "aws_ecs_service" "data-processing-service" {
name = local.services.data-processing
cluster = aws_ecs_cluster.ecs-cluster.id
task_definition = aws_ecs_task_definition.ecs-data-processing-task.arn
launch_type = "FARGATE"
deployment_maximum_percent = var.ecs_services["data-processing"].deployment_maximum_percent
deployment_minimum_healthy_percent = var.ecs_services["data-processing"].deployment_minimum_healthy_percent
desired_count = var.ecs_services["data-processing"].desired_count
force_new_deployment = true
lifecycle {
ignore_changes = [desired_count, task_definition]
}
network_configuration {
subnets = var.aws_subnet-protected-ids
security_groups = [aws_security_group.ecs-service.id]
}
service_registries {
registry_arn = aws_service_discovery_service.data-processing.arn
}
}
# data-ingestion サービス
# ...
ECSクラスターを作成し、データプロセッシングサービスとデータインジェスチョンサービス用のECSサービスを定義しました。
- ECSタスク定義
# data-processing タスク定義書
resource "aws_ecs_task_definition" "ecs-data-processing-task" {
family = "${var.env}-${var.project}-data-processing-service"
requires_compatibilities = ["FARGATE"]
cpu = var.ecs_services["data-processing"].task_definition.cpu
memory = var.ecs_services["data-processing"].task_definition.memory
network_mode = "awsvpc"
execution_role_arn = aws_iam_role.task-exec.arn
task_role_arn = aws_iam_role.ecs-task.arn
container_definitions = jsonencode([
{
name = "data-processing-service"
image = "${aws_ecr_repository.data-processing-service.repository_url}:${var.app_version}"
essential = true
environment = [
{
name = "APP_PORT"
value = tostring(var.ecs_services["data-processing"].app_port)
}
]
portMappings = [
{
containerPort = var.ecs_services["data-processing"].app_port
protocol = "tcp"
}
]
healthCheck = {
command = [
"CMD-SHELL",
"curl -s http://localhost:${var.ecs_services["data-processing"].app_port}/health-check || exit 1"
]
interval = 20
retries = 5
startPeriod = 10
timeout = 5
}
depends_on = [
{
"containerName" : "envoy",
"condition" : "HEALTHY"
}
]
logConfiguration = {
logDriver = "awslogs"
options = {
awslogs-group = aws_cloudwatch_log_group.data-processing-service.name
awslogs-region = var.region
awslogs-stream-prefix = "awslogs-data-processing"
}
}
},
{
name = "envoy"
image = "840364872350.dkr.ecr.${data.aws_region.current.name}.amazonaws.com/aws-appmesh-envoy:v1.27.2.0-prod"
essential = true
cpu = var.ecs_services["data-processing"].task_definition.envoy_cpu
memory = var.ecs_services["data-processing"].task_definition.envoy_memory
environment = [
{
name = "APPMESH_RESOURCE_ARN",
value = aws_appmesh_virtual_node.data-processing-service.arn
},
{
name = "ENVOY_LOG_LEVEL",
value = "info"
},
{
name = "ENVOY_INITIAL_FETCH_TIMEOUT",
value = "30"
},
{
name = "ENABLE_ENVOY_XRAY_TRACING",
value = "1"
},
]
portMappings = [
{
protocol = "tcp",
containerPort = 9901
},
{
protocol = "tcp",
containerPort = 15000
},
{
protocol = "tcp",
containerPort = 15001
}
]
healthCheck = {
command = [
"CMD-SHELL",
"curl -s http://localhost:9901/server_info | grep state | grep -q LIVE"
],
interval = 5,
retries = 3,
startPeriod = 60,
timeout = 2
}
user = "1337"
logConfiguration = {
logDriver = "awslogs"
options = {
awslogs-group = aws_cloudwatch_log_group.data-processing-service.name
awslogs-region = var.region
awslogs-stream-prefix = "awslogs-envoy-data-processing"
}
}
},
{
name = "xray-daemon"
image = "public.ecr.aws/xray/aws-xray-daemon:latest"
essential = false
cpu = var.ecs_services["data-processing"].task_definition.xray_cpu
memoryReservation = var.ecs_services["data-processing"].task_definition.xray_memory
portMappings = [
{
hostPort = 2000
containerPort = 2000
protocol = "udp"
}
]
user = "1337"
logConfiguration = {
logDriver = "awslogs"
options = {
awslogs-group = aws_cloudwatch_log_group.data-processing-service.name
awslogs-region = var.region
awslogs-stream-prefix = "awslogs-xray-data-processing"
}
}
}
])
proxy_configuration {
type = "APPMESH"
container_name = "envoy"
properties = {
AppPorts = var.ecs_services["data-processing"].app_port
EgressIgnoredIPs = "169.254.170.2,169.254.169.254"
IgnoredUID = "1337"
ProxyEgressPort = 15001
ProxyIngressPort = 15000
EgressIgnoredPorts = 22
}
}
}
# data-ingestion タスク定義書
# ...
データプロセッシングおよびデータインジェスチョンサービスのタスク定義の詳細:
-
個々のコンテナ構成:
- 使用するDockerイメージ、環境変数、通信のためのポートマッピング、適切な動作を確認するためのヘルスチェック、分析のためのログ設定が指定されます。
-
App Meshの統合:
- Envoyサイドカープロキシを使用して、App Meshサービスメッシュを介しての受信および送信トラフィックをインターセプトおよびルーティングします。Envoyは高性能のデータプレーンコンポーネントとして機能し、メッシュ内でのネットワーク相互作用を管理します。詳細については、公式Envoyドキュメントをご参照ください。
-
分散トレーシング:
- 各種サービス間のリクエストフローを追跡するためのxray-daemonコンテナを含みます。これにより、AWS X-Rayを通じてシステムのパフォーマンスと潜在的な問題に関する貴重な洞察が得られます。
以下は、データインジェスチョンサービスおよびデータプロセッシングサービスのTerraform設定の概要です:
バックエンドサービス
次に、データインジェスチョンおよびデータプロセッシングサービスのバックエンドをGolangで実装していきましょう。結局のところ、Goは素晴らしい選択です!😄
- データインジェスチョンサービス
func main() {
// ルーターを作成
router := mux.NewRouter()
// ルートを定義
router.HandleFunc("/health-check", HealthCheckHandler).Methods("GET")
router.HandleFunc("/ev", ElectricVehicleHandler).Methods("POST")
// サーバーを起動
log.Fatal(http.ListenAndServe(":3000", router))
}
// ElectricVehicleHandler /ev エンドポイントを処理
func ElectricVehicleHandler(w http.ResponseWriter, r *http.Request) {
log.Printf("Request: %s %s", r.Method, r.URL.Path)
// リクエスト内容を読む
body, err := io.ReadAll(r.Body)
if err != nil {
log.Println(err)
http.Error(w, "Failed to read request body", http.StatusInternalServerError)
return
}
// JSON ペイロードをデコード
var payload ElectricVehiclePayload
err = json.Unmarshal(body, &payload)
if err != nil {
log.Println(err)
http.Error(w, "Failed to decode JSON payload", http.StatusBadRequest)
return
}
// 必要な前処理を行う
// ...
// データ処理サービスエンドポイントへのリクエストを作成
req, err := http.NewRequest("POST", dataProcessingEndpoint, bytes.NewBuffer(body))
if err != nil {
log.Println(fmt.Errorf("failed to create request to data processing service: %w", err))
http.Error(w, "Failed to POST to data processing service", http.StatusInternalServerError)
return
}
// 受信リクエストから送信リクエストにヘッダーを複製
req.Header = r.Header.Clone()
// データ処理サービスへのHTTP POSTリクエスト
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Println(fmt.Errorf("failed to create request to data processing service: %w", err))
http.Error(w, "Failed to POST to data processing service", http.StatusInternalServerError)
return
}
// 応答ステータスコードが200でないかどうかを確認
if resp.StatusCode != http.StatusOK {
msg := fmt.Sprintf("Data processing service returned non-200 status code: %d", resp.StatusCode)
log.Print(msg)
http.Error(w, msg, resp.StatusCode)
return
}
// データ処理サービスからの応答を現在の応答ライターにコピー
_, err = io.Copy(w, resp.Body)
if err != nil {
log.Println(err)
http.Error(w, "Failed to copy response", http.StatusInternalServerError)
return
}
}
データインジェスチョンサービスは、/ev
エンドポイント経由で受信するPOSTリクエストを待ち受けるシンプルなHTTPサーバーです。
このサービスは、外部からのデータ(例えば、電気自動車データ)を受信する役割を担っています。
この例では、ElectricVehicleHandler
関数が受信したEVデータを処理します。具体的には、JSONペイロードをデコードし、必要な前処理を実行した後、そのリクエストをデータプロセッシングサービスに転送します。
- データプロセッシングサービス
func main() {
// ルーターを作成
router := mux.NewRouter()
// ルートを定義
router.HandleFunc("/health-check", HealthCheckHandler).Methods("GET")
router.HandleFunc("/ev", ElectricVehicleDataProcessingHandler).Methods("POST")
// サーバーを起動
log.Fatal(http.ListenAndServe(":3000", router))
}
// ElectricVehicleDataProcessingHandler EVデータを扱う
func ElectricVehicleDataProcessingHandler(w http.ResponseWriter, r *http.Request) {
// リクエスト内容をログに記録
log.Printf("Request: %s %s", r.Method, r.URL.Path)
// x-503ヘッダーが設定されている場合は503を返
if value, ok := r.Header["X-503"]; ok {
log.Printf("X-503 header is set with values: %v", value)
w.WriteHeader(http.StatusServiceUnavailable)
_, err := w.Write([]byte("Data Processing Service will return 503 ==> called with x-503 header set."))
if err != nil {
log.Println(err)
}
return
}
// リクエスト内容を読む
body, err := io.ReadAll(r.Body)
if err != nil {
log.Println(err)
http.Error(w, "Failed to read request body", http.StatusInternalServerError)
return
}
// JSON ペイロードをデコード
var payload ElectricVehiclePayload
err = json.Unmarshal(body, &payload)
if err != nil {
log.Println(err)
http.Error(w, "Failed to decode JSON payload", http.StatusBadRequest)
return
}
// ペイロードを処理
// ...
// クライアントへの返答
w.WriteHeader(http.StatusOK)
_, err = w.Write([]byte("Payload processed successfully"))
if err != nil {
log.Println(err)
return
}
}
データプロセッシングサービスも、/ev
エンドポイント経由で受信するPOSTリクエストを待ち受けるシンプルなHTTPサーバーです。
障害シナリオをシミュレートするために、ElectricVehicleDataProcessingHandler
関数は、x-503ヘッダーが設定されている場合に503ステータスコードを返します。
このサービスは、リアルタイムで受信データを分析し、異常検出や集計などのタスクを実行する役割を担っています。
terraform apply
を実行し、バックエンドサービスを展開した後、ダッシュボードからサービスが実行され、健全であることを確認できます。
App Meshの設定が正しく行われていることも確認しましょう。
障害シナリオのシミュレーション
Data Processing サービスに x-503 ヘッダーを渡して、503 ステータスコードを返すことで、障害シナリオをシミュレートします。
AWS セッションマネージャーを使用して保護されたサブネットにアクセスし、まずはサービスへの呼び出しが Envoy プロキシを経由していることを確認しましょう。
サービスが健全であり、呼び出しがEnvoyプロキシを介して行われていることが確認できました。
-
server: envoy
: リクエストがEnvoyプロキシによって処理されていることを示します。 -
x-envoy-upstream-service-time: 1
: Envoyプロキシがアップストリームサービス(この場合はバックエンドサービス)と通信するのにかかる時間(ミリ秒単位)を示します。
次に、x-503ヘッダーを設定してデータインジェスチョンサービスを呼び出し、App Meshのリトライポリシーの振る舞いを観察します。
curl --location 'http://data-ingestion.dev.smn-app-mesh-ecs.internal:3000/ev' \
--header 'Content-Type: application/json' \
--header 'x-503: true' \
--data '{
"vehicle_id": "EV-001",
"timestamp": "2024-02-15T10:30:00Z",
"location": {
"latitude": 37.7749,
"longitude": -122.4194
},
"battery": {
"percentage": 75,
"voltage": 390,
"temperature": 25.3
},
"speed": 60,
"odometer": 12500
}
'
- データインジェスチョンサービスのログ:
- データプロセッシングサービスのログ:
1回のリクエストをデータインジェスチョンサービスに送信したところ、データプロセッシングサービスが503ステータスコードを返したため、リクエストはリトライポリシーに従って12回リトライされました。
この場合、データプロセッシングサービスはx-503ヘッダーが設定されていると503ステータスコードを返すように実装されているため、最終的に12回のリトライ後にリクエストは失敗しました。
実際のシナリオでは、データプロセッシングサービスは一時的な障害から回復し、リクエストは最終的に正常に処理されるでしょう。
App Meshのリトライポリシーがない場合、データプロセッシングサービスへのリクエストは失敗し、データインジェスチョンサービスはクライアントにエラーを返すことになります。
App Meshのリトライポリシーがない場合、データインジェスチョンサービスのGolangコードでHTTPリトライメカニズムを実装する必要がありました。App Meshのリトライポリシーを使用することで、より堅牢で集中管理されたアプローチが提供され、開発者はネットワーク設定ではなくアプリケーションロジックに集中することができます。
終わりに
AWS ECS App Meshと効果的なリトライポリシーを組み合わせることは、マイクロサービス間のリアルタイムデータ通信の耐久性と信頼性を確保するために不可欠です。これにより、組織は障害の影響を軽減し、重要なビジネスプロセスにおけるデータの整合性を維持することができます。