ETL 實戰指南:使用 Python 與 Airflow 每日自動抓取 TikTok 熱門數據並寫入數據倉庫

在數據驅動的時代,能夠即時獲取並管理社交平台上的資訊是許多企業的核心需求。TikTok 作為全球成長最快的社交平台之一,提供了極其豐富的數據資源。本文將透過一個實戰案例,示範如何利用 Python 搭配 LuckData TikTok API,自動化抓取熱門影片、評論、挑戰、收藏等資料,並透過 ETL 流程存入 PostgreSQL 或 BigQuery,最後設計基本的資料品質監控機制。

一、系統設計概述

這套系統目標為每日自動執行以下任務:

  1. 擷取熱門 TikTok 資訊(影片、評論、挑戰、收藏)

  2. 儲存至數據倉庫(PostgreSQL / BigQuery)

  3. 進行資料清洗、轉換與品質檢查

  4. 記錄執行狀況與異常日誌

技術選型:

類別

工具/技術

語言

Python 3.10+

調度工具

Apache Airflow(可選 cron)

數據來源

LuckData TikTok API

數據倉庫

PostgreSQL / BigQuery

驗證與日誌

JSONSchema / logging

二、資料擷取階段(Extract)

我們使用 LuckData 提供的 API 端點,以下為擷取熱門影片的範例:

import requests

def get_hot_videos(count=20):

headers = {

'X-Luckdata-Api-Key': 'your_luckdata_key'

}

url = f"https://luckdata.io/api/tiktok-api/X2ZbQZ1YsWij?count={count}&cursor=0&keywords=trending"

response = requests.get(url, headers=headers)

return response.json()

同理,也可對應抓取評論、挑戰與收藏:

  • get comment list by video

  • get challenge info

  • get collection list by user id

  • get music post video list

三、資料轉換階段(Transform)

擷取回來的原始 JSON 通常結構複雜,需轉換為資料庫易於儲存的格式:

def transform_video_data(raw_data):

videos = []

for item in raw_data.get("data", []):

videos.append({

"video_id": item.get("id"),

"author": item.get("author", {}).get("unique_id"),

"description": item.get("desc"),

"create_time": item.get("create_time"),

"like_count": item.get("statistics", {}).get("digg_count"),

"comment_count": item.get("statistics", {}).get("comment_count")

})

return videos

這樣可將 JSON 結構壓平為關聯式資料庫結構。

四、資料載入階段(Load)

以 PostgreSQL 為例,將轉換後的資料寫入資料表:

import psycopg2

def load_to_postgres(video_data, conn_params):

conn = psycopg2.connect(**conn_params)

cursor = conn.cursor()

for v in video_data:

cursor.execute("""

INSERT INTO tiktok_videos (video_id, author, description, create_time, like_count, comment_count)

VALUES (%s, %s, %s, %s, %s, %s)

ON CONFLICT (video_id) DO NOTHING

""", (

v['video_id'], v['author'], v['description'],

v['create_time'], v['like_count'], v['comment_count']

))

conn.commit()

cursor.close()

conn.close()

若使用 BigQuery,可用 google-cloud-bigquery 實現。

五、自動化流程設計

使用 Apache Airflow 編寫 DAG,實現每日自動調度:

from airflow import DAG

from airflow.operators.python import PythonOperator

from datetime import datetime

from my_etl_scripts import get_hot_videos, transform_video_data, load_to_postgres

default_args = {

'owner': 'airflow',

'start_date': datetime(2024, 1, 1),

'retries': 1

}

with DAG('tiktok_etl_daily', default_args=default_args, schedule_interval='@daily') as dag:

extract = PythonOperator(task_id='extract', python_callable=get_hot_videos)

transform = PythonOperator(task_id='transform', python_callable=transform_video_data)

load = PythonOperator(task_id='load', python_callable=load_to_postgres)

extract >> transform >> load

六、資料品質檢查與監控

常見的資料驗證項目:

  • 欄位不為空

  • 數據類型正確(如時間戳、數值欄位)

  • 重複紀錄去除

可使用 Python 結合 jsonschema 或自定義校驗邏輯進行:

from jsonschema import validate, ValidationError

video_schema = {

"type": "object",

"properties": {

"video_id": {"type": "string"},

"author": {"type": "string"},

"like_count": {"type": "integer"}

},

"required": ["video_id", "author"]

}

def validate_video(record):

try:

validate(instance=record, schema=video_schema)

return True

except ValidationError:

return False

七、擴充應用場景

  1. 行銷趨勢監測:分析每日熱門影片與挑戰標籤

  2. 競品觀察:追蹤指定帳號的收藏、音樂或影片變化

  3. 使用者行為建模:匯整評論、喜好、影片內容做用戶分群

八、結語

透過本篇實戰指引,我們從 0 到 1 搭建了一個每日自動化擷取 TikTok 數據的 ETL 系統。LuckData API 的靈活性與穩定性,結合 Python 的簡潔與 Airflow 的調度能力,讓我們能輕鬆實現跨平台的資料整合與分析。若要將系統進一步擴充為實時處理架構,可搭配 Kafka 與 Spark Streaming,或透過 GCP 的 Dataflow 實現無伺服器的流式運算。

TikTok 的數據世界非常廣闊,而有效擷取與整理這些資訊,是數位轉型的關鍵一步。

Articles related to APIs :