데이터가 크지 않다면, pandas + jupyter의 조합으로 데이터를 분석하는 것이 편할 때가 있다.
단, 데이터 pipeline이 서버 상에서 이루어지는 것이 보통이고, AWS를 사용한다면 S3에 저장될 가능성이 높은데,
작업을 할때 마다 일일이 다운 받기는 귀찮은 작업이다.
준비
jupyter를 실행하고 있는 instance에 S3의 target bucket을 읽을 수 있도록 role을 부여한다(IAM)
import boto3
import pyarrow
import pyarrow.parquet as parquet
import pyarrow.orc as orc
import pandas as pd
def read_surprise_table(obj):
buf = obj.get()['Body'].read()
reader = pyarrow.BufferReader(buf)
data = orc.ORCFile(reader)
df = data.read().to_pandas()
print("df.columns: ", df.columns)
print("df.shape: ", df.shape)
return df
s3 = boto3.resource('s3')
bucket = 'bucket_name' # bucket name
s3bucket = s3.Bucket(bucket)
merged_df = pd.DataFrame()
path = 'folder1/folder2/dt=' + today_str + '/' # bucket name을 제외한 object path
# 데이터가 여러 파일로 나누어진 경우 loop돌면서 읽고 하나의 dataframe으로 concat한다.
for obj in s3bucket.objects.filter(Delimiter='/', Prefix=path):
print("bucket:{}, key:{}".format(obj.bucket_name, obj.key))
if obj.key.endswith("_SUCCESS") :
continue
tmp = read_surprise_table(obj)
if merged_df.shape[0] == 0 :
merged_df = tmp
else :
merged_df = pd.concat([merged_df, tmp], axis=0)
print("merged_df.columns: ", merged_df.columes)
print("merged_df.shape: ", merged_df.shape)