論文一覧に戻る 📚 用語集トップ 🗺 概念マップ
📚 用語解説
📚 用語解説
データの分散処理
Distributed Data Processing
大規模データ 並列計算 MapReduce

🔖 キーワード索引

💡 30秒で分かる 📍 文脈 🎨 直感 📐 数式 🔬 記号 🧮 実値計算 🐍 Python ⚠️ 落とし穴 🌐 関連手法 🔗 関連用語 📚 グループ教材 🗺 概念マップ

💡 30秒で分かる結論

データの分散処理 = データを複数のマシン (ノード) に分割して並列に計算させる方式。1 台では時間・メモリが足りないデータを、たくさんの普通のマシンで分担する。

📍 あなたが今見ているもの

用語集 → ビッグデータ / AI インフラ 分野 → データの分散処理 (Distributed Data Processing)AI クラウド 教材で頻出する横串の概念です。深層学習で 1 GPU では足りないとき、ペタバイト級のログを 1 台では処理できないときに登場します。

本ページの SSDSE 文脈では 47 都道府県 = 47 ワーカのメタファーを使って、Pythonの concurrent.futures / multiprocessing / dask で並列処理する例を見せます。

🎨 直感で掴む

「47 都道府県を 1 人で巡業」 vs 「47 人が同時に各県で集計し東京に集約」のどちらが早いか考えてください。後者がデータ分散処理の発想です。集計内容 (map)、集約方法 (reduce)、移動コスト (シャッフル) を慎重に設計しないと、人数を増やしても遅くなることもあります。

3 種類の並列性

種類分けるもの
データ並列入力データを分割47 都道府県別に同じ集計を分担
タスク並列違う計算を同時に人口集計と高齢化率集計を並行
パイプライン並列段階を流す読込→前処理→推論→保存を流れ作業

MapReduce のメタファー: map は「各都道府県で集計する係」、reduce は「47 県の結果を 1 つにまとめる係」、shuffle は「集計表を持ち寄って同じ項目どうしを並べる作業」。

📐 数式・定義

Speedup (高速化倍率):

$$ S(n) = \frac{T(1)}{T(n)} $$

$T(n)$ は $n$ ノードで処理したときの実時間。理想は $S(n) = n$。

Amdahl の法則:並列化できる部分の比 $p$

$$ S(n) = \frac{1}{(1-p) + p/n} \xrightarrow{n \to \infty} \frac{1}{1-p} $$

どんなに $n$ を増やしても、逐次部分 $1-p$ が高速化の限界を決める。

Gustafson の法則:問題サイズを増やせるなら

$$ S(n) = (1-p) + p \cdot n $$

MapReduce の意味論:

$$ \mathrm{map}: (k_1, v_1) \to [(k_2, v_2)], \quad \mathrm{reduce}: (k_2, [v_2]) \to (k_2, v_3) $$

SSDSE では $k_1$=行番号、$v_1$=行、$k_2$=都道府県、$v_2$=値リスト、$v_3$=合計や平均などの集計結果。

CAP 定理: 分散システムは Consistency / Availability / Partition tolerance のうち同時に満たせるのは 2 つまで。

$$ C \cap A \cap P = \emptyset $$

🔬 数式を言葉で読み解く

記号読み方意味・例
$n$number of workersノード数 (47 都道府県なら 47)
$T(n)$time$n$ 並列での実行時間
$p$parallelizable fraction並列化可能比率 (0〜1)
$E(n)$efficiency$S(n)/n$。100%が理想
$k_1, k_2$keysmap/reduce のキー (例: 都道府県)
$v_1, v_2, v_3$values入力 / 中間 / 集計後の値

🧮 実値で計算してみる — SSDSE-B-2026 を 47 並列で集計

SSDSE-B-2026 は 47 都道府県 × 約 30 年 = 約 1,400 行。これは「47 個に綺麗に切れるデータ」のおもちゃ例として最適です。

Amdahl の法則の数値例:

並列化率 $p$$n=4$ の Speedup$n=47$ の Speedup上限
0.501.60×1.96×2.00×
0.903.08×8.32×10.00×
0.993.88×32.7×100×

「ほぼ並列化できる ($p=0.99$) 処理」でも、47 並列で 32 倍にしかならない。逐次部分の最適化が命。

SSDSE 集計のコスト見積もり:

処理1 都道府県あたり47 都道府県逐次47 並列
行平均集計1 ms47 ms3 ms (起動オーバーヘッド込み)
ML 特徴量計算300 ms14.1 s450 ms
回帰モデル fit800 ms37.6 s1.1 s

🐍 Python 実装 — 47 都道府県データを並列処理する

① まずは逐次処理 (ベースライン)

import pandas as pd
import time

df = pd.read_csv('data/raw/SSDSE-B-2026.csv', encoding='utf-8', skiprows=1)

def heavy_aggregate(group):
    # 何か重い計算 (例: 移動平均・分位点・線形回帰)
    return {
        'mean_pop': group['Total_population'].mean(),
        'mean_inc': group['Income_per_capita'].mean(),
        'aging': group['Aging_rate'].mean(),
        'rows': len(group),
    }

t0 = time.time()
result_seq = {p: heavy_aggregate(g) for p, g in df.groupby('Prefecture')}
print(f'逐次  : {time.time() - t0:.3f} s, 件数 = {len(result_seq)}')

② concurrent.futures.ProcessPoolExecutor で 47 並列

from concurrent.futures import ProcessPoolExecutor

def task(args):
    pref, group = args
    return pref, heavy_aggregate(group)

groups = list(df.groupby('Prefecture'))
t0 = time.time()
with ProcessPoolExecutor(max_workers=8) as ex:
    result_par = dict(ex.map(task, groups))
print(f'並列  : {time.time() - t0:.3f} s')

③ Dask: 大規模 CSV を chunk 並列

import dask.dataframe as dd

ddf = dd.read_csv('data/raw/SSDSE-B-2026.csv', skiprows=1, blocksize='8MB')
agg = ddf.groupby('Prefecture').agg({
    'Total_population': 'mean',
    'Income_per_capita': 'mean',
    'Aging_rate': 'mean',
}).compute()        # ここで初めて実計算 (遅延評価)
print(agg.head())

④ MapReduce 風に 47 県を手で書く

from collections import defaultdict

# map: (行) -> (都道府県, 値) の列を吐く
def mapper(row):
    yield row['Prefecture'], row['Total_population']

# shuffle: 同じキーを集める
buckets = defaultdict(list)
for _, row in df.iterrows():
    for k, v in mapper(row):
        buckets[k].append(v)

# reduce: 各キーで合計
result = {k: sum(vs) / len(vs) for k, vs in buckets.items()}
print(list(result.items())[:5])

⑤ Ray でモデル学習を 47 並列

import ray
from sklearn.linear_model import LinearRegression

ray.init(ignore_reinit_error=True)

@ray.remote
def fit_one(pref, sub):
    X = sub[['Total_population', 'Aging_rate']].values
    y = sub['Income_per_capita'].values
    if len(sub) < 3:
        return pref, None
    m = LinearRegression().fit(X, y)
    return pref, (m.coef_.tolist(), m.intercept_)

futures = [fit_one.remote(p, g) for p, g in df.groupby('Prefecture')]
models = dict(ray.get(futures))
print(list(models.items())[:3])

⑥ Amdahl の法則を可視化

import numpy as np
import matplotlib.pyplot as plt

n = np.arange(1, 48)
for p in [0.5, 0.9, 0.99]:
    plt.plot(n, 1 / ((1 - p) + p / n), label=f'p={p}')
plt.xlabel('# workers (都道府県数)'); plt.ylabel('Speedup')
plt.legend(); plt.grid(True)
plt.title('Amdahl: 47 都道府県を並列にしても逐次部が上限を決める')

⚠️ よくある落とし穴

❌ 小さなデータで並列化する
プロセス起動・シリアライズのオーバーヘッドが本処理より長くなり、逐次より遅くなる。SSDSE 規模 (1400 行) では multiprocessing の意味はない。Dask/Spark は GB 級から効く。
❌ データ偏り (skew) を無視
「東京だけ膨大」のような skew があると 1 ワーカが詰まる。事前にハッシュ分割やソルティング (salting) でバランスを取る。
❌ シャッフル量を見積もらない
groupBy / join はネットワークを大量に流れる。事前集約 (combiner)、broadcast join、bucketing で削減する。
❌ 失敗を想定しない
ノード故障・ネットワーク分断は日常茶飯事。チェックポイント、リトライ、idempotent な処理設計が必須。Sparkは RDD 系統で自動復旧。
❌ CAP を「3 つ満たせる」と勘違い
分断が起きたら C か A のどちらかを諦める。「強整合 (CP) か高可用 (AP) か」を業務要件で先に決める。

🗺 概念マップ

データ分散処理 ★
├─ パラダイム
│   ├─ MapReduce (Hadoop)
│   ├─ DAG実行 (Spark, Dask)
│   ├─ ストリーム (Flink, Beam)
│   └─ Actor (Ray, Akka)
├─ ストレージ
│   ├─ 分散FS (HDFS / S3 / GCS)
│   ├─ 列指向 (Parquet / ORC)
│   └─ メッセージング (Kafka)
├─ 性能理論
│   ├─ Amdahl (上限あり)
│   └─ Gustafson (問題拡大)
└─ 整合性
    ├─ CAP 定理
    ├─ コンセンサス (Raft / Paxos)
    └─ 結果整合性 (eventual)