Cloud Storage에 JSON파일이 Upload 될 때, Bigquery Loads 파이프 라인 구성

2024. 8. 13. 20:44·Google Cloud

 

GCS에 JSONL파일이 업로드 되었을때  Bigquery로 Data가 올라갈 수 있게 PipeLine을 구성 해달라는 테스크를 받게 되었다.

파이프 라인을 구성하면서 생각보다 생각 할 것들이 많았다. 앞으로 내가 응용 할 수 있는 개념들이 쌓여 가는 느낌을 많이 받아 좋은 경험 이였다. 

 

PipeLine 

 GCS에 JSONL 파일 업로드 이벤트 발생 -> Cloud logging router Sink -> Pub/Sub -> Cloud Function  

 

 

 

가장 먼저 Cloud Storage Bucket을 생성한다. (구글링만 해도 bucket 생성은 많이 나오니 생략..)

생성이 완료된 후 , Cloud Pub/Sub의 topic 생성한다

잠깐, 

Cloud Pub/Sub 이란? 

메시지를 생성하는 서비스를 해당 메시지를 처리하는 서비스에서 분리하는 확장 가능한 비동기 메시징 서비스 여기서 메시징 서비스란, 메시지들을 Queue에 넣고, 차례대로 전달해주는 서비스를 뜻합니다.

 

데이터를 수집하고 배포하는 스트리밍 분석 및 데이터 통합 파이프라인에 사용 즉, GCP에서 Pub/Sub 서비스는 BigQuery, 데이터 레이크, 운영 데이터베이스로 스트리밍하기 위한 이벤트 수집


예전에 개인 프로젝트 하면서 Rabbit MQ를 찍먹 해본적이 있어 개념을 이해하는데 낯설지 않았다.
다만, 대규모 프로젝트에 다양한 플로우를 설계하고 고민하게 구독과 구독자의 관계 만들수 있는 워크플로우에 대해 깊게 공부가 필요하다


Cloud Pub/Sub Topic 생성

추 후에 Cloud Function 생성하고 트리거로 생성한 pub/sub topic 지정 할 거라 기본 구독 추가를 할 필요 없다. (자동 생성됨)

 

이제 cloud storage에 파일이 생성 되었을때 pub/sub으로 메세지를 전달하기 위한 설정이 필요하다.

 

 Cloud Logging - routing sink 설정  

 

 

여기서 아까 생성한 주제를 선택하고 싱크에 포함할 로그를 지정해야 한다.

 

resource.type="gcs_bucket"
resource.labels.bucket_name="test"
protoPayload.methodName="storage.objects.create"
protoPayload.resourceName:".jsonl"
 

Cloud logging Query의 부분이다. 더 세세한 항목의 설정이 필요하면 구글 Docs에 자세하게 설명이 나와있다.

위 작성한 것에 대해 좀 더 자세히 풀어 설명하자면,

- 리소스 타입이 gcs_bucket인 로그 항목
- 버킷 이름이 test인 GCS 버킷에서 발생한 로그 항목
- 메서드 이름이 storage.objects.create인 로그 항목(즉, 객체가 생성된 로그)
- 리소스 이름에 .jsonl 문자열이 포함된 로그 항목(즉, .jsonl 확장자를 가진 파일이 생성된 로그)

버킷에 파일을 하나 업로드 하고, Cloud logging 탐색에서 정상적으로 조회 된다면 성공!

 

 

여기 GCS에 파일이 업로드 되면 라우팅 싱크를 통해 Cloud Pub/Sub이 호출되고 마지막으로 Cloud Function에서 Bigquery로 Load 하는 과정만 남았다.

 

 

Cloud Functions을 생성할대 트리거유형을 Cloud Pub/Sub을 선택하고 주제 부분은 아까 생성한 topic을 선택한다!

 

 

(혹시나 permission 에러는 권한 문제로 인한 에러 이므로.. 생략하겠습니다.... 오류 내용을 바탕으로 서비스 계정과 IAM 권한을 추가하시면 됩니다..)

 

코드에 대해

간략하게 보여드리자면

def main(cloud_event):
    try:
        logging.info("*** pub/sub message arrive ***")
        # Print out the data from Pub/Sub, to prove that it worked
        decoded_data = base64.b64decode(cloud_event.data["message"]["data"]).decode('utf-8')
        json_data = json.loads(decoded_data)

        logging.info(json_data)
        
    except Exception as e:
        logging.error(f"An error [main] occurred: {e}")
        logging.error(traceback.format_exc())

 

실제로 json_data에 어떤식으로 데이터가 담기는지 확인 가능하며, bigquery로 load 하는 과정은 레퍼런스가 너무 잘 설명되어 있기 때문에 아래 링크 참고 남겨두겠습니다..!

https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-json?hl=ko

 

Cloud Storage에서 JSON 데이터 로드  |  BigQuery  |  Google Cloud

Cloud Storage의 JSON 파일을 새 테이블로 로드하거나 테이블에 추가하거나 덮어쓰는 방법을 보여줍니다. 중첩/반복 JSON 데이터 및 하이브 파티션을 나눈 JSON 데이터를 로드하는 방법을 보여줍니다. J

cloud.google.com

 

 

파이프라인에 대한 구조적인 설명은 여기까지 입니다.

조언이나, 코드에 대하여 추가로 궁금하신 부분 혹은 질문이 있으신 분들은 언제든지 댓글 남겨주세요!

 

 

저작자표시 비영리 변경금지 (새창열림)

'Google Cloud' 카테고리의 다른 글

[Cloud Storage] 버킷에 CORS를 적용할 수 있다고?  (0) 2025.02.20
[Google Cloud] API Gateway에 대한 개념과 설정 방법  (0) 2025.02.11
[Cloud Storage] 이미지 업로드 성능 개선기  (0) 2025.02.10
[Secret Manager] API 를 통하여 SSH 비밀 키 추출 및 SFTP 전송  (0) 2024.04.09
[Cloud Storage] MD5 hash 값을 활용하여 파일 무결성 체크  (0) 2024.03.25
'Google Cloud' 카테고리의 다른 글
  • [Google Cloud] API Gateway에 대한 개념과 설정 방법
  • [Cloud Storage] 이미지 업로드 성능 개선기
  • [Secret Manager] API 를 통하여 SSH 비밀 키 추출 및 SFTP 전송
  • [Cloud Storage] MD5 hash 값을 활용하여 파일 무결성 체크
창MIN
창MIN
  • 창MIN
    미니의 코드
    만들고 도전하는것을 좋아합니다💻
  • Guest
    Gmail
    GitHub
  • 전체
    오늘
    어제
    • 분류 전체보기 (25)
      • Google Cloud (6)
      • NodeJS (3)
      • NestJS (1)
      • Python (1)
      • DB (1)
      • Docker & Kubernetes (1)
      • Server & Infra (3)
      • CS (7)
      • Algorithm (2)
        • 개념 (2)
        • 문제 (0)
      • 개발 (0)
  • 인기 글

  • 태그

    버킷 cors
    signed url
    typeScript
    파일 무결성
    Cloud Storage
    서버 부하 분산
    cloud buckets
    nodejs
    Cors
    Cloud Function
    알고리즘
    서버 부하
    google api gateway
    cloud logging
    redoc
    Google Cloud
    cors 개념
    cors 작동
    Secret Manager
    쿠키와 세션의 개념
  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.2
창MIN
Cloud Storage에 JSON파일이 Upload 될 때, Bigquery Loads 파이프 라인 구성
상단으로

티스토리툴바