はじめに
タイトル通りの事をします。
構成としてはこのような形。
詳細は後述しますが、データ整形用にAWS Lambda(以下Lambda)が必要となります。
似たような構成を試したい方の参考となるよう、色々とメモ書きを残します。
なお、説明は上記データの流れに沿って行います。実際の構築順序とは異なることをご了承ください。構築するのであればS3から順に遡って行き、最後にAthena作るのが良いです。
目次
各種設定
Amazon CloudWatch
Amazon Kinesis Data Firehose(以下Firehose)に送るにあたり、サブスクリプションフィルタを指定します。
サブスクリプションフィルタの設定
設定する為に、先にFirehoseを作成する必要があります。
Firehoseが構築済であれば、CloudTrailログを出力しているロググループの管理画面を開き、右下の「作成>Create kinesis Firehose subscription filter」を押下。
その後はコンソールで指定された値を登録。
IAM RoleはAmazon CloudWatch Logs UserGuideの例3手順8-手順11を参考にしてください。
ログ形式は「その他」でパターンには何も記載せず、サブスクリプションフィルタ名だけ設定。
Amazon Kinesis Data Firehose
設定する為に、先にLambdaを作成する必要があります。
また、以下に注意してください。
- Lambdaを指定すること
- データ圧縮は使用しない事(CloudWatch Logsから送信されたデータは、既にgzipレベル6圧縮で圧縮されてます。)
迷うことがあればAmazon CloudWatch Logs UserGuideの例3を参考にしてください。
(例はCLIで操作してますが、設定値はコンソール上でも同一です。)
Lambdaが必要な理由
BlackBeltのQAに以下の記載があります。
Q8.
Amazon Kinesis Firehose を利用してCloudWatch LogsをS3に転送してそれをAthena で分析したいのですが、
Kinesis Firehoseを通すと{json}{json}のように1行に複数のJSONオブジェクトが保存されるようです。
このデータを効率的にAthenaで分析するにはどういった方法がありますか??
A8.
Amazon Kinesis FirehoseにはData TransformationをAWS Lambdaで行う機能がございますので,こちらを使って所望の形式に変換すると良いです.
出典: AWS Solutions Architectブログ - AWS Black Belt Online Seminar「Amazon Athena」の資料およびQA公開
AWS Lambda
設計図(ブループリント)をそのまま利用することが出来ます。
Lambdaの画面から「関数の作成」を選択した後、以下の画面で「設計図の使用>kinesis-firehose-cloudwatch-logs-processor-python」を選択します。
私はこのLambdaの実行IAMロールを新規作成しました。また、作成後にLambdaの実行時間を1分に伸ばしてあげてください。そのほかはデフォルトでOKです。
Amazon S3
普通に構築するだけでOKです。必要に応じて暗号化なりライフサイクルルールなり設定してください。
Amazon Athena
以下のクエリを順に実行します。
1. データベースの作成
CREATE DATABASE <TABLE名> ;
2. テーブルの作成
以下の値はご自身の環境に合わせて設定ください。
- データベース名
- テーブル名
- S3バケットパス
また、データの開始日が2020年1月1日以前である場合、projection.datehour.rangeも任意に変更してください。
CREATE EXTERNAL TABLE IF NOT EXISTS <データベース名>.<テーブル名> ( eventversion STRING, useridentity STRUCT< type:STRING, principalid:STRING, arn:STRING, accountid:STRING, invokedby:STRING, accesskeyid:STRING, userName:STRING, sessioncontext:STRUCT< attributes:STRUCT< mfaauthenticated:STRING, creationdate:STRING>, sessionissuer:STRUCT< type:STRING, principalId:STRING, arn:STRING, accountId:STRING, userName:STRING>>>, eventtime STRING, eventsource STRING, eventname STRING, awsregion STRING, sourceipaddress STRING, useragent STRING, errorcode STRING, errormessage STRING, requestparameters STRING, responseelements STRING, additionaleventdata STRING, requestid STRING, eventid STRING, resources ARRAY<STRUCT< ARN:STRING, accountId:STRING, type:STRING>>, eventtype STRING, apiversion STRING, readonly STRING, recipientaccountid STRING, serviceeventdetails STRING, sharedeventid STRING, vpcendpointid STRING ) PARTITIONED BY ( datehour string ) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' WITH SERDEPROPERTIES ( 'serialization.format' = '1' ) LOCATION 's3://<S3バケットパス>/' TBLPROPERTIES ( "projection.enabled" = "true", "projection.datehour.type" = "date", "projection.datehour.range" = "2020/01/01/00,NOW", "projection.datehour.format" = "yyyy/MM/dd/HH", "projection.datehour.interval" = "1", "projection.datehour.interval.unit" = "HOURS", "storage.location.template" = "s3://<S3バケットパス>/${datehour}" );
おわりに
特にこのブログにまとめておきたかったのは、Firehose経由でS3に送った際、Athenaでクエリ出来ない事とその解決方法です。
(オブジェクトが全て1行にまとまってしまうことへの対応方法。)
とはいえ、AWSは常に進化し続けるサービスです。近い将来このブログ記事が不要となる事を祈っています。