
PySpark menyediakan berbagai API yang dapat dimanfaatkan sesuai kebutuhan. Ada lebih dari satu cara yang dapat digunakan untuk dapat melakukan operasi DataFrame.
Selain menggunakan fungsi-fungsi python, kita dapat menggunakan ekspresi SQL untuk memanipulasi sebuah DataFrame. Ekspresi SQL tersebut dieksekusi menggunakan fungsi pyspark.sql.functions.expr(). Pada artikel ini, kita akan membahas fungsi tersebut beserta contoh penggunaannya.
Apa itu fungsi expr() dalam PySpark?
Fungsi expr() di PySpark adalah bagian dari API DataFrame dalam package spark.sql.functions. Fungsi ini memungkinkan pengguna untuk melakukan transformasi menggunakan ekspresi serupa SQL.
Beberapa keuntungan dari fungsi expr():
- Menyederhanakan kode sehingga lebih mudah untuk ditulis, dipahami, dan dikelola.
- Menggunakan ekspresi Seperti SQL yang familiar, tanpa harus membuat temporary view.
- Terintegrasi dengan Catalyst optimizer, yang memastikan bahwa transformasi dijalankan secara efisien. Hal ini menjamin kinerja yang tidak kalah dengan penggunaan fungsi-fungsi pyspark biasa.
- Memungkinkan untuk menggunakan nilai kolom DataFrame yang ada sebagai argumen ekspresi ke fungsi Pyspark.
Berikut ini package yang akan digunakan
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
Selanjutnya kita buat object SparkSession dengan nama aplikasi ‘Belajar Pyspark - Fungsi expr’.
spark = SparkSession.builder.appName("Belajar PySpark - expr").getOrCreate()
DataFrame yang akan kita gunakan adalah sebagai berikut:
data = [['Agus','Fisika',100],['Windy','Fisika',200],
['Budi','Biologi',200],['Dina','Fisika',150],
['Bayu','Fisika',50],['Dedi','Biologi',50]]
kolom = ["nama","jurusan","nilai"]
df = spark.createDataFrame(data,kolom)
df.show()
Mengeksekusi ekspresi SQL sederhana
Dengan fungsi expr() kita dapat menggunakan fungsi-fungsi SQL yang lebih familiar. Ekspresi SQL tersebut dapat digunakan untuk melakukan update, filter, agregasi, maupun menambah kolom baru.
Mengubah nilai kolom dengan withColumn dan expr
Untuk mengkonversi nilai sebuah kolom menjadi huruf besar, kita dapat menggunakan fungsi SQL upper() dan fungsi withColumn():
df.withColumn("nama", F.expr("upper(nama)")).show()
Memfilter DataFrame
Kita dapat melakukan filtering baris DataFrame menggunakan ekspresi SQL seperti berikut ini
df.filter(F.expr("nilai > 150")).show()
Untuk filter menggunakan gabungan beberapa kondisi
df.filter(F.expr("nama LIKE '%in%' AND nilai > 150")).show()
Agregasi
Operasi agregasi dapat dilakukan dengan menggunakan gabungan fungsi groupBy() dan expr() seperti contoh berikut
df.groupBy("jurusan").agg(F.expr("avg(nilai) as nilai_rata2")).show()
Memilih kolom dan menambahkan kolom baru
Untuk memilih beberapa kolom ataupun menambah kolom baru kita dapat menggunakan kombinasi perintah select() dan expr()
df.select(F.col("*"), F.expr('upper(nama) as nama1')).show()
Perintah di atas juga dapat ditulis dengan lebih singkat dengan menggunakan fungsi DataFrame.selectExpr()
df.selectExpr('*', 'upper(nama) as nama1').show()
Untuk mengekstraksi kolom sebuah DataFrame menggunakan ekspresi SQL, sebaiknya memilih selectExpr() dibandingkan expr(). Disamping sintaksnya lebih singkat dan jelas, kita juga tidak perlu melakukan import spark.sql.functions.
Fungsi selectExpr() dapat menerima beberapa ekspresi SQL sekaligus, yang dipisahkan dengan tanda koma (,). Oleh karena itu fungsi ini dapat mengembalikan beberapa kolom dari beberapa ekspresi SQL.
df.selectExpr("upper(nama) as nama1",
"upper(jurusan) as jurusan1",
"nilai").show()
Mengeksekusi ekspresi SQL yang kompleks
Fungsi expr() juga dapat mengeksekusi ekspresi SQL yang kompleks, seperti misalnya statement kondisional menggunakan CASE WHEN.
df.withColumn("kode_jurusan",
F.expr("CASE WHEN jurusan = 'Fisika' THEN 'F'"
" WHEN jurusan = 'Biologi' THEN 'B'"
" ELSE 'NA' END")).show()
Untuk mengeksekusi beberapa ekspresi SQL sekaligus, misalnya untuk memilih beberapa kolom, gunakan fungsi selectExpr() seperti contoh berikut ini
df.selectExpr("nama", "jurusan",
"(CASE WHEN jurusan = 'Fisika' THEN 'F'"
" WHEN jurusan = 'Biologi' THEN 'B'"
" ELSE 'NA' END) as kode_jurusan",
"(CASE WHEN nilai < 100 THEN 'C'"
" WHEN nilai < 200 THEN 'B'"
" ELSE 'A' END) as kode_nilai",
).show()
Notebook untuk tutorial ini bisa diunduh di sini.
Artikel sebelumnya :
Mengenal DataFrame
Select, Filter, dan Where pada DataFrame
Transformasi DataFrame dengan withColumn
Kolom Dataframe dengan When-Otherwise