読者です 読者をやめる 読者になる 読者になる

sometimes I laugh

高専卒WEBエンジニアがいろいろ残しておくブログ

AWS Data Pipelineで日次バッチ処理取得データをS3にポストする

platform AWS Data Pipeline

f:id:watass:20141103203916j:plain

最近コードを書いてはいるものの、なかなかブログにできるネタがないため、またブログを書くとなれば結局AWSとかインフラ側の話になっちゃうんですね。
さてさて、今回はCDPで問題になりがちなバッチ処理、ジョブスケジューリングです。単純に実現するならばインスタンスを立ててcronで実行する、という形になりますが、実行保証もないし、インスタンス落ちたら終わりだし、AWSのベストプラクティス的にはありえないです。
ではどうするのかといえば、SQSやSWFで冗長性を確保することですが、どちらも実装するにはちょっと面倒臭い。単純にある時間になったらデータを取得してきて、S3に投げるだけのバッチ処理を実装するのに、そんな苦労はしたくないし・・・
と思っていたとき、思わぬサービスを見落としていました。そう、AWS Data Pipelineです。本記事ではAWS Data Pipelineを使って日次バッチ処理で取得したデータをS3にポストするまでをやってみたいと思います。

AWS Data Pipelineとは

aws.amazon.com

AWS Data Pipelineとは、AWSのサービス、またはオンプレミスのリソース間のデータ処理、データ移動を簡単に設計できるフルマネージドサービスです。
まさに今回のような「日次バッチ処理で取得したデータをストレージにポストする」みたいなデータ取得、データ移動をAWSがサポートするサービスです。その他にも、S3上でバックアップを作成する、S3からEMRにログを流しこむ、などなど、ビックデータ時代、とりあえずS3の時代にはぜひとも抑えておきたいサービスですね。

冒頭でも書きましたが、cronで処理するような内容をインスタンスのcronにやらせずに、AWS Data Pipelineに行わせることによって、cronタスクの実行保証する仕組みをこちらで実装する必要なく、Data Pipelineが責任持って実行してくれるわけです。ありがたい。

料金はPipelineで実行されるアクティビティの数と実行頻度に応じて決定されます。日次で一回より多く実行される場合は高頻度のアクティビティとなるようです。ちなみに、無料枠だと低頻度のアクティビティ5つまで実行できます。
注意したいことは、PipelineのアクティビティでEC2などのリソースを使用する場合、その利用料も請求されるということです。インスタンスを立ち上げたり、シャットダウンしたりを繰り返すと思わぬ額になりますのでお気をつけください。

やりたいこと

ここで達成すべきタスクを明確にしておきます。やりたいことは本ブログのフィード(http://sil.hatenablog.com/feed)を日次で取得して、S3にポストすることです。このタスクをAWS Data Pipelineだけを使って実現します。なお、ポスト先のバケットは事前に作っておきます。

f:id:watass:20150330212921p:plain

Pipelineを作成する

前置きはこの辺にして、早速AWS Data Pipelineを使っていきましょう。マネージメントコンソールからクリック。Get started nowからPipelineを作成します。

f:id:watass:20150330211037p:plain

Create Pipelineから各項目を埋めていきます。

Main

基本的なPipelineの設定情報です。

Name

Pipelineの名前です。今回は「get_feed_to_s3」とします。

Description

Pipelineの説明です。任意なので特に無くてもOKです。

Source

Pipelineで使用するスクリプト?の指定です。Build using a templateを選択すると、既存のよく使うテンプレートを利用できます。個人的にはAWS CLIを指定できるのが結構便利そうです。他にはImport a definitionで外部ソースを参照できたりするようなのですが、今回はBuild using Architectで自作していきます。

Schedule

Pipelineを実行するスケジュールの設定です。

Run

once on pipeline activationかon a scheduleを設定できます。前者なら一回だけ実行され、後者なら指定した間隔で実行できます。前者を設定した場合には本項目の設定は終わりです。今回はテストなので前者にしておきます。

Run every

定期実行時に設定できます。毎回どのスパンで実行するか?ということが設定できます。

Starting

on pipeline activationか日付指定を設定できます。前者なら作成時から有効で、後者なら指定日付からPipelineが有効化されます。

Ending

neverか実行回数指定か、日付指定を設定できます。neverなら制限なし、実行回数を指定すれば、その回数だけ実行可能で、日付を指定すればその日までPipelineが有効になります。

Pipeline Configuration

Pipelineのその他設定です。

Logging

ログ出力設定です。Enableにするとログ出力先のS3バケットを指定できます。色々不安なのでしっかり指定しておくことをおすすめします。

Security/Access

セキュリティ関係の設定です。

IAM roles

IAMロールを指定できます。Defaultだと特に指定されないのかと思います。CustomにするとPipeline自体のIAMロールと、Pipelineで立ち上げられるインスタンスのIAMロールも指定できます。今回はDefaultでいいです。

Tags

タグ付けです。今は特に設定しません。


以上の項目を設定したら、Edit in Architectを押してアーキテクトの編集を行いましょう。

Architectを編集する

アーキテクトの編集画面に移ると、ちょっと不思議な画面になります。これがData Pipelineの特徴のひとつで、GUIベースでデータフロー図を書くことで、アーキテクトを作成できるというものです。
とりあえずざっと基本方針だけ書きますと、シェルスクリプトを実行するアクティビティを作成し、その実行結果をデータノード(S3)に出力するという感じです。各項目については以下で詳細に説明します。

Activities

データ操作、データ移動を定義するアクティビティです。歯車で表現されています。今回はgetFeedと名づけたアクティビティをひとつだけ追加しておきます。

Name

アクティビティの名前です。getFeedとしておきます。

Type

アクティビティの種類です。今回はシェルスクリプトを実行するアクティビティになりますので、ShellCommandActivityを指定しておきます。

Schedule

アクティビティの実行間隔です。前項目で一回だけ実行するようにしてありますので、RunOnceが指定できます。

Command

ShellCommandActivityで実行できるシェルスクリプトです。今回は以下のようなシェルスクリプトを書いておきます。

wget -P ${OUTPUT1_STAGING_DIR}/$(date +%Y%m%d)/ http://sil.hatenablog.com/feed

wgetのPオプションで保存先ディレクトリを指定します。ポイントはOUTPUT1_STAGING_DIRという変数を指定していることです。これを使うことで出力先のデータノードをローカルと同じように扱うことができます。今回の場合はS3ですね。ただし、この指定はStageの値をTrueにしておく必要がありますのでご注意ください。
ちなみに、dateコマンドを使うようにすることで日付別のディレクトリに保存できます。

Runs On

アクティビティが実行されるリソースを指定できます。リソースは別項目のResorcesで作成できますので、作成したものを指定します。

Output

アクティビティのデータ出力先です。データノードを追加してあると指定できます。今回は後々追加するfeed-storeという名前のデータノードを指定します。

Stage

アクティビティで使用されるリソースをステージングとして利用するかどうかを指定できます。Trueにすることでシェルスクリプト内でOUTPUT1_STAGING_DIRやINPUT1_STAGING_DIRなどの特殊変数を利用できます。

DataNodes

データを保持するノードの定義です。今回はfeed-storeと名づけて一つだけ作成します。

Name

データノードの名前です。今回はfeed-storeとします。

Type

データノードの種類です。今回はS3を使うのでS3DataNodeを指定します。

Schedule

データノードの使用間隔?です。よくわかりませんがRunOnceでOK。

Directory Path

データノードの保存先です。S3のパスを指定しておきましょう。

Schedules

実行間隔に関する設定です。前項目で設定していますので、特に弄らなくてOKです。

Resources

アクティビティの実行に使用するリソースの定義です。今回はwget-instanceと名づけてひとつだけ作成します。

Name

リソースの名前です。今回はwget-instanceとしておきます。

Type

リソースのタイプです。今回はec2を使うのでEc2Resourceを指定します。

Schedule

リソースの実行間隔?です。よくわかりませんがRunOnceでOK。

Role

Data Pipelineそのものが利用するIAM roleです。前項目で指定しているので特にいじりません。

Resource Role

Data Pipelineが作成するリソースに割り当てるIAM roleです。前項目で指定しているので特にいじりません。

Preconditons

Data Pipelineを実行する前提条件を確認できる機能です。高価なリソースをバリバリ使うような処理を書く場合、なるべく確実な状況で実行したいというニーズを満たすことができます。今回は特に設定しません。

Others

ログ出力先などの設定です。前項目で指定しているので特に記載しません。

Parameters

パラメータとか設定できるみたいですが、今回は指定しません。


以上を設定すると以下のような感じになります。

f:id:watass:20150330224428p:plain

見ていただけるとわかると思いますが、やりたいことで記載したアーキテクチャがそのまま書かれていますね。このように直感的に図を書くように設計できるのがAWS Data Pipelineのメリットですね。

Pipelineの有効化

さて、アーキテクトの編集が完了したらSave pipelineをクリックしましょう。

f:id:watass:20150330224823p:plain

おや、Warningが出ていますね。これはResorceにterminateAfterが指定されていないことが原因です。terminateAfterはリソース削除前にWaitする時間を指定できるようなのですが、なぜか10secで指定したところ、エラーになってしまったので今回はこのままやらせてもらいます。本当はよくないですね。こういうWarningを見逃し続けると大事なWarningを見逃したり、こういった非推奨に対する感覚が鈍くなりますからね。反省。

とりあえず保存できたらActivateをクリックして有効化しましょう。有効化に成功すれば以下のような画面に遷移します。

f:id:watass:20150330225624p:plain

はい、インスタンスが生成されているようですね。しばらく待ちましょう。

結果確認

インスタンス生成からしばらく待ったらアクティビティの進捗を確認してみましょう。

f:id:watass:20150330225750p:plain

お、完了しているようですね。
S3のバケットを覗きに行きましょう。

f:id:watass:20150330225834p:plain

ちゃんと保存されていました!
指定したログ出力先のバケットにも出力ログが保存されています。

まとめ

  • AWS Data Pipelineで高可用性のある日次バッチ処理を実装できる
  • Data PipelineではGUIでデータフローが設計できる
  • EC2インスタンスもStageをTrueにすることでデータの操作要素として利用できる

なかなか面白いぞData Pipeline

日次バッチ処理AWS上で実現する、というニーズは前からあって、SQSやSWFなどあーでもないこーでもないと考えたり、ネット上でもジョブスケジューリングをAWSで実現するのは難しい、みたいな話を見て困惑したりしていましたが、まさかData Pipelineでこんなにスマートにできるとは思っていませんでしたね。
ちなみに、Data Pipelineではアクティビティが失敗しても3回まで実行できます。それでも失敗した場合にはSNSで通知を飛ばせたりするので、かなり満足できる実装が可能です。

問題はインスタンスを利用する場合、分散処理を実現するときにインスタンスをやたら立ち上げすぎてしまうことでしょうか。インスタンスをバリバリ使ってでも分散処理で高速化したい場合はいいんですが、大量のデータ取得をなるべくひとつのインスタンスに任せつつ、実行保証を実現するのはちょっと工夫が必要かもしれません。アクティビティが大きくなると、失敗する可能性もそれだけ上がりますからね。

ちなみにインスタンスを利用する場合に、Resourceの項目でInstance Typeからインスタンスの種類を指定できるのですが、t2.microが指定できませんでした。第二世代のインスタンスは使用できないのでしょうか?デフォルトだとm1.smallになりますが、最小ならt1.microでもいいかと思います。

最後に、今回作成したPipelineのJSON出力を参考までに残しておきます。

{
  "objects" :  [
    {
      "id" : "DefaultSchedule",
      "name" : "RunOnce",
      "occurrences" : "1",
      "startAt" : "FIRST_ACTIVATION_DATE_TIME",
      "type" : "Schedule",
      "period" : "1 Day"
    },
    {
      "id" : "ResourceId_SbKD5",
      "schedule" : { "ref" : "DefaultSchedule" },
      "name" : "wget-instance",
      "role" : "DataPipelineDefaultRole",
      "type" : "Ec2Resource",
      "resourceRole" : "DataPipelineDefaultResourceRole"
    },
    {
      "id" : "DataNodeId_bEzxG",
      "schedule" : { "ref" : "DefaultSchedule" },
      "name" : "feed-store",
      "directoryPath" : "s3://analysis-log-store/",
      "type" : "S3DataNode"
    },
    {
      "id" : "Default",
      "scheduleType" : "cron",
      "failureAndRerunMode" : "CASCADE",
      "schedule" : { "ref" : "DefaultSchedule" },
      "name" : "Default",
      "pipelineLogUri" : "s3://log-watass/",
      "role" : "DataPipelineDefaultRole",
      "resourceRole" : "DataPipelineDefaultResourceRole"
    },
    {
      "id" : "ActivityId_tOrnH",
      "schedule" : { "ref" : "DefaultSchedule" },
      "name" : "getFeed",
      "command" : "wget -P ${OUTPUT1_STAGING_DIR}/$(date +%Y%m%d)/ http://sil.hatenablog.com/feed",
      "runsOn" : { "ref" : "ResourceId_SbKD5" },
      "type" : "ShellCommandActivity",
      "output" : { "ref" : "DataNodeId_bEzxG" },
      "stage" : "true"
    }
  ],
  "parameters" :  [

  ],
  "values" : {

  }
}

参考

AWS Data Pipelineによるタスクスケジューラの利用 | Developers.IO
# AWS Data Pipelineに可能性を感じた記事です。いつものクラスメソッドさんです。
S3 - AWS 白帯シリーズ(10) Amazon Data Pipeline を触ってみる - Qiita
# Data Pipelineの使い方がわかりやすくまとめられている良記事です。
定型データ移動処理のスケジュール自動化設定サービス『AWS DataPipeline』の構成要素をひと通り整理してみた | Developers.IO
# クラスメソッドさんの記事です。かなり詳細にData Pipelineについてまとめられているのでおすすめです。
AWS Data Pipeline とは - AWS Data Pipeline
# Data PipelineのAWS公式ドキュメントです。とりあえず読んどけって感じです。
ShellCommandActivity - AWS Data Pipeline
# Data PipelineのShellCommandActivityについてのAWS公式ドキュメントです。
アクティビティによるデータとテーブルのステージング - AWS Data Pipeline
# Data PipelineでEC2を使ったデータ処理についてのAWS公式ドキュメントです。