pandas / koalas / PySparkのdf操作比較
2020/12/26
プログラミング実施内容
Pandas / Koalas / PySparkについて、「df操作方法の比較」と「処理速度を比較」を実施した。
※ KoalasはPandas-likeにPySpark dfを扱えるようにしたPySparkのラッパー的な存在
実施環境
動作確認にはAzure Synapse Studioの環境を使用。2020年12月現在Synapse Studio上で使用できる各パッケージのバージョンは以下のとおり。
pandasバージョン:0.25.3
koalasバージョン:1.2.0
sparkバージョン:2.4.4.2.6.99.201-25973884df操作方法の比較
dfの作成と表示
| Pandas | Koalas | PySpark | |
| df作成 | pdf = pd.DataFrame(array) | kdf = ks.DataFrame(array) | sdf = spark.createDataFrame(array) |
| 表示① | pdf.head() | kdf.head() | sdf.show() |
| 表示② | pdf.tail() | - | - |
Koalasのtail()メソッドを使用しようとすると以下のメッセージがでる。
RuntimeError: tail can be used PySpark >= 3.0PySpark 3.0以上なら使用できる模様。
列に関する操作
| Pandas | Koalas | PySpark | |
| 型取得 | pdf.dtypes | kdf.dtypes | sdf.dtypes |
| 列名取得 | pdf.columns | kdf.columns | sdf.columns |
| 列数取得 | len(pdf.columns) | len(kdf.columns) | len(sdf.columns) |
| rename | pdf.rename(columns={'bef1':'aft1', 'bef2':'aft2'}) | kdf.rename(columns={'bef1':'aft1', 'bef2':'aft2'}) | sdf.select([func.col('bef1').alias('aft1'), func.col('bef2').alias('aft2')]) |
| 列選択 | pdf[['col11','col2']] | kdf[['col11','col2']] | sdf[['col11','col2']] |
行に関する操作
| Pandas | Koalas | PySpark | |
| 行数取得 | len(pdf) | len(kdf) | sdf.count() |
| index取得 | pdf.index | kdf.index | - |
| index選択① | pdf.loc[[1,2]] | kdf.loc[[1,2]] | - |
| index選択② | pdf.iloc[[1,2]] | pdf.iloc[[1,2]] | - |
PySparkではindexの操作はできない。KoalasはPandas同様の使用感。
集計操作
| Pandas | Koalas | PySpark | |
| 統計情報を取得 | pdf.describe() | kdf.describe() | - |
| 合計 | pdf.sum() | kdf.sum() | sdf.groupby().sum() |
| 最小値 | pdf.min() | kdf.min() | sdf.groupby().min() |
| 最大値 | pdf.max() | kdf.max() | sdf.groupby().max() |
| 平均値 | pdf.mean() | kdf.mean() | sdf.groupby().mean() |
| 中央値 | pdf.median() | kdf.median() | - |
| 標準偏差 | pdf.std() | kdf.std() | - |
| 歪度 | pdf.skew() | kdf.skew() | - |
| 尖度 | pdf.kurt() | kdf.kurt() | - |
| グループごとのカウント | pdf.groupby('col1').count() | kdf.groupby('col1').count() | sdf.groupby('col1').count() |
| グループごとの合計 | pdf.groupby('col1').sum() | kdf.groupby('col1').sum() | sdf.groupby('col1').sum() |
| rolling | pdf.rolling(5).agg('sum') | - | - |
Koalas 1.2.0時点でrolling.aggは実装されていない。PySparkだと自分でudfを定義しなければならない集計関数も、Koalasだと簡単に計算できる。
よく使われるdf操作
| Pandas | Koalas | PySpark | |
| apply | pdf[['col']].apply(lambda x: x+5) | kdf[['col']].apply(lambda x: x+5) | sdf.select(udf(lambda x: x+5)('col')) |
| ソート | pdf.sort_values(['col1','col2']) | df.sort_values(['col1','col2']) | sdf.sort(['col1','col2']) |
| マージ① | pdf_left.merge(pdf_right, on='key', how='left') | kdf_left.merge(kdf_right, on='key', how='left') | sdf_left.join(sdf_right, sdf_left['key'] == sdf_right['key'], "left") |
| マージ② | pd.merge(pdf_left, pdf_right, on='key', how='left') | ks.merge(kdf_left, kdf_right, on='key', how='left') | - |
| 縦結合① | pd.concat([pdf_up,pdf_down],ignore_index=True) | ks.concat([kdf_up,kdf_down], ignore_index=True) | reduce(DataFrame.unionByName, [sdf_up, sdf_down]) |
| 縦結合② | pdf_up.append(pdf_down, ignore_index=True) | kdf_up.append(kdf_down, ignore_index=True) | sdf_up.unionAll(sdf_down) |
| 抽出 | pdf[(pdf['col']>0.5) & (pdf['col']<0.6)] | kdf[(kdf['col']>0.5) & (kdf['col']<0.6)] | sdf.filter((func.col('col') > func.lit(0.5)) & (func.col('col') < func.lit(0.6))) |
| 列の追加① | pdf['newcol'] = 10 | kdf['newcol'] = 10 | sdf = sdf.withColumn('newcol',func.lit(10)) |
| 列の追加② | pdf['newcol'] = np.random.rand(len(pdf)) | set_option("compute.ops_on_diff_frames", True) #設定の変更が必要<br>kdf['newcol'] = ks.Series(np.random.rand(len(kdf))) | sdf = sdf.withColumn('newcol',func.rand()) |
| 四則演算① | pdf['newcol'] = pdf['col1']+pdf['col2'] | kdf['newcol'] = kdf['col1']+kdf['col2'] | sdf = sdf.withColumn('newcol',func.col('col1')+func.col('col2')) |
| 四則演算② | pdf['newcol'] = pdf['col1']-pdf['col2'] | kdf['newcol'] = kdf['col1']-kdf['col2'] | sdf = sdf.withColumn('newcol',func.col('col1')-func.col('col2')) |
| 四則演算③ | pdf['newcol'] = pdf['col1']*pdf['col2'] | kdf['newcol'] = kdf['col1']*kdf['col2'] | sdf = sdf.withColumn('newcol',func.col('col1')*func.col('col2')) |
| 四則演算④ | pdf['newcol'] = pdf['col1']/pdf['col2'] | kdf['newcol'] = kdf['col1']/kdf['col2'] | sdf = sdf.withColumn('newcol',func.col('col1')/func.col('col2')) |
| timestamp型の演算 | pdf['dif_timestamp'] = pdf['end_timestamp'] - pdf['start_timestamp'] | kdf['dif_timestamp'] = kdf['end_timestamp'] - kdf['start_timestamp'] | - |
PySparkでのtimestamp列の計算は、unix timestamp型に変換して計算する必要があり面倒だが、KoalasだとPandas同様に簡単に計算できる。
グラフ描画
| Pandas | Koalas | PySpark | |
| 折れ線グラフ | pdf[['col1']].plot(kind='line') | kdf[['col1']].plot(kind='line') | - |
| 棒グラフ | pdf[['col1']].plot(kind='bar') | kdf[['col1']].plot(kind='bar') | - |
| 箱ひげ図 | pdf[['col1']].plot(kind='box') | - | - |
| ヒストグラム | pdf[['col1']].plot(kind='hist') | kdf[['col1']].plot(kind='hist') | - |
| カーネル密度推定 | pdf[['col1']].plot(kind='kde', bw_method=0.2) | kdf[['col1']].plot(kind='kde', bw_method=0.2) | - |
| 面グラフ | pdf[['col1']].plot(kind='area') | kdf[['col1']].plot(kind='area') | - |
| 散布図 | pdf.plot(x='col1', y='col2', kind='scatter') | kdf.plot(x='col1', y='col2', kind='scatter') | - |
| 六角形ビニング図 | pdf.plot(x='col1', y='col2', kind='hexbin') | - | - |
| 円グラフ | pdf[['col1']].plot(kind='pie', subplots=True) | kdf[['col1']].plot(kind='pie', subplots=True) | - |
Koalas 1.2.0時点でbox, hexbinは実装されていない。Koalasでは使えないplot機能もあるが、概ねPandasと同じ使用感。
Series操作
| pandas | koalas | pyspark | |
| Series作成 | pse = pd.Series([0,1,2,3]) | kse = ks.Series([0,1,2,3]) | - |
| sort_index | pse.sort_index() | kse.sort_index() | - |
| sort_values | pse.sort_values() | kse.sort_values() | - |
| append | pse.append(pse2) | kse.append(kse2) | - |
| concat | pd.concat([pse,pse2]) | ks.concat([kse,kse2]) | - |
| 統計情報を取得 | pse.describe() | kse.describe() | - |
| 合計を取得 | pse.sum() | kse.sum() | - |
PySparkではSeriesが使用できない。KoalasはPandas同様の使用感。
KoalasとSpark独自の処理
| Pandas | Koalas | PySpark | |
| SQL文の使用 | - | ks.sql("SELECT * FROM {kdf} WHERE pig > 100") | spark.sql('SELECT * FROM sdf WHERE col1 > 0.5') |
| キャッシュの作成 | - | kdf.spark.cache() | sdf.cache() |
dfの相互変換
Pandas、Koalas、PySpark間のdfの相互変換
| Pandas | Koalas | PySpark |
| - | ks.from_pandas(pdf) | spark.createDataFrame(pdf) |
| - | ks.DataFrame(sdf) | sdf.to_koalas() |
| - | kdf.to_spark() | sdf.toPandas() |
Pandas、Koalas、PySpark間のdfの相互変換も簡単にできる。
KoalasとPySpark処理速度比較
乱数で作成した76MBのデータを基準として行数を1, 10, 100, 1000倍したデータに対して、各処理のKoalasとPySparkの処理速度を計測した。
# 1カラムのソート
kdf.sort_values(['col1'])
sdf.sort(['col1'])# 3カラムのソート
kdf.sort_values(['col1','col2','col3'])
sdf.sort(['col1','col2','col3'])# マージ(left)
kdf_left.merge(kdf_right, on='key', how='left')
sdf_left.join(sdf_right, sdf_left['key'] == sdf_right['key'], 'left')# マージ(inner)
kdf_left.merge(kdf_right, on='key', how='inner')
sdf_left.join(sdf_right, sdf_left['key'] == sdf_right['key'], 'inner')# 2dfの縦結合
kdf.append(kdf, ignore_index=True)
sdf.unionAll(sdf)# 3dfの縦結合
ks.concat([kdf,kdf,kdf], ignore_index=True)
reduce(DataFrame.unionByName, [sdf, sdf, sdf])# 平均値の計算
kdf.groupby('col1').mean()
sdf.groupby('col1').mean()KoalasとPySparkの処理速度は概ね同じ。マージ処理ではPySparkよりKoalasの方が処理速度が早くなっているが、うまく最適化してくれているのだろうか。(不明)
型変換
| Pandas | Koalas | PySpark |
| object | object | int |
| int8 | int8 | bigint |
| int16 | int16 | bigint |
| int32 | int32 | bigint |
| int64 | int64 | bigint |
| uint8 | - | - |
| uint16 | - | - |
| uint32 | - | - |
| uint64 | - | - |
| float16 | - | - |
| float32 | float32 | double |
| float64 | float64 | double |
| float128 | - | - |
| complex64 | - | - |
| complex128 | - | - |
| complex256 | - | - |
| bool | bool | boolean |
| datetime64[ns] | datetime64[ns] | timestamp |
| PySpark | Pandas | Koalas |
| date | - | - |
| tinyint | int8 | int8 |
| smallint | int16 | int16 |
| int | int32 | int32 |
| bigint | int64 | int64 |
| double | float64 | float64 |
| decimal(10,0) | object | object |
Pandas / Koalas / PySparkそれぞれの特徴
| Pandas | Koalas | PySpark | |
| 分散処理 | × | ○ | ○ |
| 処理速度 | ◎ | ○ | ○ |
| dfの扱いやすさ | ◎ | ◎ | ○ |
| Seriesの扱いやすさ | ◎ | ◎ | - |
| timestamp列の扱いやすさ | ◎ | ◎ | △ |
まとめ
Koalas 1.2.0ではまだ未実装の機能もあるが、概ねPandas-likeに使用できる。未実装な機能を呼び出そうとすると親切に「未実装だよ」とエラーメッセージを出してくれているので、将来的には実装してくれそう。 KoalasでSQL文もかけるため、今からPySparkを習得しようとしている人は、PySparkは使わずにKoalasだけでもいいかもしれない。
