Tweet
Logo
    AWS Lambda と SQS を使って webhook によるスパイクに対応する
    AWS Lambda と SQS を使って webhook によるスパイクに対応する

    AWS Lambda と SQS を使って webhook によるスパイクに対応する

    • はじめに
    • 課題
    • Lambda と SQS でリクエストをスロットリングする
    • Terraform によるセットアップ
    • Lambda のソースコード
    • Lambda
    • SQS
    • API Gateway
    • 変数の設定
    • まとめ

    はじめに

    DROBE では様々な外部サービスを利用していますが、事前に設定しておく事で外部サービス側で特定のイベントが発生した際に DROBE 側に HTTP のリクエストを送ってくれる仕組みを多く利用しています。(こういった仕組みの事を一般的に webhook と呼びます)

    webhook のわかりやすい例としては、例えば SendGrid のような外部サービスでメールの配信を行っている場合に、実際にメールの配信結果がどうだったかをアプリケーションで確実に検知したいといった場合に、SendGrid での配信完了イベントをトリガーとした webhook を設定します。

    SendGrid の webhook を使ってメールの配信完了を Application で検知する例
    SendGrid の webhook を使ってメールの配信完了を Application で検知する例

    こういった仕組みを使う事で、アプリケーション側ではメールの送信をリクエストした事だけではなく、メールがしっかりと配信された事、もしくはエラーとなってしまった事などを検知し DB に保存したり CS に役立てたりなどという事が出来ます。

    この記事では webhook を使う事によって短期間に大量のアクセスが返ってきてしまうような場合に DROBE がどのように対応しているかを解説します。

    課題

    そもそも webhook で大量のアクセスが返ってきてしまう場合とはどういった場合でしょうか?先ほどの SendGrid の例で言うと、アプリケーションの機能として任意のタイミングでユーザー全員にメールマガジンを配信したい場合などが考えられます。

    例えばユーザーが 100 万人のサービスで一斉に SendGrid のようなメール送信を行う外部サービスに送信リクエストを送ったらどうなるでしょうか? もちろんサービスの規約や内部実装に依存すると思いますが、多くの大規模なサービスではある程度大量のリクエストが来ても特に問題なく捌き素直に webhook を返すという挙動になると思います。その際にアプリケーション側では大量の HTTP リクエストが一気に来る事になるので (この場合では 100 万人分の webhook が)、自身が作り出した DDOS 攻撃にアプリケーションがされされるというような状況になります。

    Lambda と SQS でリクエストをスロットリングする

    DROBE ではこの問題に対処するために Lambda と SQS を用いてリクエストをスロットリングする proxy を作って対処しています。

    webhook proxy の概要図
    webhook proxy の概要図

    挙動はいたってシンプルで、外部サービスからのリクエストが来たら AWS Lambda によってリクエストを受けて Header と Body をそのまま SQS に入れていきます。同時に別の Lambda を SQS からのリクエストをデキューし、Header と Body を使って HTTP リクエストをアプリケーションに投げなおします。

    ここで API Gateway からのリクエストを SQS に入れていくラムダ (api2sqs と呼んでいます) は並列で動作するように設定しておきつつ、SQS からメッセージをデキューしてアプリケーションに HTTP リクエストを投げ直すラムダ (sqs2upstream と呼んでいます) の並列実行数を制限しておきます。

    こうする事で、アプリケーションサーバーに一切の変更を加える事なく、webhook など大量の外部サービスからのアクセスによるアクセス増を一定のレートに制限する事が可能になります。(もちろん SQS の詰まり具合に応じて webhook がアプリケーションサーバーに届くまでに一定のタイムラグがあるのでリアルタイム性が重要な機能には使えないという欠点があります。)

    Terraform によるセットアップ

    ここからは上記の webhook proxy の terraform によるセットアップを解説していきます。

    前提として、API Gateway で利用するドメインはすでに準備が出来ていて zone までは手動で切ってあり、ドメインに来たアクセスは zone にくるようにセットアップが出来ているものとします。また SSL のための Certificate も ACM を使って事前に準備しているものとしています。

    Lambda のソースコード

    まずは API Gateway からリクエストを受け取って SQS に入れる所のラムダのソースコードを書きます。ここでは js で書いていますがどんな言語でも問題ありません。

    api2sqs/index.js

    次に SQS からデキューしてアプリケーション側に HTTP リクエストを投げ直す処理を書きます。

    sqs2upstream/index.js

    Lambda

    次に Terraform で Lambda を作ります。

    Lambda.tf

    SQS

    SQS を作ります。設定は非常にシンプルです。

    sqs.tf

    API Gateway

    最後に API Gateway を作ります。

    api_gateway.tf

    変数の設定

    最後に変数の定義を以下のように書いておしまいです。

    app_env                        = "prod"
    upstream                       = "example.com" # 向き先のドメイン
    reserved_concurrent_executions = 1 # ここで並列度の定義をする
    zone_id                        = "zone_id" # 事前に作成した ZONE の id
    domain_cert_arn                = "arn_of_domain_cert" # ACM で作った Certificate の ARN
    data.tfvars

    まとめ

    この記事では AWS Lambda と SQS を用いて、外部サービスによる大量アクセスに対応する方法について解説しました。高いリアルタイム性が要求されるようなアプリケーションには向かない方法ですが、既存のアプリケーションに一切手を加える事なく実現出来るので、状況によっては使いやすい手法なのではと考えています。

    © 2025 DROBE All rights reserved.
    const https = require('https');
    var AWS = require('aws-sdk');
    
    exports.handler = async (event) => {
      const sqsUrl = process.env.SQS_URL # ここでは terraform 側から変数として渡している
    
      const messageBody = {
        'resource': event.resource,
        'headers': JSON.parse(event.headers),
        'body': JSON.parse(event.body)
      }
      
      var params = {
        MessageBody: JSON.stringify(messageBody),
        QueueUrl: sqsUrl,
       };
      const sqs = new AWS.SQS();
      const queueResponse = await sqs.sendMessage(params).promise()
      
      return {
          "statusCode": 200
      };
    };
    const https = require('https');
    
    exports.handler = event => {
      const upstream = process.env.UPSTREAM # ここも terraform 側から変数として渡している
      const path = process.env.PATH
    
      event.Records.forEach(record => {
        
        const { body } = record;
        const parsedBody = JSON.parse(body)
    
        const data = JSON.stringify(parsedBody.body)
        const options = {
            host: upstream,
            path: path,
            port: 443,
            headers: parsedBody.headers
            method: 'POST',
        }
    
        const req = https.request(options, res => {
          console.log(`statusCode: ${res.statusCode}`)
        }).on('error', error => {
          console.error(error)
        })
    
        req.write(data)
        req.end()
      });
    
    };
    locals {
      function_name_sqs2upstream = "lambda_proxy_sqs2upstream-${var.app_env}"
    }
    
    locals {
      function_name_api2sqs = "lambda_proxy_api2sqs-${var.app_env}"
    }
    
    data "aws_iam_policy_document" "lambda_assume_role" {
      statement {
        actions = ["sts:AssumeRole"]
    
        principals {
          type = "Service"
          identifiers = [
            "lambda.amazonaws.com",
          ]
        }
      }
    }
    
    resource "aws_iam_role" "lambda_proxy_sqs2upstream" {
      name               = "lambda_proxy-sqs2upstream-${var.app_env}"
      assume_role_policy = data.aws_iam_policy_document.lambda_assume_role.json
    }
    
    resource "aws_iam_role" "lambda_proxy_api2sqs" {
      name               = "lambda_proxy-api2sqs-${var.app_env}"
      assume_role_policy = data.aws_iam_policy_document.lambda_assume_role.json
    }
    
    data "aws_iam_policy" "lambda_execution_policy" {
      arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
    }
    
    # SQS からメッセージを受けたり、メッセージを削除する権限を付与する
    data "aws_iam_policy_document" "sqs2upstream_policy_document" {
      source_json = data.aws_iam_policy.lambda_execution_policy.policy
    
      statement {
        effect = "Allow"
    
        actions = [
          "sqs:ReceiveMessage",
          "sqs:DeleteMessage",
          "sqs:GetQueueAttributes"
        ]
    
        resources = [aws_sqs_queue.sqs_proxy.arn]
      }
    }
    
    resource "aws_iam_policy" "sqs2upstream_policy" {
      name   = "sqs2upstream_policy-${var.app_env}"
      policy = data.aws_iam_policy_document.sqs2upstream_policy_document.json
    }
    
    resource "aws_iam_role_policy_attachment" "lambda_proxy_sqs2upstream" {
      role       = aws_iam_role.lambda_proxy_sqs2upstream.name
      policy_arn = aws_iam_policy.sqs2upstream_policy.arn
    }
    
    resource "aws_iam_role_policy_attachment" "lambda_proxy_sqs2upstream_vpc_execution_role" {
      role       = aws_iam_role.lambda_proxy_sqs2upstream.name
      policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole"
    }
    
    # SQS にメッセージを送ったり削除したりする権限を付与する
    data "aws_iam_policy_document" "api2sqs_policy_document" {
      source_json = data.aws_iam_policy.lambda_execution_policy.policy
    
      statement {
        effect = "Allow"
    
        actions = [
          "sqs:SendMessage",
          "sqs:DeleteMessage",
          "sqs:GetQueueAttributes"
        ]
    
        resources = [aws_sqs_queue.sqs_proxy.arn]
      }
    }
    
    resource "aws_iam_policy" "api2sqs_policy" {
      name   = "api2sqs_policy-${var.app_env}"
      policy = data.aws_iam_policy_document.api2sqs_policy_document.json
    }
    
    resource "aws_iam_role_policy_attachment" "lambda_proxy_api2sqs" {
      role       = aws_iam_role.lambda_proxy_api2sqs.name
      policy_arn = aws_iam_policy.api2sqs_policy.arn
    }
    
    data "archive_file" "lambda_proxy_sqs2upstream" {
      type        = "zip"
      source_file = "lambda-src/sqs2upstream/index.js"
      output_path = "lambda-src/sqs2upstream/index.zip"
    }
    
    resource "aws_lambda_function" "lambda_proxy_sqs2upstream" {
      filename         = data.archive_file.lambda_proxy_sqs2upstream.output_path
      function_name    = local.function_name_sqs2upstream
      role             = aws_iam_role.lambda_proxy_sqs2upstream.arn
      handler          = "index.handler"
      source_code_hash = data.archive_file.lambda_proxy_sqs2upstream.output_base64sha256
      runtime          = "nodejs12.x"
    
      publish                        = true
      reserved_concurrent_executions = var.reserved_concurrent_executions # 並列度は変数で渡す
    
      memory_size = 128
      timeout     = 3
      environment {
        variables = {
          UPSTREAM = var.upstream
        }
      }
    }
    
    resource "aws_cloudwatch_log_group" "lambda_proxy_sqs2upstream" {
      name              = "/aws/lambda/${local.function_name_sqs2upstream}"
      retention_in_days = 1
    }
    
    data "archive_file" "lambda_proxy_api2sqs" {
      type        = "zip"
      source_file = "lambda-src/api2sqs/index.js"
      output_path = "lambda-src/api2sqs/index.zip"
    }
    
    resource "aws_lambda_function" "lambda_proxy_api2sqs" {
      filename         = data.archive_file.lambda_proxy_api2sqs.output_path
      function_name    = local.function_name_api2sqs
      role             = aws_iam_role.lambda_proxy_api2sqs.arn
      handler          = "index.handler"
      source_code_hash = data.archive_file.lambda_proxy_api2sqs.output_base64sha256
      runtime          = "nodejs12.x"
    
      publish = true
    
      memory_size = 128
      timeout     = 3
      environment {
        variables = {
          SQS_URL = aws_sqs_queue.sqs_proxy.id
        }
      }
    }
    
    resource "aws_cloudwatch_log_group" "lambda_proxy_api2sqs" {
      name              = "/aws/lambda/${local.function_name_api2sqs}"
      retention_in_days = 1
    }
    resource "aws_sqs_queue" "sqs_proxy" {
      name = "proxy-queue-${var.app_env}"
      tags = {
        Environment = var.app_env
      }
    }
    
    # この設定により SQS にイベントが入ってきたら sqs2upstream 関数が呼び出される
    resource "aws_lambda_event_source_mapping" "sqs_proxy" {
      batch_size       = 1 # ここでは 1 にしているがデキューの並列度をあげたい場合はもっと大きな値にする
      event_source_arn = aws_sqs_queue.sqs_proxy.arn
      function_name    = aws_lambda_function.lambda_proxy_sqs2upstream.arn
    }
    resource "aws_apigatewayv2_api" "gw_api_proxy" {
      name          = "gateway_api_proxy-${var.app_env}"
      protocol_type = "HTTP"
    }
    
    resource "aws_apigatewayv2_stage" "gw_state_proxy" {
      api_id = aws_apigatewayv2_api.gw_api_proxy.id
    
      name        = "gateway_state_proxy-${var.app_env}"
      auto_deploy = true
    
      access_log_settings {
        destination_arn = aws_cloudwatch_log_group.gw_lg_proxy.arn
    
        format = jsonencode({
          requestId               = "$context.requestId"
          sourceIp                = "$context.identity.sourceIp"
          requestTime             = "$context.requestTime"
          protocol                = "$context.protocol"
          httpMethod              = "$context.httpMethod"
          resourcePath            = "$context.resourcePath"
          routeKey                = "$context.routeKey"
          status                  = "$context.status"
          responseLength          = "$context.responseLength"
          integrationErrorMessage = "$context.integrationErrorMessage"
          }
        )
      }
    }
    
    resource "aws_apigatewayv2_integration" "gw_integration_proxy" {
      api_id = aws_apigatewayv2_api.gw_api_proxy.id
    
      integration_uri    = aws_lambda_function.lambda_proxy_api2sqs.invoke_arn
      integration_type   = "AWS_PROXY"
      integration_method = "POST"
    }
    
    resource "aws_apigatewayv2_route" "gw_route_proxy" {
      api_id = aws_apigatewayv2_api.gw_api_proxy.id
    
      route_key = "POST /"
      target    = "integrations/${aws_apigatewayv2_integration.gw_integration_proxy.id}"
    }
    
    resource "aws_cloudwatch_log_group" "gw_lg_proxy" {
      name              = "/aws/api_gw/${aws_apigatewayv2_api.gw_api_proxy.name}"
      retention_in_days = 1
    }
    
    resource "aws_lambda_permission" "api_gw" {
      statement_id  = "AllowExecutionFromAPIGateway"
      action        = "lambda:InvokeFunction"
      function_name = aws_lambda_function.lambda_proxy_api2sqs.function_name
      principal     = "apigateway.amazonaws.com"
    
      source_arn = "${aws_apigatewayv2_api.gw_api_proxy.execution_arn}/*/*"
    }
    
    resource "aws_apigatewayv2_domain_name" "api_gw_domain" {
      domain_name = "${var.domain_name}"
    
      domain_name_configuration {
        certificate_arn = var.domain_cert_arn
        endpoint_type   = "REGIONAL"
        security_policy = "TLS_1_2"
      }
    }
    
    resource "aws_route53_record" "api_gw_domain_record" {
      name    = aws_apigatewayv2_domain_name.api_gw_domain.domain_name
      type    = "A"
      zone_id = var.zone_id
    
      alias {
        evaluate_target_health = true
        name                   = aws_apigatewayv2_domain_name.api_gw_domain.domain_name_configuration[0].target_domain_name
        zone_id                = aws_apigatewayv2_domain_name.api_gw_domain.domain_name_configuration[0].hosted_zone_id
      }
    }
    
    resource "aws_apigatewayv2_api_mapping" "api_gw_domain_mapping" {
      api_id      = aws_apigatewayv2_api.gw_api_proxy.id
      stage       = aws_apigatewayv2_stage.gw_state_proxy.id
      domain_name = aws_apigatewayv2_domain_name.api_gw_domain.id
    }