可用性の高いcron処理の為にJob Observerパターンを使う
1日に一度だけ、1時間に一度だけ、あるタイミングで処理を走らせたいというニーズは常に存在します。昔から多くのエンジニアはそういった要望に対して、サーバを用意して、crontabに独自の魔法を書くことで対応していました。
時は現代、インフラといえばAWS、GCP、Azure... サーバを使い捨て可能なリソースとして扱えることのメリットに人々は熱狂しました。また、AWSが掲げた思想、"Design for Failure"(障害の為の設計)は多くのエンジニアに受け入れられました。
ここで改めて定刻に処理を行う方法について見てみると、やっぱりインスタンスを立ててcrontabを利用する方法が使われているようです。これは決して可用性の高い実装ではありません。なんとかしなくてはいけません。
以前、同様の問題に対して、DataPipelineを用いたアプローチを検討しましたが、スケジュール処理をDataPipelineに委任しているだけで、可用性が高いとはいまいち言えませんでした。
そこで今回は、CDPでも紹介されている可用性の高いバッチ処理のデザインパターン、Job Observerパターンを使って、可用性の高いcron処理基盤を構築してみようと思います。
Job Observerパターン概要
基本的にはCDP:Job Observerパターン - AWS-CloudDesignPatternの通りですが、ポイントはキューにメッセージをエンキューするProducerにLambdaを使用しているところです。Lambdaはscheduled functionに対応しているので、これを利用してジョブの発行をします。
また、AutoScalingグループのインスタンスは基本0に設定しておいて、キューにメッセージが溜まった瞬間にCloudWatchのアラームを利用してインスタンス数を1にします。インスタンスは起動時にキューからメッセージをデキューして、cronで実行したいジョブを完了したらメッセージを削除するようにします。これによって、インスタンスが正しく起動できずに処理を行えない場合も、メッセージ自体が残る上に、AutoScalingグループのスケーリングポリシーにしたがって、正常なインスタンスを保持するために新しいインスタンスが起動するので、問題なく処理を行うことができます。
キューの作成
まずはキューを作成します。SQSを使います。
特にパラメータには拘る必要はないかと思います。メッセージ保持期間について少し検討するぐらいかと。今回は日次バッチ処理を想定しているので、3時間も保持しておけば十分です。
Lambda Functionの作成
続いて、Lambda Functionを作成します。今回はscheduled functionを作成するので、Select blueprintでlambda-canaryを選択します。
Configure event sourcesではどの間隔でLambda Functionを実行するか設定します。見ればわかると思いますが、時刻によってLambda Functionを実行するためにCloudWatch Eventsを利用しているんですね。
Schedule expressionではcron方式の記述もできます。ただ、CloudWatch Eventsのcron表記は通常のcron方式とは微妙に異なるので注意してください。
Schedule Expression Syntax for Rules - Amazon CloudWatch
設定ができたら、実行するスクリプトを記述します。lambda-canaryを選択すると、Pythonのサンプルスクリプトが表示されるので、それを利用します。こんな感じです。
from datetime import datetime import logging import boto3 AWS_SQS_ENDPOINT = 'https://sqs.ap-northeast-1.amazonaws.com/XXXXXXXXXXXX/batch-test' logger = logging.getLogger() logger.setLevel(logging.INFO) def lambda_handler(event, context): logger.info('Task Running...') client = boto3.client('sqs') message = 'batch job in ' + datetime.now().strftime("%Y/%m/%d %H:%M:%S") try: res = client.send_message( QueueUrl = AWS_SQS_ENDPOINT, MessageBody = message, MessageAttributes = { 'Name': { 'StringValue': 'batch_job', 'DataType': 'String', } } ) except Exception as e: logger.error(e) raise e else: logger.info('Successfully Completed!') return res
AWS_SQS_ENDPOINTには先程作成したSQSのエンドポイントを宣言します。loggerを使用していますが、これによって出力されるログはCloudWatch Logsに自動的に流し込まれるので、実行に失敗したこと自体もアラームを作成することで監視ができます。これは嬉しいですね。
また、本スクリプトでは、SQSにメッセージをエンキューするので、パーミッションの設定が適切でなくてはいけません。Lambda function handler and roleの設定で、適切なRoleを指定してください。今回はlambda_basic_executionを作成して、SQSのフルアクセス権をアタッチしました。
Advanced Settingでタイムアウトの時間が指定できますが、ここも拘らなくて大丈夫だと思います。最大で5分まで設定できるので、今回は5分にしておきました。
以上でLambda Functionの作成は完了です。せっかくなのでテストしておきます。Configure test eventを選択してSchedued Eventを設定し、Save and Testを実行します。
しばらくすると、Lambda Functionの画面の一番下に実行結果が表示されます。
成功していますね。ついでにキューのメッセージ数も確認しておきましょう。利用可能なメッセージ数が1になっているはずです。メッセージはUIから削除しておいてください。
Consumerインスタンスの準備
LambdaでProducer-ConsumerパターンにおけるProducer側ができたので、今度はConsumer側にあたるインスタンスの準備をします。スペックには特に制限はありませんが、SQSからメッセージをデキューすることができるように、適切なIAMロールを割り当てておきます。
スクリプトにはPythonを使用するので、boto3をインストールしておきましょう。
$ sudo pip install boto3
なお、リージョンだけは指定する必要があるので、aws configureでap-northeast-1を指定します。アクセスキーは設定しなくて大丈夫です。
$ aws configure AWS Access Key ID [None]: AWS Secret Access Key [None]: Default region name [None]: ap-northeast-1 Default output format [None]:
肝心のスクリプトはこんな感じです。
consumer.py
import boto3 AWS_SQS_ENDPOINT = 'https://sqs.ap-northeast-1.amazonaws.com/XXXXXXXXXXXX/batch-test' def queue_task_exists(): client = boto3.client('sqs') res = client.receive_message( QueueUrl = AWS_SQS_ENDPOINT, ) if len(res) == 2: client.delete_message( QueueUrl = AWS_SQS_ENDPOINT, ReceiptHandle = res['Messages'][0]['ReceiptHandle'], ) return True else: return False def work_job(): client = boto3.client('s3') res = client.put_object( Bucket = 'feed-store', Key = 'dummy-feed', ) if __name__ == '__main__': if queue_task_exists(): work_job()
キューにメッセージが存在すればメッセージを削除してTrueを返す関数を定義し、その関数の返り値がTrueならばwork_job関数を実行する単純なスクリプトです。メッセージのレスポンスにはメタデータが含まれるので、メッセージを受け取れなかった場合のリストの長さは1になることに注意してください。
また、今回は例外についてガン無視していますが、実際に使用する場合にはqueue_task_exists関数は例外を返しうると考えて実装してください。今回はバッチ処理として実行したい内容にはS3へのオブジェクト作成を選びました。実際にはここに好きなタスクを記述してください。
後はインスタンス起動時にスクリプトを実行できるように、crontabの@rebootに仕込みます。
@reboot python /home/ec2-user/consumer.py
念のために再起動して動作が確認できたら、インスタンスのAMIを生成して、インスタンス自体は削除してしまいましょう。ここまででConsumerインスタンスの準備は完了です。
CloudWatchアラームを作成
次に、キューの状態を監視するCloudWatchアラームを作成します。SQSのApproximateNumberOfMessagesVisibleメトリクスを選択します。
アラームのしきい値には0以上を選択します。常にアラームが立って気持ち悪いですが、我慢してください。通知設定は何も設定しません。後ほど、AutoScalingグループをひも付けますが、今は何も指定せずに、単に警告を立てるだけのアラームになります。以上でとりあえず設定完了です。
AutoScalingグループを作成
最後に、Consumerインスタンスを管理するためのAutoScalingグループを作成します。起動設定を作成した上でAutoScalingグループを作成する必要があります。
起動設定はほとんど通常のインスタンス作成と変わりません。ただ、AMIは先程作成したConsumerインスタンスのAMIを選択してください。起動設定が完了したら、それを選択してAutoScalingグループを作成していきます。
無駄にインスタンスを起動したくないので、グループサイズは0にしておきます。VPC設定を作成していれば、どこにインスタンスを起動するか選択できますので、VPCとサブネットを選択します。Availability Zoneごとにサブネットを作成しておき、それらをすべて選択すれば、一方のAvailability Zoneで大規模な障害があっても、バッチ処理を行うことができます。
最も重要なのはスケーリングポリシーの設定です。先ほど作成したCloudWatchアラームを元にスケーリングポリシーを設定します。スケーリング範囲は0から1にして、段階スケーリングポリシーを適用します。具体的にはメッセージ数が0ならば、インスタンス数を0に、1以上ならばインスタンス数を1にするように設定します。
以上ですべて設定完了です。後はLambda Functionで指定した時間になれば、S3にオブジェクトが作成されます!
まとめ
- cronにJob Observerパターンを利用するならばProducerにLambdaを使う
- AutoScalingグループを基本0にして、キューにメッセージが溜まったらCloudWatchアラームからインスタンスを起動させる
- Consumer側はキューを確認し、メッセージが存在すれば削除して処理を行う
可用性は高いが、やたらと複雑
今回は従来、EC2だけで完結していた処理をあえて、Lambda、CloudWatch、SQS、AutoScaling、EC2を組み合わせて実行するようにしました。確かに可用性の高い仕組みにはなりましたが、構築コスト、運用コストなどを考えると、場合によってはEC2のcronで実行して、実行結果をCloudWatch Logsに流し込み、そのメトリクスを監視することで問題が発生した場合には手動で対応する、という方法もありといえばありなんじゃないかなと思ってきました(よいアーキテクチャとは言えませんが)。
また、サービスをまたぐことで、どこかのサービスがダメになることでシステムが止まる、時間になってから実行されるまでのタイムラグが長い(今回の場合は10分くらいかかりました)などの問題も残ります。とはいえ、実行ログをCloudWatch Logsで管理できる、ジョブが増えてもインスタンスを増やすことでスケールアウトできるなどのメリットもあるので、こういった構築パターンも検討できるといいかなと思います。