포스트

[MLOps] GCP VertexAI 1(BigQuery)

VertexAI에서 BigQuery의 데이터 불러오기

[MLOps] GCP VertexAI 1(BigQuery)

BigQuery Logo

BigQuery

  • 개요: Serverless로 구성된 대용량 데이터 웨어하우스 및 분석 플랫폼
  • 특징
    • SQL 기반의 강력한 쿼리 기능 제공
    • 대용량 데이터셋을 실시간으로 분석할 수 있는 높은 성능
    • 서버리스 아키텍처로 인프라 관리 부담 없음
    • 외부 데이터 소스와 쉽게 통합 가능

데이터 분석의 변화에 적합한 구조

BigQuery Architecture

  • 전통적 데이터베이스의 한계 극복
    • BigQuery는 Serverless 구조로 확장이 쉽고 실시간 데이터 분석 가능
    • 대용량 데이터셋에서도 높은 성능 제공
  • 인프라 관리의 간소화
    • 서버 관리 없이 자동 스케일링
    • 대용량 데이터셋에 대해서도 빠르고 효율적인 쿼리 수행 가능
  • 다양한 데이터 소스 통합
    • 여러 데이터 파이프라인 및 Google Cloud 서비스와 자연스럽게 연동
    • 데이터 수집부터 분석까지 하나의 흐름으로 처리 가능

쉬운 분석과 다양한 데이터 지원

BigQuery Data Flow

  • 시각화 도구와의 강력한 연동
    • Google Data Studio, Looker 등과 연계 가능
    • SQL 기반으로 즉각적인 시각화 분석 수행
  • 다양한 데이터 포맷 지원
    • JSON, Avro, Parquet 등 다양한 형식 지원
    • 여러 데이터 소스로부터 유연하게 데이터 수집 가능
  • 유연한 스키마 처리
    • 스키마가 정의되지 않은 데이터도 쿼리 가능
    • 반정형 / 비정형 데이터 분석에 적합

VertexAI에서 BigQuery 데이터 불러오기

방법 1

  • SQL query를 통해 데이터를 전처리하고 불러오기

방법 2

  • bpd 라이브러리(bigframes.pandas)을 사용하여 데이터 불러오기 -> pandas형식으로 전처리

해당 방법은 방법1을 바탕으로 작성하였다.

데이터는 bigquery public에 있는 london_bicyclescycle_stationscycle_hire를 사용하였다.

1
2
3
4
5
6
7
8
9
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader

from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from datetime import datetime
import pandas as pd
import numpy as np
1
2
from google.cloud.bigquery import Client, QueryJobConfig
client = Client()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
query="""WITH staging AS (
    SELECT
        STRUCT(
            start_stn.name,
            ST_GEOGPOINT(start_stn.longitude, start_stn.latitude) AS POINT,
            start_stn.docks_count,
            start_stn.install_date
        ) AS starting,
        STRUCT(
            end_stn.name,
            ST_GEOGPOINT(end_stn.longitude, end_stn.latitude) AS point,
            end_stn.docks_count,
            end_stn.install_date
        ) AS ending,
        STRUCT(
            rental_id,
            bike_id,
            duration, --seconds
            ST_DISTANCE(
                ST_GEOGPOINT(start_stn.longitude, start_stn.latitude),
                ST_GEOGPOINT(end_stn.longitude, end_stn.latitude)
            ) AS distance, --meters
            start_date,
            end_date
        ) AS bike
        FROM `bigquery-public-data.london_bicycles.cycle_stations` AS start_stn
        LEFT JOIN `bigquery-public-data.london_bicycles.cycle_hire` AS b
        ON start_stn.id = b.start_station_id
        LEFT JOIN `bigquery-public-data.london_bicycles.cycle_stations` AS end_stn
        on end_stn.id = b.end_station_id
        LIMIT 100000)

SELECT * FROM staging
"""
job = client.query(query)
df = job.to_dataframe()
1
df.head()
startingendingbike
0{'name': 'New Spring Gardens Walk, Vauxhall', ...{'name': 'Broadley Terrace, Marylebone', 'poin...{'rental_id': 118913498, 'bike_id': 21845, 'du...
1{'name': 'New Spring Gardens Walk, Vauxhall', ...{'name': 'Moor Street, Soho', 'point': 'POINT(...{'rental_id': 106029120, 'bike_id': 12260, 'du...
2{'name': 'New Spring Gardens Walk, Vauxhall', ...{'name': 'Ethelburga Estate, Battersea Park', ...{'rental_id': 79563236, 'bike_id': 10009, 'dur...
3{'name': 'New Spring Gardens Walk, Vauxhall', ...{'name': 'Northumberland Avenue, Strand', 'poi...{'rental_id': 114066766, 'bike_id': 15247, 'du...
4{'name': 'New Spring Gardens Walk, Vauxhall', ...{'name': 'Doddington Grove, Kennington', 'poin...{'rental_id': 78933280, 'bike_id': 9620, 'dura...
1
2
3
4
5
6
values = df["bike"].values

duration = list(map(lambda x: x["duration"], values))
distance = list(map(lambda x: x["distance"], values))
dates = list(map(lambda x: x["start_date"], values))
data = pd.DataFrame(data = {"duration": duration, "distance": distance, "start_date": dates})
1
data = data.dropna()
1
data.head()
durationdistancestart_date
01800.05051.9615042022-04-13 17:09:00+00:00
1780.02904.8639842021-03-14 20:37:00+00:00
2780.03136.3250022018-08-22 18:33:00+00:00
3780.02109.2183172021-10-25 08:46:00+00:00
4780.01371.8885102018-08-05 13:55:00+00:00
1
data.info()
1
2
3
4
5
6
7
8
9
10
<class 'pandas.core.frame.DataFrame'>
Index: 94480 entries, 0 to 99999
Data columns (total 3 columns):
 #   Column      Non-Null Count  Dtype              
---  ------      --------------  -----              
 0   duration    94480 non-null  float64            
 1   distance    94480 non-null  float64            
 2   start_date  94480 non-null  datetime64[ns, UTC]
dtypes: datetime64[ns, UTC](1), float64(2)
memory usage: 2.9 MB
1
2
3
4
5
6
# start_date -> weekday, hour
# duration -> minute

data["weekday"] = data["start_date"].apply(lambda x: x.weekday())
data["hour"] = data["start_date"].apply(lambda x: x.time().hour)
data.drop(columns=["start_date"], inplace=True)
1
data["duration"] = data["duration"].apply(lambda x: float(x/60))
1
data.head()
durationdistanceweekdayhour
030.05051.961504217
113.02904.863984620
213.03136.325002218
313.02109.21831708
413.01371.888510613
1
2
3
4
# weekday, hour -> one-hot
# distance -> Normalization

data = pd.get_dummies(data, columns = ["weekday", "hour"], prefix = ["weekday", "hour"])
1
2
X = data.drop(["duration"],axis=1).to_numpy()
y = data["duration"].to_numpy().reshape(-1,1)

MLP 테스트

bigquery에서 추출한 데이터를 MLP모델에 학습하였다.

1
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.2, shuffle=True, random_state=1004)
1
2
3
4
X_train = torch.FloatTensor(X_train)
X_test = torch.FloatTensor(X_test)
y_train = torch.FloatTensor(y_train)
y_test = torch.FloatTensor(y_test)
1
2
3
4
5
6
7
8
train_loader = DataLoader(
    TensorDataset(X_train, y_train),
    batch_size = 256, shuffle=True
)
val_loader = DataLoader(
    TensorDataset(X_test, y_test),
    batch_size = 256, shuffle=False
)
1
2
3
4
5
6
7
8
9
10
11
class MLP(nn.Module):
    def __init__(self, in_dim):
        super().__init__()
        self.layer = nn.Sequential(
            nn.Linear(in_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 1)
        )

    def forward(self, x):
        return self.layer(x)
1
2
in_dim = X_train.size()[1]
model = MLP(in_dim)
1
2
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
1
2
3
4
5
6
7
8
9
10
def evaluate(model, loader, criterion):
    model.eval()
    total_loss, n = 0.0, 0
    with torch.no_grad():
        for xb, yb in loader:
            pred = model(xb)
            loss = criterion(pred, yb)
            total_loss += loss.item() * xb.size(0)
            n += xb.size(0)
    return total_loss / max(n, 1)
1
2
3
4
5
6
7
8
9
10
11
12
13
epochs = 5

for epoch in range(epochs):
    model.train()
    total_loss, n = 0.0, 0

    for xb, yb in train_loader:
        pred = model(xb)
        loss = criterion(pred, yb)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
이 기사는 저작권자의 CC BY 4.0 라이센스를 따릅니다.