Meskipun Spark DataFrame memiliki struktur yang menyerupai tabel dalam database, ada beberapa perbedaan mendasar. Pada seri belajar PySpark ini, kita akan mendefinisikan skema bersarang (nested schema).
Dalam pemrosesan data besar, seringkali kita menghadapi data dengan struktur yang kompleks, seperti misalnya pada data JSON, Parquet, Avro, dll.
Pada artikel sebelumnya telah dibahas mengenai berbagai cara penggunaan skema pada DataFrame. Pada artikel ini akan kita pelajari mengenai nested schema beserta contoh penggunaannya pada file JSON.
Persiapan
Sebelumnya, pastikan package pyspark sudah terinstall. Jika belum, kita bisa menggunakan pip untuk melakukan instalasi.
Selanjutnya kita import package yang akan kita gunakan, serta kita inisialisasi spark session
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
spark = SparkSession.builder.appName("Belajar PySpark - Skema DataFrame").getOrCreate()
Mendefinisikan PySpark Nested Schema
Untuk mendefinisikan skema bersarang, gunakan object bertipe StructType, dan buat struktur skema sesuai dengan data yang akan dimuat ke dataframe.
Sintaks untuk StructType adalah
StructType([ StructField(namafield_1, Type, nullability),
…
StructField(namafield_N, Type, nullability)
])
Pada nested schema, salah satu StructField mengandung struktur StructType di dalam definisinya, seperti berikut ini:
StructType([ StructField(namafield_1, Type, nullability),
StructType(namafield_2, StructType([
StructField(namafield_2_1, Type, nullability),
StructField(namafield_2_2, Type, nullability)
], nullability)
])
Berikut ini contoh definisi nested schema dan penggunaannya. Data yang akan kita gunakan adalah python list
mySchema = StructType([
StructField("nama", StringType(), True),
StructField("jurusan", StringType(), True),
StructField("nilai", StructType([
StructField("nilai1", IntegerType(), True),
StructField("nilai2", IntegerType(), True),
StructField("nilai3", IntegerType(), True)
]), True)
])
data = [['Agus','F',(100,150,150)],
['Budi','B',(200,100,150)],
['Dina','F',(150,150,130)],
['Dedi','B', (50,100,100)]]
df = spark.createDataFrame(data, mySchema)
df.show()
df.printSchema()
+----+-------+---------------+
|nama|jurusan| nilai|
+----+-------+---------------+
|Agus| F|{100, 150, 150}|
|Budi| B|{200, 100, 150}|
|Dina| F|{150, 150, 130}|
|Dedi| B| {50, 100, 100}|
+----+-------+---------------+
root
|-- nama: string (nullable = true)
|-- jurusan: string (nullable = true)
|-- nilai: struct (nullable = true)
| |-- nilai1: integer (nullable = true)
| |-- nilai2: integer (nullable = true)
| |-- nilai3: integer (nullable = true)
Kolom berisi nested schema dalam DataFrame disimpan dengan tipe struct
Mengakses Kolom Pada Nested Schema
Untuk mengakses kolom pada nested schema, kita gunakan notasi titik, yaitu nama_kolom.nama_subkolom.
Misalnya kita ingin menampilkan kolom “nama” dan “nilai2” dari DataFrame di atas
df.select("nama", "nilai.nilai2").show()
+----+------+
|nama|nilai2|
+----+------+
|Agus| 150|
|Budi| 100|
|Dina| 150|
|Dedi| 100|
+----+------+
Agregasi field pada nested schema
Sama dengan metode akses field nested schema, untuk melakukan agregasi, kita gunakan fungsi_aggregasi(nama_kolom.nama_subkolom).
Misalnya untuk menghitung rata-rata nilai3 pada dataFrame di atas, kita gunakan avg(“nilai.nilai3”) sebagaimana berikut ini
import pyspark.sql.functions as F
df.groupby("jurusan") \
.agg(F.avg("nilai.nilai3") \
.alias("rerata_nilai3")).show()
+-------+-------------+
|jurusan|rerata_nilai3|
+-------+-------------+
| F| 140.0|
| B| 125.0|
+-------+-------------+
Filter dengan field pada nested schema
Untuk melakukan filtering berdasar field pada nested schema, digunakan df.filter(df[nama_kolom.nama_subkolom] kondisi_filtering).
Misal untuk memilih record dengan “nilai2” di bawah 150, perintahnya adalah seperti berikut ini:
df.filter(df["nilai.nilai2"] < 150).show()
+----+-------+---------------+
|nama|jurusan| nilai|
+----+-------+---------------+
|Budi| B|{200, 100, 150}|
|Dedi| B| {50, 100, 100}|
+----+-------+---------------+
Mengubah Nested Schema Menjadi Flat Schema
Untuk mengubah nested schema menjadi skema biasa / flat, kita bisa gunakan perintah select(“namakolom.*”).
Misalnya pada contoh di atas kita ingin menjadikan kolom-kolom nilai1, nilai2, dan nilai3 menjadi kolom biasa, maka kita jalankan perintah berikut ini
df.select("nama","jurusan","nilai.*").show()
+----+-------+------+------+------+
|nama|jurusan|nilai1|nilai2|nilai3|
+----+-------+------+------+------+
|Agus| F| 100| 150| 150|
|Budi| B| 200| 100| 150|
|Dina| F| 150| 150| 130|
|Dedi| B| 50| 100| 100|
+----+-------+------+------+------+
Menyimpan Skema Ke File Json
Kita dapat menyimpan file skema nested ini ke dalam file berformat JSON. Caranya, kita generate sebuah JSON string dari skema sebuah dataframe, dengan menggunakan menggunakan fungsi DataFrame.schema.json() seperti berikut ini
json_string = df.schema.json()
print(json_string)
{"fields":[{"metadata":{},"name":"nama","nullable":true,"type":"string"},{"metadata":{},"name":"jurusan","nullable":true,"type":"string"},{"metadata":{},"name":"nilai","nullable":true,"type":{"fields":[{"metadata":{},"name":"nilai1","nullable":true,"type":"integer"},{"metadata":{},"name":"nilai2","nullable":true,"type":"integer"},{"metadata":{},"name":"nilai3","nullable":true,"type":"integer"}],"type":"struct"}}],"type":"struct"}
Membaca Skema Dari File Json
Untuk membaca file skema dari file JSON, gunakan package json. Caranya adalah import package json, buka file json yang berisi skema, kemudian load data ke python dictionary, dengan fungsi json.load
import json
f = open("schema.json")
json_dict = json.load(f)
f.close()
json_dict
{'fields': [{'metadata': {},
'name': 'nama',
'nullable': True,
'type': 'string'},
{'metadata': {}, 'name': 'jurusan', 'nullable': True, 'type': 'string'},
{'metadata': {},
'name': 'nilai',
'nullable': True,
'type': {'fields': [{'metadata': {},
'name': 'nilai1',
'nullable': True,
'type': 'integer'},
{'metadata': {}, 'name': 'nilai2', 'nullable': True, 'type': 'integer'},
{'metadata': {}, 'name': 'nilai3', 'nullable': True, 'type': 'integer'}],
'type': 'struct'}}],
'type': 'struct'}
Selanjutnya kita dapat menggunakan dictionary tersebut untuk membentuk StructType dengan menggunakan fungsi StructType.fromJson(json_dictionary)
data = [['Agus','F',[100,150,150]],
['Budi','B',[200,100,150]],
['Dina','F',[150,150,130]],
['Dedi','B', [50,100,100]]]
schemaFromJson = StructType.fromJson(json_dict)
df3 = spark.createDataFrame(data, schemaFromJson)
df3.show()
df3.printSchema()
+----+-------+---------------+
|nama|jurusan| nilai|
+----+-------+---------------+
|Agus| F|{100, 150, 150}|
|Budi| B|{200, 100, 150}|
|Dina| F|{150, 150, 130}|
|Dedi| B| {50, 100, 100}|
+----+-------+---------------+
root
|-- nama: string (nullable = true)
|-- jurusan: string (nullable = true)
|-- nilai: struct (nullable = true)
| |-- nilai1: integer (nullable = true)
| |-- nilai2: integer (nullable = true)
| |-- nilai3: integer (nullable = true)
Wrapping Up
Nested schema dalam PySpark mengacu pada struktur data yang kompleks di dalam DataFrame. Schema "nested" ini terdiri dari struktur yang bersarang atau terdalam, seperti array, map, atau tipe data struktur lainnya, yang ada di dalam kolom DataFrame. Contohnya, suatu kolom dapat berisi array dari nilai-nilai, atau sebuah map yang menyimpan pasangan kunci-nilai yang lebih rumit. Nested schema memungkinkan representasi data yang lebih kompleks dan berstruktur, memungkinkan penyimpanan informasi yang lebih mendalam dalam kolom-kolom DataFrame. Hal ini sering digunakan untuk merepresentasikan data yang memiliki hierarki atau struktur yang lebih rumit, memfasilitasi analisis data yang lebih dalam dan kompleks dalam lingkungan PySpark.
Notebook untuk tutorial ini dapat diakses di sini
Artikel sebelumnya
- Definisi Skema pada DataFrame
- Membaca File JSON
- Membaca File csv ke DataFrame
- Memproses Dataframe dengan Temporary View dan SQL
- Join DataFrame
- Dataframe GroupBy dan Agregasi