別名・略称:(なし)
ETLツール(ETL Tool):データ抽出・変換・ロードの自動化ツール
| ツール | タイプ | 特徴 |
|---|---|---|
| Apache Airflow | OSS / Python | DAG でジョブ定義、 デファクト |
| dbt | OSS / SQL | DWH 内変換に特化(ELT) |
| Embulk | OSS / Java | 並列バルクロードに強い |
| Talend | 商用 / GUI | 大企業向け統合 |
ETL は概念図で表すのが分かりやすい:
SSDSE データを使った ETL の例:
prefecture_stats テーブルに INSERTSSDSE-B-2026(47 都道府県・2023 年データ)を題材にした最小コード:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | # pandas で簡易 ETL import pandas as pd # Extract: CSV から抽出 df = pd.read_csv('data/raw/SSDSE-B-2026.csv', encoding='utf-8', skiprows=1) # Transform: 列名統一、 欠損除去、 型変換 df = df.rename(columns={'地域コード': 'pref_code'}) df = df.dropna(subset=['消費支出']) df['年'] = df['年'].astype(int) # Load: SQLite に書き込み(実運用は PostgreSQL / BigQuery 等) import sqlite3 conn = sqlite3.connect('data/processed/etl_demo.db') df.to_sql('prefecture_stats', conn, if_exists='replace', index=False) |
SSDSE-B-2026 のような公的データを定期更新する場合、 手書きスクリプトより専用 ETL ツールを使うと再実行性・モニタリング・依存関係管理が一気に楽になる。
| ツール | タイプ | 強み | SSDSE-B での使い所 |
|---|---|---|---|
| Apache Airflow | ワークフローエンジン | Python DAG, スケジュール | 年次データの自動取得→変換→ロード |
| dbt | SQL ベース変換 | Git連携, テスト | DWH に入った後の集計クエリ管理 |
| Fivetran / Airbyte | コネクタ | API 連携豊富 | e-Stat API → BigQuery |
| Talend / Informatica | GUI ETL | ノーコード | 業務側がメンテ |
| Apache NiFi | ストリーミング | フロー視覚化 | センサー・ログのリアルタイム取り込み |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | # Airflow DAG で SSDSE-B-2026 を ETL する例
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import pandas as pd
import sqlite3
def extract(**ctx):
df = pd.read_csv('data/raw/SSDSE-B-2026.csv',
encoding='cp932', skiprows=1)
df.to_parquet('/tmp/ssdse_raw.parquet')
def transform(**ctx):
df = pd.read_parquet('/tmp/ssdse_raw.parquet')
df = df[df['年度']==2023].copy()
df['高齢化率'] = df['65歳以上人口']/df['総人口']*100
df.to_parquet('/tmp/ssdse_t.parquet')
def load(**ctx):
df = pd.read_parquet('/tmp/ssdse_t.parquet')
con = sqlite3.connect('ssdse.db')
df.to_sql('ssdse_b', con, if_exists='replace', index=False)
con.close()
with DAG('ssdse_b_etl', start_date=datetime(2026,1,1),
schedule='@yearly', catchup=False) as dag:
t1 = PythonOperator(task_id='extract', python_callable=extract)
t2 = PythonOperator(task_id='transform', python_callable=transform)
t3 = PythonOperator(task_id='load', python_callable=load)
t1 >> t2 >> t3
|
dbt は SQL ベースで Transform を書く。 SSDSE-B-2026 を BigQuery にロード後、 次の models/ssdse_b_summary.sql のような変換を Git 管理する。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | -- models/ssdse_b_aged.sql
{{ config(materialized='table') }}
with src as (
select 都道府県 as prefecture,
年度 as year,
総人口 as population,
65歳以上人口 as aged_pop
from {{ source('raw','ssdse_b_2026') }}
where 年度 = 2023
)
select prefecture, population, aged_pop,
round(aged_pop * 100.0 / population, 2) as aged_ratio
from src
order by aged_ratio desc
|
ref() でモデル間依存を解決し DAG を自動生成tests: not_null, unique, accepted_values で品質保証snapshots: SCD2 で履歴管理docs: 自動ドキュメント+カラム説明