pandas / koalas / PySparkのdf操作比較

2020/12/26

プログラミング

実施内容

Pandas / Koalas / PySparkについて、「df操作方法の比較」と「処理速度を比較」を実施した。

※ KoalasはPandas-likeにPySpark dfを扱えるようにしたPySparkのラッパー的な存在

実施環境

動作確認にはAzure Synapse Studioの環境を使用。2020年12月現在Synapse Studio上で使用できる各パッケージのバージョンは以下のとおり。

pandasバージョン:0.25.3
koalasバージョン:1.2.0
sparkバージョン:2.4.4.2.6.99.201-25973884

df操作方法の比較

dfの作成と表示

PandasKoalasPySpark
df作成pdf = pd.DataFrame(array)kdf = ks.DataFrame(array)sdf = spark.createDataFrame(array)
表示①pdf.head()kdf.head()sdf.show()
表示②pdf.tail()

Koalasのtail()メソッドを使用しようとすると以下のメッセージがでる。

RuntimeError: tail can be used PySpark >= 3.0

PySpark 3.0以上なら使用できる模様。

列に関する操作

PandasKoalasPySpark
型取得pdf.dtypeskdf.dtypessdf.dtypes
列名取得pdf.columnskdf.columnssdf.columns
列数取得len(pdf.columns)len(kdf.columns)len(sdf.columns)
renamepdf.rename(columns={'bef1':'aft1', 'bef2':'aft2'})kdf.rename(columns={'bef1':'aft1', 'bef2':'aft2'})sdf.select([func.col('bef1').alias('aft1'), func.col('bef2').alias('aft2')])
列選択pdf[['col11','col2']]kdf[['col11','col2']]sdf[['col11','col2']]

行に関する操作

PandasKoalasPySpark
行数取得len(pdf)len(kdf)sdf.count()
index取得pdf.indexkdf.index
index選択①pdf.loc[[1,2]]kdf.loc[[1,2]]
index選択②pdf.iloc[[1,2]]pdf.iloc[[1,2]]

PySparkではindexの操作はできない。KoalasはPandas同様の使用感。

集計操作

PandasKoalasPySpark
統計情報を取得pdf.describe()kdf.describe()
合計pdf.sum()kdf.sum()sdf.groupby().sum()
最小値pdf.min()kdf.min()sdf.groupby().min()
最大値pdf.max()kdf.max()sdf.groupby().max()
平均値pdf.mean()kdf.mean()sdf.groupby().mean()
中央値pdf.median()kdf.median()
標準偏差pdf.std()kdf.std()
歪度pdf.skew()kdf.skew()
尖度pdf.kurt()kdf.kurt()
グループごとのカウントpdf.groupby('col1').count()kdf.groupby('col1').count()sdf.groupby('col1').count()
グループごとの合計pdf.groupby('col1').sum()kdf.groupby('col1').sum()sdf.groupby('col1').sum()
rollingpdf.rolling(5).agg('sum')

Koalas 1.2.0時点でrolling.aggは実装されていない。PySparkだと自分でudfを定義しなければならない集計関数も、Koalasだと簡単に計算できる。

よく使われるdf操作

PandasKoalasPySpark
applypdf[['col']].apply(lambda x: x+5)kdf[['col']].apply(lambda x: x+5)sdf.select(udf(lambda x: x+5)('col'))
ソートpdf.sort_values(['col1','col2'])df.sort_values(['col1','col2'])sdf.sort(['col1','col2'])
マージ①pdf_left.merge(pdf_right, on='key', how='left')kdf_left.merge(kdf_right, on='key', how='left')sdf_left.join(sdf_right, sdf_left['key'] == sdf_right['key'], "left")
マージ②pd.merge(pdf_left, pdf_right, on='key', how='left')ks.merge(kdf_left, kdf_right, on='key', how='left')
縦結合①pd.concat([pdf_up,pdf_down],ignore_index=True)ks.concat([kdf_up,kdf_down], ignore_index=True)reduce(DataFrame.unionByName, [sdf_up, sdf_down])
縦結合②pdf_up.append(pdf_down, ignore_index=True)kdf_up.append(kdf_down, ignore_index=True)sdf_up.unionAll(sdf_down)
抽出pdf[(pdf['col']>0.5) & (pdf['col']<0.6)]kdf[(kdf['col']>0.5) & (kdf['col']<0.6)]sdf.filter((func.col('col') > func.lit(0.5)) & (func.col('col') < func.lit(0.6)))
列の追加①pdf['newcol'] = 10kdf['newcol'] = 10sdf = sdf.withColumn('newcol',func.lit(10))
列の追加②pdf['newcol'] = np.random.rand(len(pdf))set_option("compute.ops_on_diff_frames", True) #設定の変更が必要<br>kdf['newcol'] = ks.Series(np.random.rand(len(kdf)))sdf = sdf.withColumn('newcol',func.rand())
四則演算①pdf['newcol'] = pdf['col1']+pdf['col2']kdf['newcol'] = kdf['col1']+kdf['col2']sdf = sdf.withColumn('newcol',func.col('col1')+func.col('col2'))
四則演算②pdf['newcol'] = pdf['col1']-pdf['col2']kdf['newcol'] = kdf['col1']-kdf['col2']sdf = sdf.withColumn('newcol',func.col('col1')-func.col('col2'))
四則演算③pdf['newcol'] = pdf['col1']*pdf['col2']kdf['newcol'] = kdf['col1']*kdf['col2']sdf = sdf.withColumn('newcol',func.col('col1')*func.col('col2'))
四則演算④pdf['newcol'] = pdf['col1']/pdf['col2']kdf['newcol'] = kdf['col1']/kdf['col2']sdf = sdf.withColumn('newcol',func.col('col1')/func.col('col2'))
timestamp型の演算pdf['dif_timestamp'] = pdf['end_timestamp'] - pdf['start_timestamp']kdf['dif_timestamp'] = kdf['end_timestamp'] - kdf['start_timestamp']

PySparkでのtimestamp列の計算は、unix timestamp型に変換して計算する必要があり面倒だが、KoalasだとPandas同様に簡単に計算できる。

グラフ描画

PandasKoalasPySpark
折れ線グラフpdf[['col1']].plot(kind='line')kdf[['col1']].plot(kind='line')
棒グラフpdf[['col1']].plot(kind='bar')kdf[['col1']].plot(kind='bar')
箱ひげ図pdf[['col1']].plot(kind='box')
ヒストグラムpdf[['col1']].plot(kind='hist')kdf[['col1']].plot(kind='hist')
カーネル密度推定pdf[['col1']].plot(kind='kde', bw_method=0.2)kdf[['col1']].plot(kind='kde', bw_method=0.2)
面グラフpdf[['col1']].plot(kind='area')kdf[['col1']].plot(kind='area')
散布図pdf.plot(x='col1', y='col2', kind='scatter')kdf.plot(x='col1', y='col2', kind='scatter')
六角形ビニング図pdf.plot(x='col1', y='col2', kind='hexbin')
円グラフpdf[['col1']].plot(kind='pie', subplots=True)kdf[['col1']].plot(kind='pie', subplots=True)

Koalas 1.2.0時点でbox, hexbinは実装されていない。Koalasでは使えないplot機能もあるが、概ねPandasと同じ使用感。

Series操作

pandaskoalaspyspark
Series作成pse = pd.Series([0,1,2,3])kse = ks.Series([0,1,2,3])
sort_indexpse.sort_index()kse.sort_index()
sort_valuespse.sort_values()kse.sort_values()
appendpse.append(pse2)kse.append(kse2)
concatpd.concat([pse,pse2])ks.concat([kse,kse2])
統計情報を取得pse.describe()kse.describe()
合計を取得pse.sum()kse.sum()

PySparkではSeriesが使用できない。KoalasはPandas同様の使用感。

KoalasとSpark独自の処理

PandasKoalasPySpark
SQL文の使用ks.sql("SELECT * FROM {kdf} WHERE pig > 100")spark.sql('SELECT * FROM sdf WHERE col1 > 0.5')
キャッシュの作成kdf.spark.cache()sdf.cache()

dfの相互変換

Pandas、Koalas、PySpark間のdfの相互変換

PandasKoalasPySpark
ks.from_pandas(pdf)spark.createDataFrame(pdf)
ks.DataFrame(sdf)sdf.to_koalas()
kdf.to_spark()sdf.toPandas()

Pandas、Koalas、PySpark間のdfの相互変換も簡単にできる。

KoalasとPySpark処理速度比較

乱数で作成した76MBのデータを基準として行数を1, 10, 100, 1000倍したデータに対して、各処理のKoalasとPySparkの処理速度を計測した。

# 1カラムのソート
kdf.sort_values(['col1'])
sdf.sort(['col1'])
Notion Image
# 3カラムのソート
kdf.sort_values(['col1','col2','col3'])
sdf.sort(['col1','col2','col3'])
Notion Image
# マージ(left)
kdf_left.merge(kdf_right, on='key', how='left')
sdf_left.join(sdf_right, sdf_left['key'] == sdf_right['key'], 'left')
Notion Image
# マージ(inner)
kdf_left.merge(kdf_right, on='key', how='inner')
sdf_left.join(sdf_right, sdf_left['key'] == sdf_right['key'], 'inner')
Notion Image
# 2dfの縦結合
kdf.append(kdf, ignore_index=True)
sdf.unionAll(sdf)
Notion Image
# 3dfの縦結合
ks.concat([kdf,kdf,kdf], ignore_index=True)
reduce(DataFrame.unionByName, [sdf, sdf, sdf])
Notion Image
# 平均値の計算
kdf.groupby('col1').mean()
sdf.groupby('col1').mean()
Notion Image

KoalasとPySparkの処理速度は概ね同じ。マージ処理ではPySparkよりKoalasの方が処理速度が早くなっているが、うまく最適化してくれているのだろうか。(不明)

型変換

  • PandasからKoalas/PySparkにdfを変換したときの型の対応
  • PandasKoalasPySpark
    objectobjectint
    int8int8bigint
    int16int16bigint
    int32int32bigint
    int64int64bigint
    uint8
    uint16
    uint32
    uint64
    float16
    float32float32double
    float64float64double
    float128
    complex64
    complex128
    complex256
    boolboolboolean
    datetime64[ns]datetime64[ns]timestamp
  • SparkからPandas/Koalasにdfを変換したときの型の対応
  • PySparkPandasKoalas
    date
    tinyintint8int8
    smallintint16int16
    intint32int32
    bigintint64int64
    doublefloat64float64
    decimal(10,0)objectobject

    Pandas / Koalas / PySparkそれぞれの特徴

    PandasKoalasPySpark
    分散処理×
    処理速度
    dfの扱いやすさ
    Seriesの扱いやすさ
    timestamp列の扱いやすさ

    まとめ

    Koalas 1.2.0ではまだ未実装の機能もあるが、概ねPandas-likeに使用できる。未実装な機能を呼び出そうとすると親切に「未実装だよ」とエラーメッセージを出してくれているので、将来的には実装してくれそう。 KoalasでSQL文もかけるため、今からPySparkを習得しようとしている人は、PySparkは使わずにKoalasだけでもいいかもしれない。


    著者画像

    ゆうき

    2018/04からITエンジニアとして活動、2021/11から独立。主な使用言語はPython, TypeScript, SAS, etc.