Salah satu kelebihan Apache Spark dalam pemrosesan data adalah kemudahan penggunaan. Selain menggunakan fungsi-fungsi Python pada PySpark, kita juga dapat mengakses dan memproses DataFrame dengan perintah SQL.
Ada beberapa cara penggunaan SQL dalam PySpark. Cara pertama yaitu dengan fungsi expr() seperti telah dibahas dalam artikel sebelumnya. Pilihan ini sesuai jika kita ingin mengeksekusi ekspresi SQL sederhana seperti select, filter, dll. Untuk mengeksekusi query SQL yang lebih kompleks, seperti misalnya menggabungkan beberapa DataFrame sekaligus, kita dapat menggunakan temporary view.
Dengan membuat temporary view, maka DataFrame akan dianggap tabel atau view yang dapat diquery menggunakan SQL untuk memprosesnya.
Untuk dapat menggunakan temporary view dan mengeksekusi query SQL terhadap DataFrame, lakukan langkah berikut :
- Buat Spark Session
- Buat DataFrame
- Buat temporary view
- Jalankan query
Membuat Temporary View Pada PySpark
Ada beberapa cara untuk membuat temporary view/table dari sebuah dataframe, yaitu :
- Menggunakan perintah DataFrame.createOrReplaceTempView(nama_tempview) perintah ini secara default akan menggantikan (replace) view dengan nama yang sama jika sudah ada sebelumnya.
- Menggunakan perintah DataFrame.createTempView(nama_tempview) perintah ini akan menghasilkan TempTableAlreadyExistsException jika view dengan nama yang sama sudah ada sebelumnya.
- Untuk Spark versi sebelum 2.0, perintah yang digunakan adalah DataFrame.registerTempTable(nama_tempview)
Pertama-tama kita load data ke dalam dataframe :
data = [['Agus','Fisika',100],['Windy','Fisika',200],
['Budi','Biologi',200],['Dina','Fisika',150],
['Bayu','Fisika',50],['Dedi','Biologi',50]]
kolom = ["nama","jurusan","nilai"]
df = spark.createDataFrame(data,kolom)
df.show()

Untuk membuat temporary view bernama “mahasiswa” dari dataframe yang telah kita load, kita gunakan perintah createOrReplaceTempView
df.createOrReplaceTempView("mahasiswa")
Perhatikan bahwa perintah createOrReplaceTempView akan otomatis menggantikan view yang lama dengan temporary view baru, jika nama yang digunakan sama. Misalnya :
df2 = df.filter(df.kode_jurusan == 'F')
df2.createOrReplaceTempView("mahasiswa")
Sedangkan perintah createTempView akan mengembalikan error atau exception jika menemukan bahwa nama view tersebut sudah pernah digunakan.
df2 = df.filter(df.kode_jurusan == 'F')
df2.createTempView("mahasiswa")

Menjalankan Query SQL dengan Temporary View
Setelah kita membuat temporary view untuk sebuah DataFrame, kita dapat mengaksesnya sebagaimana kita mengakses sebuah tabel dalam query SQL.
Untuk menjalankan query SQL di PySpark, kita gunakan fungsi spark.sql(). Fungsi ini akan mengembalikan DataFrame sesuai hasil query yang dieksekusi.
result = spark.sql("select * from mahasiswa")
result.show()

spark.sql("select * from mahasiswa where kode_jurusan = 'F'").show()

Kita dapat memanfaatkan fungsi-fungsi SQL seperti misalnya fungsi pemrosesan string, dan menggunakan conditional statement untuk transformasi kolom
spark.sql("""SELECT UPPER(nama) as Nama,
kode_jurusan as Jurusan,
nilai as Nilai,
CASE
WHEN nilai > 150 THEN 'High'
WHEN nilai > 100 THEN 'Medium'
ELSE 'Low'
END AS Grade
FROM mahasiswa""").show()

Kita juga dapat melakukan agregasi dengan menggunakan perintah SQL
spark.sql("""SELECT kode_jurusan,
max(nilai) as max_nilai,
min(nilai) as min_nilai
FROM mahasiswa
GROUP BY kode_jurusan""").show()

Kita juga dapat melakukan JOIN untuk menggabungkan beberapa dataframe.
Misalnya kita ingin melakukan join dengan sebuah dataframe referensi. Dataframe tersebut kita load dan kita buatkan viewnya terlebih dahulu seperti berikut ini
ref = [['F','Fisika','MIPA'],['B','Biologi','MIPA'],
['A', 'Akuntansi', 'Ekonomi']]
kolom = ["kode_jurusan","nama_jurusan","nama_fakultas"]
df_ref = spark.createDataFrame(ref,kolom)
df_ref.show()
df_ref.createOrReplaceTempView("jurusan")

Lalu kita jalankan perintah LEFT JOIN seperti berikut
spark.sql("""SELECT * FROM mahasiswa
LEFT JOIN jurusan
ON mahasiswa.kode_jurusan=jurusan.kode_jurusan""").show()

Jangkauan atau Scope Temporary View
Temporary view yang dibuat dengan perintah createOrReplaceTempView maupun createTempView pada PySpark bersifat session-bound, yaitu view yang dihasilkan hanya dapat diakses oleh spark session yang membuatnya. View tersebut akan menghilang ketika session yang membuatnya berakhir.
Jika kita ingin agar sebuah view tetap ada selama sebuah aplikasi Spark berjalan dan dapat diakses oleh multiple session di dalam aplikasi tersebut, maka kita perlu membuat sebuah global view, dengan menggunakan perintah createOrReplaceGlobalTempView.
df.createOrReplaceGlobalTempView("mhs")
Global view pada spark disimpan pada database global_temp. Untuk mengaksesnya, kita perlu menyebutkan global_temp sebagai databasenya.
spark.sql("SELECT * FROM global_temp.mhs").show()

Ada beberapa hal yang perlu diperhatikan ketika membuat global temporary view, yaitu
Global temp view disimpan dalam lokasi khusus (shared location), sehingga membutuhkan resource yang lebih besar daripada temp view biasa
Global temp view dapat dilihat oleh semua SparkSessions dalam aplikasi yang sama, sehingga membuka peluang untuk diakses oleh user yang tidak berwenang
Penutup
Salah satu fitur yang sangat memudahkan dalam pemrosesan data di pyspark adalah penggunaan temporary view dan query SQL untuk mengakses dataframe.
Fitur ini sangat bermanfaat bagi user yang familiar dengan perintah-perintah SQL. Berbeda dengan fungsi expr() yang sesuai untuk menjalankan ekspresi SQL sederhana, cara ini sangat sesuai untuk menjalankan query SQL yang lebih komplek.
Notebook untuk tutorial ini dapat diakses di sini
Artikel sebelumnya