Belajar PySpark - Nested Schema pada DataFrame

belajar-pyspark-nestedschema

Meskipun Spark DataFrame memiliki struktur yang menyerupai tabel dalam database, ada beberapa perbedaan mendasar. Pada seri belajar PySpark ini, kita akan mendefinisikan skema bersarang (nested schema).

Dalam pemrosesan data besar, seringkali kita menghadapi data dengan struktur yang kompleks, seperti misalnya pada data JSON, Parquet, Avro, dll. 

Pada artikel sebelumnya telah dibahas mengenai berbagai cara penggunaan skema pada DataFrame. Pada artikel ini akan kita pelajari mengenai nested schema beserta contoh penggunaannya pada file JSON.

 

Persiapan

Sebelumnya, pastikan package pyspark sudah terinstall. Jika belum, kita bisa menggunakan pip untuk melakukan instalasi.

Selanjutnya kita import package yang akan kita gunakan, serta kita inisialisasi spark session

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

spark = SparkSession.builder.appName("Belajar PySpark - Skema DataFrame").getOrCreate()

 

Mendefinisikan PySpark Nested Schema

Untuk mendefinisikan skema bersarang, gunakan object bertipe StructType, dan buat struktur skema sesuai dengan data yang akan dimuat ke dataframe.

Sintaks untuk StructType adalah 

StructType([ StructField(namafield_1, Type, nullability), 
             … 
             StructField(namafield_N, Type, nullability)
           ])

 

Pada nested schema, salah satu StructField mengandung struktur StructType di dalam definisinya, seperti berikut ini:

StructType([ StructField(namafield_1, Type, nullability), 
             StructType(namafield_2, StructType([
                 StructField(namafield_2_1, Type, nullability),
                 StructField(namafield_2_2, Type, nullability)
              ], nullability)
           ])

 

Berikut ini contoh definisi nested schema dan penggunaannya. Data yang akan kita gunakan adalah python list

mySchema = StructType([
    StructField("nama", StringType(), True),
    StructField("jurusan", StringType(), True),
    StructField("nilai", StructType([
        StructField("nilai1", IntegerType(), True),
        StructField("nilai2", IntegerType(), True),
        StructField("nilai3", IntegerType(), True)
    ]), True)
])

data = [['Agus','F',(100,150,150)],
        ['Budi','B',(200,100,150)],
        ['Dina','F',(150,150,130)],
        ['Dedi','B', (50,100,100)]]

df = spark.createDataFrame(data, mySchema)
df.show()
df.printSchema()
+----+-------+---------------+
|nama|jurusan|          nilai|
+----+-------+---------------+
|Agus|      F|{100, 150, 150}|
|Budi|      B|{200, 100, 150}|
|Dina|      F|{150, 150, 130}|
|Dedi|      B| {50, 100, 100}|
+----+-------+---------------+
root
 |-- nama: string (nullable = true)
 |-- jurusan: string (nullable = true)
 |-- nilai: struct (nullable = true)
 |    |-- nilai1: integer (nullable = true)
 |    |-- nilai2: integer (nullable = true)
 |    |-- nilai3: integer (nullable = true)

Kolom berisi nested schema dalam DataFrame disimpan dengan tipe struct

 

Mengakses Kolom Pada Nested Schema

Untuk mengakses kolom pada nested schema, kita gunakan notasi titik, yaitu nama_kolom.nama_subkolom.

Misalnya kita ingin menampilkan kolom “nama” dan “nilai2” dari DataFrame di atas

df.select("nama", "nilai.nilai2").show()
+----+------+
|nama|nilai2|
+----+------+
|Agus|   150|
|Budi|   100|
|Dina|   150|
|Dedi|   100|
+----+------+

 

Agregasi field pada nested schema

Sama dengan metode akses field nested schema, untuk melakukan agregasi, kita gunakan fungsi_aggregasi(nama_kolom.nama_subkolom).

Misalnya untuk menghitung rata-rata nilai3 pada dataFrame di atas, kita gunakan avg(“nilai.nilai3”) sebagaimana berikut ini 

import pyspark.sql.functions as F

df.groupby("jurusan") \
      .agg(F.avg("nilai.nilai3") \
           .alias("rerata_nilai3")).show()
+-------+-------------+
|jurusan|rerata_nilai3|
+-------+-------------+
|      F|        140.0|
|      B|        125.0|
+-------+-------------+

 

Filter dengan field pada nested schema

Untuk melakukan filtering berdasar field pada nested schema, digunakan df.filter(df[nama_kolom.nama_subkolom] kondisi_filtering). 

Misal untuk memilih record dengan “nilai2” di bawah 150, perintahnya adalah seperti berikut ini:

df.filter(df["nilai.nilai2"] < 150).show()
+----+-------+---------------+
|nama|jurusan|          nilai|
+----+-------+---------------+
|Budi|      B|{200, 100, 150}|
|Dedi|      B| {50, 100, 100}|
+----+-------+---------------+

 

Mengubah Nested Schema Menjadi Flat Schema

Untuk mengubah nested schema menjadi skema biasa / flat, kita bisa gunakan perintah select(“namakolom.*”).

Misalnya pada contoh di atas kita ingin menjadikan kolom-kolom nilai1, nilai2, dan nilai3 menjadi kolom biasa, maka kita jalankan perintah berikut ini 

df.select("nama","jurusan","nilai.*").show()
+----+-------+------+------+------+
|nama|jurusan|nilai1|nilai2|nilai3|
+----+-------+------+------+------+
|Agus|      F|   100|   150|   150|
|Budi|      B|   200|   100|   150|
|Dina|      F|   150|   150|   130|
|Dedi|      B|    50|   100|   100|
+----+-------+------+------+------+

 

Menyimpan Skema Ke File Json

Kita dapat menyimpan file skema nested ini ke dalam file berformat JSON. Caranya, kita generate sebuah JSON string dari skema sebuah dataframe, dengan menggunakan menggunakan fungsi DataFrame.schema.json() seperti berikut ini

json_string = df.schema.json()
print(json_string)
{"fields":[{"metadata":{},"name":"nama","nullable":true,"type":"string"},{"metadata":{},"name":"jurusan","nullable":true,"type":"string"},{"metadata":{},"name":"nilai","nullable":true,"type":{"fields":[{"metadata":{},"name":"nilai1","nullable":true,"type":"integer"},{"metadata":{},"name":"nilai2","nullable":true,"type":"integer"},{"metadata":{},"name":"nilai3","nullable":true,"type":"integer"}],"type":"struct"}}],"type":"struct"}

 

Membaca Skema Dari File Json

Untuk membaca file skema dari file JSON, gunakan package json. Caranya adalah import package json, buka file json yang berisi skema, kemudian load data ke python dictionary, dengan fungsi json.load

import json

f = open("schema.json")
json_dict = json.load(f)
f.close()

json_dict
{'fields': [{'metadata': {},
   'name': 'nama',
   'nullable': True,
   'type': 'string'},
  {'metadata': {}, 'name': 'jurusan', 'nullable': True, 'type': 'string'},
  {'metadata': {},
   'name': 'nilai',
   'nullable': True,
   'type': {'fields': [{'metadata': {},
      'name': 'nilai1',
      'nullable': True,
      'type': 'integer'},
     {'metadata': {}, 'name': 'nilai2', 'nullable': True, 'type': 'integer'},
     {'metadata': {}, 'name': 'nilai3', 'nullable': True, 'type': 'integer'}],
    'type': 'struct'}}],
 'type': 'struct'}

 

Selanjutnya kita dapat menggunakan dictionary tersebut untuk membentuk StructType dengan menggunakan fungsi StructType.fromJson(json_dictionary)

data = [['Agus','F',[100,150,150]],
        ['Budi','B',[200,100,150]],
        ['Dina','F',[150,150,130]],
        ['Dedi','B', [50,100,100]]]


schemaFromJson = StructType.fromJson(json_dict)


df3 = spark.createDataFrame(data, schemaFromJson)
df3.show()
df3.printSchema()
+----+-------+---------------+
|nama|jurusan|          nilai|
+----+-------+---------------+
|Agus|      F|{100, 150, 150}|
|Budi|      B|{200, 100, 150}|
|Dina|      F|{150, 150, 130}|
|Dedi|      B| {50, 100, 100}|
+----+-------+---------------+
root
 |-- nama: string (nullable = true)
 |-- jurusan: string (nullable = true)
 |-- nilai: struct (nullable = true)
 |    |-- nilai1: integer (nullable = true)
 |    |-- nilai2: integer (nullable = true)
 |    |-- nilai3: integer (nullable = true)

 

Wrapping Up

Nested schema dalam PySpark mengacu pada struktur data yang kompleks di dalam DataFrame. Schema "nested" ini terdiri dari struktur yang bersarang atau terdalam, seperti array, map, atau tipe data struktur lainnya, yang ada di dalam kolom DataFrame. Contohnya, suatu kolom dapat berisi array dari nilai-nilai, atau sebuah map yang menyimpan pasangan kunci-nilai yang lebih rumit. Nested schema memungkinkan representasi data yang lebih kompleks dan berstruktur, memungkinkan penyimpanan informasi yang lebih mendalam dalam kolom-kolom DataFrame. Hal ini sering digunakan untuk merepresentasikan data yang memiliki hierarki atau struktur yang lebih rumit, memfasilitasi analisis data yang lebih dalam dan kompleks dalam lingkungan PySpark.

Notebook untuk tutorial ini dapat diakses di sini

Artikel sebelumnya