API Gateway WebSocket APIを.NET6.0で開発

AWS API Gateway WebSocket APIから起動するLambdaを.NET6.0で開発する方法を試しました。
※この記事ではWebSocket APIの基本的な仕組みについては解説しません。

WebSocket APIでできること

API GatewayはREST API(以下、REST)やHTTP API、WebSocket API(以下、WS)を提供できるフルマネージド型のサービスです。
従来は仮想サーバやコンテナ上にAPIを提供するためのミドルウェアを用意していましたが、API Gatewayを利用すればサーバレスでAPIの提供が可能になります。

業務システムの開発では、RESTをよく使いますが、WSはどういった用途で使えるでしょうか。
WSで最も特徴的な機能はサーバからクライアントへ通信を行うことができる点です。

AさんとBさんがチャットをすることを考えます。
Aさんがメッセージを送信し、Bさんの画面に表示する場合、RESTだとBさん側からデータを取りに行く必要があります。
一方、WSではBさんに対してサーバからメッセージを送信することができるので、Bさんがリロードボタンを押さなくてもAさんのメッセージを表示できます。

WSを使えば次のような機能を実現できます。

  • チャット
    Webアプリケーション上にチャット機能を実現できます。
    WSの用途としてまず最初に思い浮かぶのはこの機能ではないでしょうか。
    少し応用した例としてお絵描きチャットなどでもWSが利用されています。

電気通信事業として届け出が必要
クローズドチャットを実装する場合、たとえ個人開発であったとしても国(総務省)へ届け出が必要になる場合があるので注意が必要です。
違反した場合は刑事罰が適用される可能性もあります。

また、単に届け出するだけではなく適切に事業を行うことが求められます。
「通信を検閲してはいけない」や「通信に関して知り得た他人の秘密を守る」、「重大事故が発生した場合は遅滞なく報告する」などです。

具体的には、チャットや掲示板のうち「特定のユーザー同士しか閲覧できない場合」が対象となります。
ただし、社内アプリで自社内のユーザーしか利用しない場合は対象外となります。

※この記事の内容は法的な保証を行うものではありませんので、細かな条件は電気通信事業法をご参照いただくか、自社の顧問弁護士に相談してみてください。個人開発の場合は責任をもって条文を理解したうえで総務省に相談してみてください。

  • 1つのデータの共同編集
    複数の人が同じ顧客のデータを更新する場合、排他制御を考える必要があります。
    例えば、Aさんがデータを更新し始めたときにそれ以外の人の入力項目をロックしてしまうことも可能です。

  • バックグラウンド実行
    大量のデータから帳票を作成して出力する場合、リクエストを受信したらサーバ側で帳票を作成、作成が完了したらダウンロード可能になる、という具合の処理が考えられます。
    WSなら帳票作成になったとたんにダウンロードボタンを有効にすることが可能です。

  • サーバサイドレンダリング(キー入力を送信して処理結果を返却)
    クライアントサイドでゲームの計算処理を行わずにサーバサイドで計算処理を行い、リアルタイムに結果を反映できます。
    これはゲーム等で用いられており、有名なゲームだとグランブルーファンタジーがそうです。

  • メトリクス
    IoTからの情報をリアルタイムにクライアントへ送信できます。
    畑の状態管理などに利用できそうですね。

WebSocketがはじかれてしまう?
通信を監視してフィルタリングするサービスによってはWS非対応のケースがあります。
iFilter@Cloud
WSをフィルタリングの対象外にする設定が必要です。

また、アプリケーション側もWS非対応のケースを想定して、WSでエラーが発生した場合にロングポーリングへ切り替える実装も検討してください。

全体像

アーキテクチャ(API Gateway, Lambda, DynamoDBを作成)

.NET6.0の範囲は接続(入室)、切断(退室)用Lambdaとメッセージ送信用Lambdaの3つです。

DynamoDBは接続情報を保持するためのに使います。
接続情報さえ管理できれば良いのでRDS等でも問題ありませんが、接続情報くらいしか管理しないのであれば安価でスケールしやすいDynamoDBがよさそうです。

API Gatewayへの認証にはCognito Authorizerが使えると楽なのですが、WebSocketAPIでは対応していません。
Cognitoを使用する場合はCustom AuthorizerでCognitoと連携する必要があります。
Authorizerも.NET6.0で開発していくのですが、こちらは次回の記事にしようと思います。

処理の流れは以下の通り

  • 入室
  1. ClientからOnConnectのWebSocketリクエストを送信

  2. OnConnect LambdaがAPI Gatewayから渡されるユーザーの接続IDをDynamoDBに保存

  • メッセージ送信
  1. 何等かのトリガーでSendMessageのリクエストを送信
    ※WebSocketでリクエストを受け取っても良いですし、REST APIやS3トリガー等でも問題ないです。

  2. SendMessage LambdaがDynamoDBから接続中のユーザーを探してメッセージを送信

  3. Clientにメッセージが届くので、内容に応じて処理を行う

  • 退室
  1. ClientからDisConnectのWebSocketリクエストを送信

  2. OnDisconnect LambdaがAPI Gatewayから渡されるユーザー接続IDをDynamoDBから削除

AWSリソースの準備

以下のSAM Templateを用意しました。
template.yaml

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: Sample SAM Template for sam-websocket

Parameters:
  TableName:
    Type: String
    Default: 'ws_connections'
  StageName:
    Type: String
    Default: 'Prod'

Globals:
  Function:
    Timeout: 10
    MemorySize: 256
    Runtime: dotnet6
    Environment:
      Variables:
        TABLE_NAME: !Ref TableName

Resources:
# Lambda関数の定義
  OnConnectFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: ./src/OnConnect/
      Handler: OnConnect::OnConnect.Function::FunctionHandler
      Policies:
      - DynamoDBCrudPolicy:
          TableName: !Ref TableName
  # API GatewayからLambda Functionを実行するための権限をあげます
  # 実運用ではSourceArnを明示的に指定して最小権限の許可としてください
  OnConnectPermission:
    Type: AWS::Lambda::Permission
    DependsOn:
      - SimpleWebSocket
    Properties:
      Action: lambda:InvokeFunction
      FunctionName: !Ref OnConnectFunction
      Principal: apigateway.amazonaws.com

  OnDisconnectFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: ./src/OnDisconnect/
      Handler: OnDisconnect::OnDisconnect.Function::FunctionHandler
      Policies:
      - DynamoDBCrudPolicy:
          TableName: !Ref TableName
  # API GatewayからLambda Functionを実行するための権限をあげます
  # 実運用ではSourceArnを明示的に指定して最小権限の許可としてください
  OnDisonnectPermission:
    Type: AWS::Lambda::Permission
    DependsOn:
      - SimpleWebSocket
    Properties:
      Action: lambda:InvokeFunction
      FunctionName: !Ref OnDisconnectFunction
      Principal: apigateway.amazonaws.com

  SendMessageFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: ./src/SendMessage/
      Handler: SendMessage::SendMessage.Function::FunctionHandler
      Policies:
      - DynamoDBCrudPolicy:
          TableName: !Ref TableName
      # Lambda FunctionからAPI Gatewayに接続しているユーザーへのメッセージ送信権限をあげます
      - Statement:
        - Effect: Allow
          Action:
          - 'execute-api:ManageConnections'
          Resource:
          - !Sub 'arn:aws:execute-api:${AWS::Region}:${AWS::AccountId}:${SimpleWebSocket}/*'
      Environment:
        Variables:
          API_URL: !Ref SimpleWebSocket
          API_STAGE_NAME: !Ref StageName

  # DynamoDBのテーブル定義:今回はルームIDなど、送信先を絞り込むためのキーは作成していません。
  ConnectionsTable:
    Type: AWS::Serverless::SimpleTable
    Properties:
      PrimaryKey:
        Name: connectionId
        Type: String
      ProvisionedThroughput:
        ReadCapacityUnits: 5
        WriteCapacityUnits: 5
      TableName: !Ref TableName

# API Gatewayの定義
# 設定の詳細はSAM TemplateではなくCloudFormation Templateを参照してください
  SimpleWebSocket:
    Type: AWS::ApiGatewayV2::Api
    Properties:
      Name: SimpleWebSocket
      ProtocolType: WEBSOCKET
      RouteSelectionExpression: "$request.body.action"

  # API GatewayとLambda Functionを紐づけるためにはルートと統合のリソースが必要です。
  ConnectRoute:
    Type: AWS::ApiGatewayV2::Route
    Properties:
      ApiId: !Ref SimpleWebSocket
      RouteKey: $connect
      Target: !Join
        - '/'
        - - 'integrations'
          - !Ref ConnectInteg
  ConnectInteg:
    Type: AWS::ApiGatewayV2::Integration
    Properties:
      ApiId: !Ref SimpleWebSocket
      IntegrationType: AWS_PROXY
      IntegrationUri:
        Fn::Sub:
            arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/${OnConnectFunction.Arn}/invocations

  # API GatewayとLambda Functionを紐づけるためにはルートと統合のリソースが必要です。
  DisconnectRoute:
    Type: AWS::ApiGatewayV2::Route
    Properties:
      ApiId: !Ref SimpleWebSocket
      RouteKey: $disconnect
      Target: !Join
        - '/'
        - - 'integrations'
          - !Ref DisconnectInteg
  DisconnectInteg:
    Type: AWS::ApiGatewayV2::Integration
    Properties:
      ApiId: !Ref SimpleWebSocket
      IntegrationType: AWS_PROXY
      IntegrationUri:
        Fn::Sub:
            arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/${OnDisconnectFunction.Arn}/invocations

  Deployment:
    Type: AWS::ApiGatewayV2::Deployment
    DependsOn:
    - ConnectRoute
    - DisconnectRoute
    Properties:
      ApiId: !Ref SimpleWebSocket
  Stage:
    Type: AWS::ApiGatewayV2::Stage
    Properties:
      StageName: !Ref StageName
      DeploymentId: !Ref Deployment
      ApiId: !Ref SimpleWebSocket

.NET6.0でLambdaを開発

必要なパッケージはこちらです。

// 共通で必要です
Amazon.Lambda.Core
Amazon.Lambda.Serialization.SystemTextJson
AWSSDK.DynamoDBv2

// SendMessage以外で必要です
Amazon.Lambda.APIGatewayEvents

// SendMessageのみで必要です
AWSSDK.ApiGatewayManagementApi

以下、.NETのコードですが最低限の処理内容だけに削っています。
例外処理や同一ユーザーが入室してきた場合の考慮等も必要です。

  • OnConnect
    入室時の処理内容はシンプルで、DynamoDBにconnectionIdを登録しているだけです。

OnConnect/Function.cs

using Amazon.Lambda.APIGatewayEvents;
using Amazon.Lambda.Core;
using Amazon.DynamoDBv2;
using Amazon.DynamoDBv2.Model;

[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace OnConnect
{
  public class Function
  {
    public async Task<APIGatewayProxyResponse> FunctionHandler(APIGatewayProxyRequest apigProxyEvent, ILambdaContext context)
    {
      IAmazonDynamoDB ddbClient = new AmazonDynamoDBClient();

      Dictionary<string, AttributeValue> attributes = new Dictionary<string, AttributeValue>();
      attributes["connectionId"] = new AttributeValue
      {
        // APIGatewayから渡される情報の中にクライアントを識別するためのConnectionIdが入っています
        S = apigProxyEvent.RequestContext.ConnectionId
      };

      PutItemRequest ddbRequest = new PutItemRequest()
      {
        TableName = Environment.GetEnvironmentVariable("TABLE_NAME"),
        Item = attributes
      };

      // 接続情報をDynamoDBに保存
      // いわゆる入室処理
      PutItemResponse ddbResponse = await ddbClient.PutItemAsync(ddbRequest);

      // DynamoDBへの保存成否をによって処理結果を返却してください(ここでは一律成功で返却)
      return new APIGatewayProxyResponse
      {
        StatusCode = 200,
        Headers = new Dictionary<string, string> { { "Content-Type", "application/json" } }
      };
    }
  }
}
  • OnDisconnect
    退室時もシンプルで、DynamoDBにから該当するconnectionIdを削除しているだけです。

OnDisconnect/Function.cs

using Amazon.Lambda.APIGatewayEvents;
using Amazon.Lambda.Core;
using Amazon.DynamoDBv2;
using Amazon.DynamoDBv2.Model;

[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace OnDisconnect
{
  public class Function
  {
    public async Task<APIGatewayProxyResponse> FunctionHandler(APIGatewayProxyRequest apigProxyEvent, ILambdaContext context)
    {
      IAmazonDynamoDB ddbClient = new AmazonDynamoDBClient();

      Dictionary<string, AttributeValue> attributes = new Dictionary<string, AttributeValue>();
      attributes["connectionId"] = new AttributeValue
      {
        // APIGatewayから渡される情報の中にクライアントを識別するためのConnectionIdが入っています
        S = apigProxyEvent.RequestContext.ConnectionId
      };

      var ddbDeleteRequest = new DeleteItemRequest
      {
        TableName = Environment.GetEnvironmentVariable("TABLE_NAME"),
        Key = attributes
      };

      // 接続情報をDynamoDBから削除
      // いわゆる退室処理
      await ddbClient.DeleteItemAsync(ddbDeleteRequest);

      // DynamoDBへの保存成否をによって処理結果を返却してください(ここでは一律成功で返却)
      return new APIGatewayProxyResponse
      {
        StatusCode = 200,
        Headers = new Dictionary<string, string> { { "Content-Type", "application/json" } }
      };
    }
  }
}
  • SendMessage
    今回はManagement Consoleから直接メッセージを送信し、接続しているユーザーに届くかを試します。
    そのため、第1引数も単純に文字列で受け取って戻り値も何も返しません。
    トリガーしたいイベントに応じて引数のモデルを変えるかobjectで引き受けて変換してください。

SendMessage/Function.cs

using System.Text;

using Amazon.ApiGatewayManagementApi;
using Amazon.ApiGatewayManagementApi.Model;
using Amazon.Lambda.Core;
using Amazon.DynamoDBv2;
using Amazon.DynamoDBv2.Model;

[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace SendMessage
{
  public class Function
  {
    public async Task FunctionHandler(string input, ILambdaContext context)
    {
      IAmazonDynamoDB ddbClient = new AmazonDynamoDBClient();

      // 送信対象先を取得
      // 実際はRoomIDなどで条件を指定したり、特定のユーザーのみを検索したりします
      // 取得データが1MBを超える場合はLastEvaluateKeyを使用してループ処理で1MB以降のデータを取得する必要があります
      var scanResponse = await ddbClient.ScanAsync(new ScanRequest
      {
        // 直書きしてますが、実際には環境変数やクラスを定義しておくなりしてください
        TableName = Environment.GetEnvironmentVariable("TABLE_NAME"),
        ProjectionExpression = "connectionId"
      });

      var apiClient = new AmazonApiGatewayManagementApiClient(new AmazonApiGatewayManagementApiConfig
      {
        ServiceURL =  $"https://{Environment.GetEnvironmentVariable("API_URL")}.execute-api.ap-northeast-1.amazonaws.com/{Environment.GetEnvironmentVariable("API_STAGE_NAME")}"
      });

      // 送信するメッセージはMemoryStream形式で用意
      // 今回は固定で'test message'と送っています
      var stream = new MemoryStream(UTF8Encoding.UTF8.GetBytes(input));

      // 送信対象先毎にメッセージを送信しています。
      foreach (var item in scanResponse.Items)
      {
        await apiClient.PostToConnectionAsync(new PostToConnectionRequest
        {
          ConnectionId = item["connectionId"].S,
          Data = stream
        });
      }
    }
  }
}

動作確認

  • 接続
    接続確認にはwscatを使います。
# インストールしていない場合のみ
PS D:\dev\lambdaDotnetSample> npm install -g wscat

# <ApiGatewayId>はご自身の環境に合わせて変更してください。
PS D:\dev\lambdaDotnetSample> wscat -c wss://<ApiGatewayId>.execute-api.ap-northeast-1.amazonaws.com/Prod
Connected (press CTRL+C to quit)
>

DynamoDBのテーブルを覗くとconnectionIdが登録されていることがわかります。 DynamoDBのテーブルにconnectionIdが登録されていることを確認

  • メッセージ送信
    今回作成したLambda Function(SendMessage)はトリガーを設定していないのでManagement Consoleのテスト機能を使って実行します。

Management ConsoleでLambdaをテスト実行

先ほど接続したターミナルを見ると、メッセージが届いていることがわかります。

PS D:\dev\lambdaDotnetSample> wscat -c wss://<ApiGatewayId>.execute-api.ap-northeast-1.amazonaws.com/Prod
Connected (press CTRL+C to quit)
< テストメッセージを送信していますよ
>
  • 切断
    接続しているターミナルを閉じるかCTRL+Cを入力することで切断されます。

DynamoDBのテーブルを覗くとconnectionIdが削除されていることがわかります。 DynamoDBのテーブルからconnectionIdが削除されていることを確認

まとめ

.NET6.0でWebSocketの接続先にメッセージを送るときのポイントは以下の通りです。

using Amazon.ApiGatewayManagementApi;
using Amazon.ApiGatewayManagementApi.Model;

// API Gatewayに接続するためのクライアント
// AmazonApiGatewayManagementApiClient 

// API Gatewayに接続されている先にメッセージを送信するメソッド
// AmazonApiGatewayManagementApiClient.PostToConnectionAsync

以上