0%

AWS Lambda로 SQL 배치 작업 하기(Feat. 매출 정리)

이 글을 읽고 나면

  • AWS Lambda에 라이브러리를 사용하는 방법을 알 수 있습니다.
  • AWS Lambda로 배치 작업을 어떻게 사용하는지 알 수 있습니다.
  • PySQL과 pandas를 사용해서 SQL을 csv로 저장하는 방법을 알 수 있습니다.

배경

제가 진행하는 프로젝트에는 여러 가게들이 입점해 있습니다. 이 가게들의 매출 데이터를 정리해서 결제 금액을 정산해야 합니다. 이를 위해서 매주 월요일마다 지난 일주일 동안의 매출 데이터를 정리해서 csv 파일로 사장님에게 보내야 합니다. 매주 해야하는 작업이라 AWS Lambda를 사용해서 자동화하려고 합니다.

요구사항

  • 매주 월요일마다 지난 일주일 동안의 매출 데이터를 정리해서 csv 파일로 저장합니다.
  • csv 파일은 PM의 확인 가능하게 google drive에 저장합니다.
  • 매출 데이터는 RDS에 저장되어 있습니다.

Lambda에 라이브러리 추가하기

AWS Lambda는 외장 라이브러리를 사용할 수 없습니다. 따라서 레이어를 사용해서 라이브러리를 추가해야 합니다. 레이어를 사용하면 라이브러리를 Lambda에 추가할 수 있습니다.

  1. lambda 함수 생성
    함수 생성
    이름을 설정하고 함수를 생성합니다.

  2. 계층 생성
    계층 생성
    좌측 메뉴에서 추가 리소스 > 계층을 선택합니다.

  3. 로컬 폴더 생성
    로컬 폴더 생성
    라이브러리를 다운로드할 로컬 폴더(python)를 생성합니다.

  4. 라이브러리 다운로드

    필요한 라이브러리를 다운로드합니다.

    1
    2
    3
    4
    5
    pip3 install pymysql -t .
    pip3 install google-api-python-client -t .
    pip3 install oauth2client -t .
    pip3 install httplib2 -t .
    rm -r *.dist-info __pycache__
  5. S3에 업로드
    파일 초과
    파일 크기가 50MB를 초과하면 S3를 통해 업로드를 합니다.
    S3 업로드

  6. 계층 생성
    계층 생성

  7. 함수에 계층 추가
    계층 추가
    생성한 함수에 돌아와서 Add a layer를 클릭합니다.
    계층 추가

Lambda 함수 작성

함수 작성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import json
import pymysql
import shutil
from datetime import datetime, timedelta, timezone
from httplib2 import Http
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from oauth2client.service_account import ServiceAccountCredentials
import os
import csv
# Google API 요청 시 필요한 권한 유형
SCOPES = ['https://www.googleapis.com/auth/drive']

KST = timezone(timedelta(hours=9))
now_in_kst = datetime.now(KST)

weekend_in_kst = now_in_kst - timedelta(days=1)
weekstart_in_kst = now_in_kst - timedelta(days=7)


# SQL 쿼리
# 쿼리 결과를 /tmp 폴더에 csv 파일로 저장
def sql(event):
#sql 연결
conn = pymysql.connect(
host=event["hostname"]
,port=event["port"]
,user=event["username"]
,password=event["password"]
,database=event["database"]
)

# 가게 정보 -> 추후에 자동화 변경
store = {"2": "dream", "4": "bana", "5":"orda"}
try:
for key, value in store.items():
# 커서 생성
with conn.cursor() as cursor:
# SQL 쿼리 실행
sql_query = f"SQL 쿼리"
cursor.execute(sql_query)

# 결과를 CSV 파일로 저장
csv_file_path = f"/tmp/{value}.csv"
with open(csv_file_path, mode='w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
# 컬럼 이름 (헤더) 쓰기
writer.writerow([i[0] for i in cursor.description])
# 데이터 쓰기
for row in cursor.fetchall():
writer.writerow(row)
finally:
# 데이터베이스 연결 종료
conn.close()

# drive에 폴더를 생성하고 csv 파일을 업로드
def run(event):
sql(event)
credentials = ServiceAccountCredentials.from_json_keyfile_name('token.json', SCOPES)
http_auth = credentials.authorize(Http())

# Drive API 서비스 객체 생성
service = build('drive', 'v3', http=http_auth)

# Google Drive에 새 폴더 생성 (상위 폴더 내부에)
new_folder_metadata = {
'name': datetime.today().strftime("%y_%m_%d"),
'mimeType': 'application/vnd.google-apps.folder',
'parents': [event["parent_folder_id"]]
}
new_folder = service.files().create(body=new_folder_metadata, fields='id').execute()
new_folder_id = new_folder.get('id')
print(f'Created new folder ID: {new_folder_id}')

local_folder_path = '/tmp'

# 로컬 폴더의 파일들을 업로드
for file_name in os.listdir(local_folder_path):
file_path = os.path.join(local_folder_path, file_name)
if os.path.isfile(file_path):
file_metadata = {'name': file_name, 'parents': [new_folder_id]}
media = MediaFileUpload(file_path, mimetype='text/csv')
file_info = service.files().create(body=file_metadata, media_body=media, fields='id,webViewLink').execute()

print(f"Uploaded {file_name}:")
print("File ID:", file_info['id'])
print("Web View Link:", file_info['webViewLink'])

def lambda_handler(event, context):
run(event)
return {
'statusCode': 200,
'body': json.dumps('실행 성공!!')
}

위의 코드를 보면 event[“dict”]같이 딕셔너리 타입으로 값을 받아서 사용하는 것을 볼 수 있습니다. 이는 Lambda 함수를 실행할 때 입력값으로 넣어주는 것입니다.

  • Configure event
    이벤트
  • 아래처럼 이벤트를 생성해서 실행할 때 입력값으로 넣어줍니다.
    이벤트 생성
    1
    2
    3
    4
    5
    6
    7
    8
    {
    "hostname": "RDS 호스트명",
    "port": 3306,
    "username": "유저명",
    "password": "비밀번호",
    "database": "데이터베이스명",
    "parent_folder_id": "상위 폴더 ID"
    }

테스트 실패

  • 오류
    테스트 실패
    테스트를 실행하면 시간 초과가 발생합니다.
  • 구성 > 일반 구성 > 편집
    제한 시간
  • 제한 시간을 늘려줍니다. (3초 -> 20초)
    제한 시간

테스트 성공

  • deploy가 켜져 있으면 코드가 배포(저장)되지 않았습니다. deploy를 누르고 다시 테스트를 실행합니다.
    deploy
  • 실행 성공 (7.6초)
    성공
  • 드라이브 확인
    드라이브

배치 작업 설정

  • 함수 개요 > 트리거 추가를 클릭합니다.
    트리거 추가
  • EventBridge를 사용하면 주기적으로 실행할 수 있습니다.
    트리거 추가
    cron 표현식을 사용해서 주기를 설정합니다. UTC기준으로 설정하므로 주의합니다.
  • 이벤트를 설정 하기 위해서 이벤트 브릿지 상세에 들어갑니다.
    트리거 확인
  • 이벤트 브릿지 편집
    브릿지 편집
  • 대상 선택 > 추가 설정
    대상 선택
  • 입력값을 설정하고 규칙 업데이트를 클릭합니다.
    입력값

모니터링

  • 람다로 돌아와서 모니터링 > 로그를 클릭하면 그래프로 확인할 수 있습니다.
    모니터
  • CloudWatch > 로그 스트림로 들어가서 상세 로그를 확인할 수 있습니다.
    로그