MHiroのエンジニアブログ

埼玉某所に住むインフラエンジニアのブログ

CloudwatchLogsに出力されてるログをS3に送ってAthenaで分析する

はじめに

タイトル通りの事をします。

構成としてはこのような形。
詳細は後述しますが、データ整形用にAWS Lambda(以下Lambda)が必要となります。

f:id:mhirox:20210822165219p:plain

似たような構成を試したい方の参考となるよう、色々とメモ書きを残します。
なお、説明は上記データの流れに沿って行います。実際の構築順序とは異なることをご了承ください。構築するのであればS3から順に遡って行き、最後にAthena作るのが良いです。

目次

各種設定

Amazon CloudWatch

Amazon Kinesis Data Firehose(以下Firehose)に送るにあたり、サブスクリプションフィルタを指定します。

サブスクリプションフィルタの設定

設定する為に、先にFirehoseを作成する必要があります。
Firehoseが構築済であれば、CloudTrailログを出力しているロググループの管理画面を開き、右下の「作成>Create kinesis Firehose subscription filter」を押下。

f:id:mhirox:20210822161156p:plain

その後はコンソールで指定された値を登録。

IAM RoleはAmazon CloudWatch Logs UserGuideの例3手順8-手順11を参考にしてください。

ログ形式は「その他」でパターンには何も記載せず、サブスクリプションフィルタ名だけ設定。

Amazon Kinesis Data Firehose

設定する為に、先にLambdaを作成する必要があります。 また、以下に注意してください。

  • Lambdaを指定すること
  • データ圧縮は使用しない事(CloudWatch Logsから送信されたデータは、既にgzipレベル6圧縮で圧縮されてます。)

迷うことがあればAmazon CloudWatch Logs UserGuideの例3を参考にしてください。
(例はCLIで操作してますが、設定値はコンソール上でも同一です。)

Lambdaが必要な理由

BlackBeltのQAに以下の記載があります。

Q8.
Amazon Kinesis Firehose を利用してCloudWatch LogsをS3に転送してそれをAthena で分析したいのですが、
Kinesis Firehoseを通すと{json}{json}のように1行に複数のJSONオブジェクトが保存されるようです。
このデータを効率的にAthenaで分析するにはどういった方法がありますか??
A8.
Amazon Kinesis FirehoseにはData TransformationをAWS Lambdaで行う機能がございますので,こちらを使って所望の形式に変換すると良いです.

出典: AWS Solutions Architectブログ - AWS Black Belt Online Seminar「Amazon Athena」の資料およびQA公開

AWS Lambda

設計図(ブループリント)をそのまま利用することが出来ます。
Lambdaの画面から「関数の作成」を選択した後、以下の画面で「設計図の使用>kinesis-firehose-cloudwatch-logs-processor-python」を選択します。

f:id:mhirox:20210822163150p:plain

私はこのLambdaの実行IAMロールを新規作成しました。また、作成後にLambdaの実行時間を1分に伸ばしてあげてください。そのほかはデフォルトでOKです。

Amazon S3

普通に構築するだけでOKです。必要に応じて暗号化なりライフサイクルルールなり設定してください。

Amazon Athena

以下のクエリを順に実行します。

1. データベースの作成

CREATE DATABASE <TABLE名> ;

2. テーブルの作成

以下の値はご自身の環境に合わせて設定ください。

  • データベース名
  • テーブル名
  • S3バケットパス

また、データの開始日が2020年1月1日以前である場合、projection.datehour.rangeも任意に変更してください。

CREATE EXTERNAL TABLE IF NOT EXISTS <データベース名>.<テーブル名> (
eventversion STRING,
useridentity STRUCT<
               type:STRING,
               principalid:STRING,
               arn:STRING,
               accountid:STRING,
               invokedby:STRING,
               accesskeyid:STRING,
               userName:STRING,
sessioncontext:STRUCT<
attributes:STRUCT<
               mfaauthenticated:STRING,
               creationdate:STRING>,
sessionissuer:STRUCT<  
               type:STRING,
               principalId:STRING,
               arn:STRING, 
               accountId:STRING,
               userName:STRING>>>,
eventtime STRING,
eventsource STRING,
eventname STRING,
awsregion STRING,
sourceipaddress STRING,
useragent STRING,
errorcode STRING,
errormessage STRING,
requestparameters STRING,
responseelements STRING,
additionaleventdata STRING,
requestid STRING,
eventid STRING,
resources ARRAY<STRUCT<
               ARN:STRING,
               accountId:STRING,
               type:STRING>>,
eventtype STRING,
apiversion STRING,
readonly STRING,
recipientaccountid STRING,
serviceeventdetails STRING,
sharedeventid STRING,
vpcendpointid STRING
) PARTITIONED BY (
  datehour string 
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = '1'
) 
LOCATION 's3://<S3バケットパス>/'
TBLPROPERTIES (
 "projection.enabled" = "true",
 "projection.datehour.type" = "date",
 "projection.datehour.range" = "2020/01/01/00,NOW",
 "projection.datehour.format" = "yyyy/MM/dd/HH",
 "projection.datehour.interval" = "1",
 "projection.datehour.interval.unit" = "HOURS",
 "storage.location.template" = "s3://<S3バケットパス>/${datehour}"
  );

おわりに

特にこのブログにまとめておきたかったのは、Firehose経由でS3に送った際、Athenaでクエリ出来ない事とその解決方法です。
(オブジェクトが全て1行にまとまってしまうことへの対応方法。)

とはいえ、AWSは常に進化し続けるサービスです。近い将来このブログ記事が不要となる事を祈っています。