ETL 實戰指南:使用 Python 與 Airflow 每日自動抓取 TikTok 熱門數據並寫入數據倉庫
在數據驅動的時代,能夠即時獲取並管理社交平台上的資訊是許多企業的核心需求。TikTok 作為全球成長最快的社交平台之一,提供了極其豐富的數據資源。本文將透過一個實戰案例,示範如何利用 Python 搭配 LuckData TikTok API,自動化抓取熱門影片、評論、挑戰、收藏等資料,並透過 ETL 流程存入 PostgreSQL 或 BigQuery,最後設計基本的資料品質監控機制。
一、系統設計概述
這套系統目標為每日自動執行以下任務:
擷取熱門 TikTok 資訊(影片、評論、挑戰、收藏)
儲存至數據倉庫(PostgreSQL / BigQuery)
進行資料清洗、轉換與品質檢查
記錄執行狀況與異常日誌
技術選型:
類別 | 工具/技術 |
---|---|
語言 | Python 3.10+ |
調度工具 | Apache Airflow(可選 cron) |
數據來源 | LuckData TikTok API |
數據倉庫 | PostgreSQL / BigQuery |
驗證與日誌 | JSONSchema / logging |
二、資料擷取階段(Extract)
我們使用 LuckData 提供的 API 端點,以下為擷取熱門影片的範例:
import requestsdef get_hot_videos(count=20):
headers = {
'X-Luckdata-Api-Key': 'your_luckdata_key'
}
url = f"https://luckdata.io/api/tiktok-api/X2ZbQZ1YsWij?count={count}&cursor=0&keywords=trending"
response = requests.get(url, headers=headers)
return response.json()
同理,也可對應抓取評論、挑戰與收藏:
get comment list by video
get challenge info
get collection list by user id
get music post video list
三、資料轉換階段(Transform)
擷取回來的原始 JSON 通常結構複雜,需轉換為資料庫易於儲存的格式:
def transform_video_data(raw_data):videos = []
for item in raw_data.get("data", []):
videos.append({
"video_id": item.get("id"),
"author": item.get("author", {}).get("unique_id"),
"description": item.get("desc"),
"create_time": item.get("create_time"),
"like_count": item.get("statistics", {}).get("digg_count"),
"comment_count": item.get("statistics", {}).get("comment_count")
})
return videos
這樣可將 JSON 結構壓平為關聯式資料庫結構。
四、資料載入階段(Load)
以 PostgreSQL 為例,將轉換後的資料寫入資料表:
import psycopg2def load_to_postgres(video_data, conn_params):
conn = psycopg2.connect(**conn_params)
cursor = conn.cursor()
for v in video_data:
cursor.execute("""
INSERT INTO tiktok_videos (video_id, author, description, create_time, like_count, comment_count)
VALUES (%s, %s, %s, %s, %s, %s)
ON CONFLICT (video_id) DO NOTHING
""", (
v['video_id'], v['author'], v['description'],
v['create_time'], v['like_count'], v['comment_count']
))
conn.commit()
cursor.close()
conn.close()
若使用 BigQuery,可用 google-cloud-bigquery
實現。
五、自動化流程設計
使用 Apache Airflow 編寫 DAG,實現每日自動調度:
from airflow import DAGfrom airflow.operators.python import PythonOperator
from datetime import datetime
from my_etl_scripts import get_hot_videos, transform_video_data, load_to_postgres
default_args = {
'owner': 'airflow',
'start_date': datetime(2024, 1, 1),
'retries': 1
}
with DAG('tiktok_etl_daily', default_args=default_args, schedule_interval='@daily') as dag:
extract = PythonOperator(task_id='extract', python_callable=get_hot_videos)
transform = PythonOperator(task_id='transform', python_callable=transform_video_data)
load = PythonOperator(task_id='load', python_callable=load_to_postgres)
extract >> transform >> load
六、資料品質檢查與監控
常見的資料驗證項目:
欄位不為空
數據類型正確(如時間戳、數值欄位)
重複紀錄去除
可使用 Python 結合 jsonschema
或自定義校驗邏輯進行:
from jsonschema import validate, ValidationErrorvideo_schema = {
"type": "object",
"properties": {
"video_id": {"type": "string"},
"author": {"type": "string"},
"like_count": {"type": "integer"}
},
"required": ["video_id", "author"]
}
def validate_video(record):
try:
validate(instance=record, schema=video_schema)
return True
except ValidationError:
return False
七、擴充應用場景
行銷趨勢監測:分析每日熱門影片與挑戰標籤
競品觀察:追蹤指定帳號的收藏、音樂或影片變化
使用者行為建模:匯整評論、喜好、影片內容做用戶分群
八、結語
透過本篇實戰指引,我們從 0 到 1 搭建了一個每日自動化擷取 TikTok 數據的 ETL 系統。LuckData API 的靈活性與穩定性,結合 Python 的簡潔與 Airflow 的調度能力,讓我們能輕鬆實現跨平台的資料整合與分析。若要將系統進一步擴充為實時處理架構,可搭配 Kafka 與 Spark Streaming,或透過 GCP 的 Dataflow 實現無伺服器的流式運算。
TikTok 的數據世界非常廣闊,而有效擷取與整理這些資訊,是數位轉型的關鍵一步。