Goalist Developers Blog

serverlessとstep functionを利用した情報補完処理を作りました

お久しぶりです、ゴーリストのチナパです!

この度、ゴーリストで「自動化100」というテーマでエンジニア達がみんな合わせて様々プロセスを自動化しようとしています。(年内で100個のチャレンジです!)

大きな図から始めさせてください。 今年、ゴーリストのHR事業部で「プロダクトバリュー最大化」を目指し、提供しているデータをより価値高くしようとしています。長い話を簡潔にいうと、会社名からその会社のお電話番号を調べるため、様々なクローラーを利用して、プロセスを作成しました。ただし、これは現在一個ずつコマンドを叩いて実行しなきゃいけないので、最適ではありません。その現状から、AWSのstep functionを利用して、設定ファイルにしたがって、最初から最後まで勝手に動くようにしました。

f:id:c-pattamada:20200811114644p:plain
step functions!

細かい課題設定でいうと、いくつかのFargate/lambdaコンテナーを設定通り動くようにしたいです。(そもそもFargateでどうやってクローラー実行できるのとか気になる方はこちらを参考にしてください)。

最終形態はこちらです。

成功する時:

f:id:c-pattamada:20200811111317p:plain

そしてうまく行かない時も、静かに死ぬではなく、通知がきます。

f:id:c-pattamada:20200811111415p:plain

どうやってstep functionを利用できた?

一つのブログでとても抑えきれなさそうですので、いくつかの便利なところを紹介します。 step function を設定するためのserverless.yml の一部をサンプルとして用意しました。4つのとても便利なところをハイライトしました。

stepFunctions:
  stateMachines:
    addTelephoneFromCompanyName:
      definition:
        Comment: "Will do cool things."
        StartAt: MyTask
        States:
          SuccessNotification:
            Type: Task
            Resource: "arn:aws:lambda:#{AWS::Region}:#{AWS::AccountId}:function:${self:service}-${opt:stage}-successNotification"
            Parameters:
              "Payload.$": "$"
            End: True
          MyTask:
            Type: Task
            Resource: "arn:aws:states:::ecs:runTask.waitForTaskToken" # ➀
            Next: SuccessNotification
            ResultPath: "$.my_output"  # ②
            Catch:                                  #③
              - ErrorEquals:
                  - States.ALL
                Next: FailureNotification  # 失敗した際のstepを適当に懐ける
                ResultPath: $.Error
            HeartbeatSeconds: 300
            Parameters:
              LaunchType: FARGATE
              Cluster: !GetAtt FargateECSCluster.Arn
              TaskDefinition: !Ref FargateECSTaskDefinition
              NetworkConfiguration:
                AwsvpcConfiguration:
                  AssignPublicIp: ENABLED
                  SecurityGroups:
                    - !Ref FargateSG
                  Subnets:
                    - !Ref FargateSubnet
              Overrides:
                ContainerOverrides:
                  - Name: "hp_crawler_container"
                    Command:
                      - "python"
                      - "run.py"
                    Environment:  # ④
                      - Name: "TASK_TOKEN_ENV_VARIABLE"
                        "Value.$": "$$.Task.Token"
                      - Name: "FEED_BUCKET_NAME"
                        Value: "company-hp-crawler-serverless"
                      - Name: "SETTINGS_FILE"
                        "Value.$": "$.settings_file"
                      - Name: "INPUT_FILE"
                        "Value.$": "$.input_file"

上記のようなサンプルで➀から④のところが肝だなと思いました。

➀ waitForToken設定

ecsをが実行し、そのあとで、次のステップを行うため、waitForTaskTokenのやり方でやっていました。ecs側でsend_task_successを呼ぶ必要があります

② ResultPath

ResultPath: "$.my_output"  # ②

step functions のResultPathを利用したら、stateのアウトプットがそのパスのしたで出力されます。そして、stateのインプットを次のstateのも使う必要があれば、できます。 上記のinputが

{"my_input": "hi"}

であれば、アウトプットがこうなります:

{"my_input": "hi", "my_output": "..."}

ResultPathを利用しないと、インプットが上書きされます。

③ Catch:

何かの状況でstateが失敗した時、別処理を行うために使います。 私のお経験で3つの利用で失敗することがあります ・まずは、Taskをかいしする前の問題: 例ecsの定義に問題がある場合(存在しないなど) こちらはAmazon側でかってに認識してエラーが出るので、CatchがあればOKです。 ・ecsの中の処理が失敗する、や自分のコードにエラーが出る。 こちらはコードの中でtry・catch処理を行い、エラーが出る場合、send_task_failure を使ってます。 ・クローラーの場合、エラーが出ずにIOに立ち止まったり、他のトラブルもたまにありますので、定期的にsend_task_heartbeat をよび、state definitionで"HeartbeatSeconds: 300"でも設定すると、タイムアウトエラー的に機能します。

上記のどれかのやり方でエラーが出る場合、Catch処理を実行し、失敗の通知を送るように設定できてます。

④ ecsの環境変数

ecsにstateから変数を渡す時はecsの環境変数を使うとうまく行きました。InputPathを利用し、設定できます。つまり、 inputが

{"my_input": "hi"}

"$.my_input" を設定したら、環境変数に"hi"という文字列を設定できます。

まとめ

ほぼまる1ヶ月かかった自動化でしたので、具体的な説明はブログでまとめづらかったですが、上記の4つのtipsを使うと、 ecs + step functionでの開発をしようとしている方が少しでも楽になるかなと思ってます!

これからも自動化の話題いっぱい投稿するつもりなので、よろしくお願いします!