Apache Spark adalah framework open source untuk pengolahan big data secara terdistribusi. Saat ini Spark menjadi salah satu framework yang paling banyak digunakan untuk melakukan pengolahan data yang berukuran sangat besar, karena kecepatannya serta kemampuannya mendukung pemrosesan batch, streaming, real time, maupun interaktif.
PySpark adalah API Python untuk Apache Spark. Jika Anda sudah terbiasa menggunakan Python dan Pandas, maka PySpark adalah bahasa yang sesuai untuk mempelajari pengolahan dan analisis data dengan Spark.
PySpark mendukung semua fitur Spark seperti Spark SQL, DataFrame, Structured Streaming, Machine Learning (MLlib) dan Spark Core.
Dalam artikel ini kita akan belajar bagaimana menggunakan PySpark, khususnya Spark DataFrame, dengan menggunakan jupyter notebook, dalam hal ini google colab.
PySpark dapat dijalankan secara interaktif melalui pyspark console maupun python notebook seperti Jupyter atau Zeppelin.
Spark DataFrame merupakan bagian dari high level API. DataFrame adalah kumpulan data yang terdistribusi, yang memiliki baris dan kolom, serupa dengan tabel dalam DataBase. Konsepnya mirip dengan DataFrame pada Pandas dan R. DataFrame pada Spark dibangun di atas RDD (Resilient Distributed Dataset), sehingga memiliki fitur yang sama dengan RDD, yaitu immutability dan lazy evaluation.
Beberapa kelebihan DataFrame dibandingkan RDD adalah : kemudahan penggunaan, serta optimasi dan efisiensi yang lebih baik.
Instalasi PySpark
Jika anda menggunakan notebook secara lokal, misalnya dengan anaconda, anda cukup melakukan satu kali instalasi pyspark. Akan tetapi jika anda menggunakan Google colab, anda perlu menginstall pyspark setiap kali membuka session baru.
Kita akan gunakan perintah pip (python install package), dengan perintah seperti berikut :
%pip install pyspark
Loading package
Aplikasi PySpark diawali dengan inisialisasi SparkSession. SparkSession diperkenalkan sejak versi Spark 2.0. SparkSession merupakan pintu masuk ke eksekusi Spark, untuk membuat objek-objek Spark RDD, DataFrame, dan DataSet.
from pyspark.sql import SparkSession
Inisialisasi Spark Session
Jika kita mengakses Spark melalui PySpark shell, shell akan secara otomatis membuatkan Spark session dalam variabel bernama spark. Untuk pemrograman di luar spark-shell, object SparkSession harus diinisialisasi sebelum dapat digunakan.
Perintah inisialisasi SparkSession adalah seperti berikut ini:
spark = SparkSession.builder.getOrCreate()
Kita bisa memberi nama object SparkSession dengan melakukan inisialisasi parameter appName
spark = SparkSession.builder.appName('Belajar Spark').getOrCreate()
Membuat Spark DataFrame
DataFrame dapat dibuat dengan banyak cara, di antaranya :
- Dari python object, misalnya array/list, tuple, dictionary, pandas dataframe, dll. Digunakan dengan perintah createDataFrame().
- Dari file : csv, json, dll. Digunakan dengan perintah read.
- Dari file di HDFS, dengan perintah read ke lokasi file di HDFS.
- Dari RDD, dengan perintah toDF().
- dll.
Untuk membuat DataFrame dari python tuple, diperlukan 2 parameter : tuple berisi data, dan list nama kolom.
mydata = (('apel',100),('jeruk',200),('pisang',150),('pir',50))
df_from_tuple = spark.createDataFrame(mydata, ["nama", "jumlah"])
Membuat DataFrame dari python list juga memerlukan 2 argumen seperti pada contoh sebelumnya
mydata = [['apel',100],['jeruk',200],['pisang',150],['pir',50]]
df_from_list = spark.createDataFrame(mydata,["nama", "jumlah"])
Membuat DataFrame dari python dictionary
mydata = [{'nama':'apel','jumlah':100},
{'nama':'jeruk','jumlah':200},
{'nama':'pisang','jumlah':150},
{'nama':'pir','jumlah':50}]
df_from_dict = spark.createDataFrame(mydata)
Untuk membuat DataFrame dari pandas dataframe
import pandas as pd
df_pd = pd.DataFrame(list(zip(['apel','jeruk','pisang','pir'],
[100,200,150,100])),
columns =['nama','jumlah'])
df_from_pandas = spark.createDataFrame(df_pd)
DataFrame dapat dibuat dengan melakukan loading file text, csv, json, dll. Untuk membaca file csv, gunakan perintah spark.read.csv(). Jika file csv yang dibaca mengandung header, set parameter header = True.
df_from_csv = spark.read.csv("nama_file.csv", header=True)
Eksplorasi Spark DataFrame
Sebelum melakukan pemrosesan, biasanya kita melakukan eksplorasi untuk mengetahui karakteristik data yang akan diproses. Beberapa perintah yang biasa kita gunakan yaitu :
Menampilkan satu baris pertama
df.first()
Menampilkan beberapa baris pertama
df.show(5)
Menampilkan jumlah baris
df.count()
Menampilkan kolom
df.columns
Menampilkan jumlah kolom
len(df.columns)
Menampilkan skema DataFrame
df.printSchema()
Untuk menampilkan rangkuman statistik dari DataFrame, gunakan fungsi describe() dan show(). Perintah ini akan menampilkan jumlah baris non null, rerata, standar deviasi, nilai minimum dan maksimum, dari kolom-kolom yang bertipe numerik.
df.describe().show()
Memilih kolom dan baris
Untuk memilih kolom tertentu saja, gunakan perintah select(namakolom)
df.select("nama").show()
Perintah filter(kondisi) digunakan untuk memilih atau memfilter baris sebuah DataFrame
df.filter(df.jumlah > 100).show()
Untuk menggunakan kondisi berupa operasi string, dapat digunakan fungsi-fungsi dari pyspark.sql.Column yang terkait string, misalnya contains(), startswith(), endswith()
df.filter(df.nama.contains('is')).show(5)
df.filter(df.nama.startswith('je')).show(5)
Kita juga dapat menggunakan fungsi like() yang serupa dengan SQL statement like
df.filter(df.nama.like('pi%')).show(5)
Jupyter notebook untuk artikel ini dapat dilihat di sini :
https://github.com/urfie/Seri-Belajar-PySpark/blob/main/Belajar_Spark_dengan_Python_Intro.ipynb