データの分散処理 = データを複数のマシン (ノード) に分割して並列に計算させる方式。1 台では時間・メモリが足りないデータを、たくさんの普通のマシンで分担する。
用語集 → ビッグデータ / AI インフラ 分野 → データの分散処理 (Distributed Data Processing)。AI クラウド 教材で頻出する横串の概念です。深層学習で 1 GPU では足りないとき、ペタバイト級のログを 1 台では処理できないときに登場します。
本ページの SSDSE 文脈では 47 都道府県 = 47 ワーカのメタファーを使って、Pythonの concurrent.futures / multiprocessing / dask で並列処理する例を見せます。
「47 都道府県を 1 人で巡業」 vs 「47 人が同時に各県で集計し東京に集約」のどちらが早いか考えてください。後者がデータ分散処理の発想です。集計内容 (map)、集約方法 (reduce)、移動コスト (シャッフル) を慎重に設計しないと、人数を増やしても遅くなることもあります。
3 種類の並列性:
| 種類 | 分けるもの | 例 |
|---|---|---|
| データ並列 | 入力データを分割 | 47 都道府県別に同じ集計を分担 |
| タスク並列 | 違う計算を同時に | 人口集計と高齢化率集計を並行 |
| パイプライン並列 | 段階を流す | 読込→前処理→推論→保存を流れ作業 |
MapReduce のメタファー: map は「各都道府県で集計する係」、reduce は「47 県の結果を 1 つにまとめる係」、shuffle は「集計表を持ち寄って同じ項目どうしを並べる作業」。
Speedup (高速化倍率):
$$ S(n) = \frac{T(1)}{T(n)} $$
$T(n)$ は $n$ ノードで処理したときの実時間。理想は $S(n) = n$。
Amdahl の法則:並列化できる部分の比 $p$
$$ S(n) = \frac{1}{(1-p) + p/n} \xrightarrow{n \to \infty} \frac{1}{1-p} $$
どんなに $n$ を増やしても、逐次部分 $1-p$ が高速化の限界を決める。
Gustafson の法則:問題サイズを増やせるなら
$$ S(n) = (1-p) + p \cdot n $$
MapReduce の意味論:
$$ \mathrm{map}: (k_1, v_1) \to [(k_2, v_2)], \quad \mathrm{reduce}: (k_2, [v_2]) \to (k_2, v_3) $$
SSDSE では $k_1$=行番号、$v_1$=行、$k_2$=都道府県、$v_2$=値リスト、$v_3$=合計や平均などの集計結果。
CAP 定理: 分散システムは Consistency / Availability / Partition tolerance のうち同時に満たせるのは 2 つまで。
$$ C \cap A \cap P = \emptyset $$
| 記号 | 読み方 | 意味・例 |
|---|---|---|
| $n$ | number of workers | ノード数 (47 都道府県なら 47) |
| $T(n)$ | time | $n$ 並列での実行時間 |
| $p$ | parallelizable fraction | 並列化可能比率 (0〜1) |
| $E(n)$ | efficiency | $S(n)/n$。100%が理想 |
| $k_1, k_2$ | keys | map/reduce のキー (例: 都道府県) |
| $v_1, v_2, v_3$ | values | 入力 / 中間 / 集計後の値 |
SSDSE-B-2026 は 47 都道府県 × 約 30 年 = 約 1,400 行。これは「47 個に綺麗に切れるデータ」のおもちゃ例として最適です。
Amdahl の法則の数値例:
| 並列化率 $p$ | $n=4$ の Speedup | $n=47$ の Speedup | 上限 |
|---|---|---|---|
| 0.50 | 1.60× | 1.96× | 2.00× |
| 0.90 | 3.08× | 8.32× | 10.00× |
| 0.99 | 3.88× | 32.7× | 100× |
「ほぼ並列化できる ($p=0.99$) 処理」でも、47 並列で 32 倍にしかならない。逐次部分の最適化が命。
SSDSE 集計のコスト見積もり:
| 処理 | 1 都道府県あたり | 47 都道府県逐次 | 47 並列 |
|---|---|---|---|
| 行平均集計 | 1 ms | 47 ms | 3 ms (起動オーバーヘッド込み) |
| ML 特徴量計算 | 300 ms | 14.1 s | 450 ms |
| 回帰モデル fit | 800 ms | 37.6 s | 1.1 s |
① まずは逐次処理 (ベースライン)
import pandas as pd
import time
df = pd.read_csv('data/raw/SSDSE-B-2026.csv', encoding='utf-8', skiprows=1)
def heavy_aggregate(group):
# 何か重い計算 (例: 移動平均・分位点・線形回帰)
return {
'mean_pop': group['Total_population'].mean(),
'mean_inc': group['Income_per_capita'].mean(),
'aging': group['Aging_rate'].mean(),
'rows': len(group),
}
t0 = time.time()
result_seq = {p: heavy_aggregate(g) for p, g in df.groupby('Prefecture')}
print(f'逐次 : {time.time() - t0:.3f} s, 件数 = {len(result_seq)}')
② concurrent.futures.ProcessPoolExecutor で 47 並列
from concurrent.futures import ProcessPoolExecutor
def task(args):
pref, group = args
return pref, heavy_aggregate(group)
groups = list(df.groupby('Prefecture'))
t0 = time.time()
with ProcessPoolExecutor(max_workers=8) as ex:
result_par = dict(ex.map(task, groups))
print(f'並列 : {time.time() - t0:.3f} s')
③ Dask: 大規模 CSV を chunk 並列
import dask.dataframe as dd
ddf = dd.read_csv('data/raw/SSDSE-B-2026.csv', skiprows=1, blocksize='8MB')
agg = ddf.groupby('Prefecture').agg({
'Total_population': 'mean',
'Income_per_capita': 'mean',
'Aging_rate': 'mean',
}).compute() # ここで初めて実計算 (遅延評価)
print(agg.head())
④ MapReduce 風に 47 県を手で書く
from collections import defaultdict
# map: (行) -> (都道府県, 値) の列を吐く
def mapper(row):
yield row['Prefecture'], row['Total_population']
# shuffle: 同じキーを集める
buckets = defaultdict(list)
for _, row in df.iterrows():
for k, v in mapper(row):
buckets[k].append(v)
# reduce: 各キーで合計
result = {k: sum(vs) / len(vs) for k, vs in buckets.items()}
print(list(result.items())[:5])
⑤ Ray でモデル学習を 47 並列
import ray
from sklearn.linear_model import LinearRegression
ray.init(ignore_reinit_error=True)
@ray.remote
def fit_one(pref, sub):
X = sub[['Total_population', 'Aging_rate']].values
y = sub['Income_per_capita'].values
if len(sub) < 3:
return pref, None
m = LinearRegression().fit(X, y)
return pref, (m.coef_.tolist(), m.intercept_)
futures = [fit_one.remote(p, g) for p, g in df.groupby('Prefecture')]
models = dict(ray.get(futures))
print(list(models.items())[:3])
⑥ Amdahl の法則を可視化
import numpy as np
import matplotlib.pyplot as plt
n = np.arange(1, 48)
for p in [0.5, 0.9, 0.99]:
plt.plot(n, 1 / ((1 - p) + p / n), label=f'p={p}')
plt.xlabel('# workers (都道府県数)'); plt.ylabel('Speedup')
plt.legend(); plt.grid(True)
plt.title('Amdahl: 47 都道府県を並列にしても逐次部が上限を決める')
multiprocessing の意味はない。Dask/Spark は GB 級から効く。データ分散処理 ★
├─ パラダイム
│ ├─ MapReduce (Hadoop)
│ ├─ DAG実行 (Spark, Dask)
│ ├─ ストリーム (Flink, Beam)
│ └─ Actor (Ray, Akka)
├─ ストレージ
│ ├─ 分散FS (HDFS / S3 / GCS)
│ ├─ 列指向 (Parquet / ORC)
│ └─ メッセージング (Kafka)
├─ 性能理論
│ ├─ Amdahl (上限あり)
│ └─ Gustafson (問題拡大)
└─ 整合性
├─ CAP 定理
├─ コンセンサス (Raft / Paxos)
└─ 結果整合性 (eventual)