
Dalam proses pengolahan data, seringkali kita perlu melakukan transformasi atau pengubahan bentuk DataFrame dari satu kolom menjadi beberapa kolom (mengubah dari baris menjadi kolom), dan sebaliknya. Transformasi ini banyak dilakukan ketika kita melakukan agregasi untuk keperluan analisis maupun visualisasi data. Operasi ini disebut juga dengan operasi transpose, atau pivot dan unpivot.
Pada pyspark, fungsi pivot tersedia dalam pyspark.sql.GroupedData.pivot, yaitu format object hasil operasi groupBy. Sedangkan fungsi unpivot terdapat pada pyspark.sql.DataFrame.unpivot.
Dalam artikel ini kita akan mempelajari mengenai fungsi pivot dan unPivot
Sebelumnya, pastikan package pyspark sudah terinstall. Jika belum, kita bisa menggunakan perintah pip install pyspark untuk melakukan instalasi.
Selanjutnya kita import package yang akan kita gunakan, serta kita inisialisasi spark session
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.appName("Belajar PySpark - pivot unPivot").getOrCreate()
Selanjutnya kita persiapkan DataFrame yang akan kita olah
data = [['Agus','Fisika','Umum',150],['Windy','Fisika','Khusus',200],
['Budi','Biologi','Umum',170],['Dina','Fisika','Khusus',180],
['Bayu','Fisika','Umum',160],['Dedi','Biologi','Khusus',185],
['Ana','Biologi','Umum',170],['Yani','Biologi','Khusus',175],
['Citra','Fisika','Umum',170],['Jaka','Biologi','Khusus',180],
['Doni','Biologi','Umum',170],['Hadi','Fisika','Umum',175]]
kolom = ["nama","jurusan","jalur","nilai"]
df = spark.createDataFrame(data,kolom)
df.show()
output :
+-----+-------+------+-----+
| nama|jurusan| jalur|nilai|
+-----+-------+------+-----+
| Agus| Fisika| Umum| 150|
|Windy| Fisika|Khusus| 200|
| Budi|Biologi| Umum| 170|
| Dina| Fisika|Khusus| 180|
| Bayu| Fisika| Umum| 160|
| Dedi|Biologi|Khusus| 185|
| Ana|Biologi| Umum| 170|
| Yani|Biologi|Khusus| 175|
|Citra| Fisika| Umum| 170|
| Jaka|Biologi|Khusus| 180|
| Doni|Biologi| Umum| 170|
| Hadi| Fisika| Umum| 175|
+-----+-------+------+-----+
Agregasi data tanpa pivot
Jika kita melakukan agregasi berdasarkan jalur dan jurusan, akan kita dapatkan hasil sebagai berikut ini
df.groupBy("jalur","jurusan").avg("nilai").show()
output :
+------+-------+----------+
| jalur|jurusan|avg(nilai)|
+------+-------+----------+
|Khusus| Fisika| 190.0|
|Khusus|Biologi| 180.0|
| Umum|Biologi| 170.0|
| Umum| Fisika| 163.75|
+------+-------+----------+
Menampilkan crosstab dengan pivot
Terkadang kita memerlukan data tersebut ditampilkan secara bersilang atau crosstab. Misalnya untuk hasil di atas, tiap jurusan ditampilkan sebagai kolom, seperti berikut ini :
Crosstab by jurusan:
+------+-------+------+
| jalur|Biologi|Fisika|
+------+-------+------+
|Khusus| 180.0| 190.0|
| Umum| 170.0|163.75|
+------+-------+------+
Dalam pyspark, hal ini dapat dilakukan dengan fungsi pivot.
Karena pivot tersedia pada object GroupedData, maka fungsi pivot ini dipanggil setelah fungsi groupBy. Sintaksnya adalah sebagai berikut
df.groupBy(kolom yang digunakan untuk grouping).pivot(kolom pivot, list value yang digunakan sebagai kolom).operasi_agregasi(kolom yang di-agregasi)
Memanggil fungsi pivot dengan menyebutkan nama kolom
Misalnya untuk data di atas, kita ingin menampilkan rata-rata nilai per jalur untuk tiap jurusan. Dalam hal ini, jalur akan ditampilkan sebagai baris, dan jurusan sebagai kolom.
Maka, kita bisa menggunakan rangkaian fungsi berikut ini
- fungsi groupBy dengan parameter "jalur", kemudian
- fungsi pivot dengan parameter "jurusan" dan list berupa nilai distinct jurusan yaitu [“Biologi”,”Fisika”], dan
- fungsi avg dengan parameter "nilai"
seperti dalam contoh berikut ini :
df.groupBy("jalur") \
.pivot("jurusan",["Biologi","Fisika"]) \
.avg("nilai") \
.show()
output:
+------+-------+------+
| jalur|Biologi|Fisika|
+------+-------+------+
|Khusus| 180.0| 190.0|
| Umum| 170.0|163.75|
+------+-------+------+
Memanggil fungsi pivot tanpa menyebutkan nama kolom
Kita juga dapat memanggil fungsi pivot tanpa menggunakan parameter kedua, yaitu list value yang akan dijadikan kolom, seperti berikut ini.
df.groupBy("jalur") \
.pivot("jurusan") \
.avg("nilai") \
.show()
perintah ini akan menghasilkan output yang sama dengan sebelumnya.
Akan tetapi pemanggilan dengan cara ini kurang efisien dibanding cara pertama, karena di sini Spark melakukan pemrosesan tambahan untuk mendapatkan nilai distinct dari kolom yang di-pivot.
Memanggil fungsi pivot untuk sebagian kolom
Kita tidak selalu harus menampilkan seluruh nilai yang mungkin muncul. Kita dapat memilih salah satu atau sebagian value saja dari semua value yang muncul tersebut.
Misalnya untuk conto di atas, kita hanya ingin menampilkan data untuk jurusan Biologi saja, maka list value yang kita gunakan adalah [“Biologi”]
df.groupBy("jalur").pivot("jurusan",["Biologi"]).avg("nilai").show()
output :
+------+-------+
| jalur|Biologi|
+------+-------+
|Khusus| 160.0|
| Umum| 180.0|
+------+-------+
Selain fungsi avg, kita dapat melakukan agregasi dengan fungsi lain, seperti count, min, max, dll.
df.groupBy("jalur") \
.pivot("jurusan",["Biologi","Fisika"]) \
.count() \
.show()
output :
+------+-------+------+
| jalur|Biologi|Fisika|
+------+-------+------+
|Khusus| 3| 2|
| Umum| 3| 4|
+------+-------+------+
Kita juga dapat menggunakan fungsi agg maupun expr untuk melakukan aggregasi.
df.groupBy("jalur").pivot("jurusan").agg({"nilai":"min"}).show()
df.groupBy("jalur").pivot("jurusan").agg(F.expr("min(nilai)")).show()
Untuk mengetahui secara lebih lengkap mengenai grouping, agregasi, serta fungsi expr, simak artikel-artikel sebelumnya : GroupBy dan Agregasi dan SQL pada Dataframe dengan expr()
Fungsi unPivot
Fungsi unPivot digunakan untuk mengubah atau mentranspose DataFrame, dari kolom menjadi baris. Fungsi ini membalik operasi groupBy + pivot, sehingga menghasilkan dataframe seperti hasil groupBy biasa. Fungsi ini tersedia mulai pySpark versi 3.4.0.
Fungsi unPivot dilakukan pada object bertipe DataFrame, dan menghasilkan DataFrame yang lain. Sintaksnya adalah
DataFrame.unpivot(kolom grouping, list kolom yang akan dijadikan baris, namaKolomHasil, namaKolomValue)
Misalnya dari contoh di atas, kita akan mengembalikan kolom Biologi dan Fisika menjadi kolom jurusan, dan value-nya menjadi kolom nilai_rata2.
dfPivot = df.groupBy("jalur") \
.pivot("jurusan",["Biologi","Fisika"]) \
.avg("nilai")
print(" Pivoted data :\n\n")
dfPivot.show()
print("\n UnPivoted data :\n")
dfPivot.unpivot(["jalur"], ["Biologi", "Fisika"],
"jurusan",
"nilai_rata2").show()
Pivoted data :
+------+-------+------+
| jalur|Biologi|Fisika|
+------+-------+------+
|Khusus| 180.0| 190.0|
| Umum| 170.0|163.75|
+------+-------+------+
UnPivoted data :
+------+-------+-----------+
| jalur|jurusan|nilai_rata2|
+------+-------+-----------+
|Khusus|Biologi| 180.0|
|Khusus| Fisika| 190.0|
| Umum|Biologi| 170.0|
| Umum| Fisika| 163.75|
+------+-------+-----------+
Wrapping Up
Fungsi pivot dan unpivot merupakan fungsi yang melakukan transformasi atau transpose GroupData dan DataFrame, dari kolom ke baris dan sebaliknya. Fungsi pivot umumnya digunakan bersama dengan fungsi groupBy.
Notebook untuk artikel ini dapat diakses di sini.
Ikuti juga artikel-artikel sebelumnya :
Belajar PySpark - GroupBy dan Agregasi
Belajar PySpark - Nested Schema pada DataFrame
Belajar PySpark - MapType pada DataFrame Schema
Belajar PySpark - ArrayType pada DataFrame Schema
Belajar PySpark - Definisi Skema pada DataFrame