Belajar PySpark - Menyimpan File csv dari DataFrame

belajar-pyspark-menyimpan-file-csv

Setelah sebelumnya kita mempelajari bagaimana membaca file csv ke DataFrame, maka dalam tutorial ini kita akan membahas mengenai bagaimana menyimpan DataFrame ke dalam file csv dengan object DataFrameWriter, beserta parameter yang dapat digunakan.

Dalam pyspark, ada beberapa cara untuk menulis data ke dalam csv file, yaitu :

  • Menggunakan fungsi write.csv()
  • Menggunakan fungsi write.format()

Keduanya menggunakan object pyspark.sql.DataFrameWriter yang terdapat dalam object DataFrame. Object ini diakses menggunakan interface DataFrame.write

Sebelumnya kita persiapkan terlebih dahulu environment dan data yang akan digunakan

Jika menggunakan google colab, maka install library pyspark dengan perintah pip.

%pip install pyspark

 

Import package yang akan digunakan dan inisialisasi SparkSession

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.appName("Belajar PySpark - Menulis file csv").getOrCreate()

 

Persiapkan DataFrame yang akan kita gunakan

data = [['Agus','Fisika','Umum',150],['Windy','Fisika','Khusus',200],
        ['Budi','Biologi','Umum',170],['Dina','Fisika','Khusus',180],
        ['Bayu','Fisika','Umum',160],['Dedi','Biologi','Khusus',185]]

kolom = ["nama","jurusan","jalur","nilai"]

df = spark.createDataFrame(data, kolom)

df.show()

Output yang dihasilkan:

+-----+-------+------+-----+
| nama|jurusan| jalur|nilai|
+-----+-------+------+-----+
| Agus| Fisika|  Umum|  150|
|Windy| Fisika|Khusus|  200|
| Budi|Biologi|  Umum|  170|
| Dina| Fisika|Khusus|  180|
| Bayu| Fisika|  Umum|  160|
| Dedi|Biologi|Khusus|  185|
+-----+-------+------+-----+

 

Menggunakan fungsi write.csv()

Berbeda dengan Pandas yang menyimpan DataFrame ke dalam sebuah file csv, pyspark menyimpan DataFrame ke dalam beberapa file csv di dalam sebuah direktori.


Menyimpan file csv tanpa header   

Ketika menggunakan fungsi df.write.csv(), maka semua opsi dapat dikirim sebagai parameter dalam fungsi tersebut. Salah satunya adalah parameter nama atau path direktori di mana file akan disimpan. Misalnya DataFrame dalam contoh di atas akan disimpan di direktori "mahasiswa"

df.write.csv("mahasiswa")

 

Dalam direktori tersebut akan dibentuk file-file dengan jumlah sesuai dengan jumlah partisi DataFrame yang bersangkutan. Kita bisa menampilkan jumlah partisi dalam sebuah DataFrame dengan perintah DataFrame.rdd.getNumPartitions().

df.rdd.getNumPartitions()
Output :

2

 

Jika kita list outputnya, akan tampak file-file dengan jumlah yang sama dengan jumlah partisi

!ls -l mahasiswa
total 8
-rw-r--r-- 1 root root 67 Feb 26 07:15 part-00000-df143a52-762b-477a-b09d-de93827bcf2e-c000.csv
-rw-r--r-- 1 root root 68 Feb 26 07:15 part-00001-df143a52-762b-477a-b09d-de93827bcf2e-c000.csv
-rw-r--r-- 1 root root  0 Feb 26 07:15 _SUCCESS

 

Kita bisa melihat isinya dengan perintah cat atau head. Jika data yang disimpan kecil, kita bisa menampilkan semuanya dengan perintah cat *

!cat mahasiswa/*
Output :

Agus,Fisika,Umum,150
Windy,Fisika,Khusus,200
Budi,Biologi,Umum,170
Dina,Fisika,Khusus,180
Bayu,Fisika,Umum,160
Dedi,Biologi,Khusus,185

 

Menyimpan file csv dengan header

Secara default file csv akan disimpan tanpa header atau baris pertama yang berisi nama kolom. Jika kita ingin menyimpan file csv dengan header, maka kita set parameter header = True.

df.write.csv("mahasiswa_header", header=True)

 

Perhatikan bahwa karena pyspark membentuk lebih dari satu file csv, maka masing-masing file tersebut akan disimpan dengan header.

!ls -l mahasiswa_header
total 8
-rw-r--r-- 1 root root 92 Feb 26 13:55 part-00000-941fd598-218d-4dfd-81bd-e7844b796ace-c000.csv
-rw-r--r-- 1 root root 93 Feb 26 13:55 part-00001-941fd598-218d-4dfd-81bd-e7844b796ace-c000.csv
-rw-r--r-- 1 root root  0 Feb 26 13:55 _SUCCESS


Sehingga jika kita concat/gabungkan semua hasilnya, akan muncul multiple header seperti berikut ini

!cat mahasiswa_header/*
Output :

nama,jurusan,jalur,nilai
Agus,Fisika,Umum,150
Windy,Fisika,Khusus,200
Budi,Biologi,Umum,170
nama,jurusan,jalur,nilai
Dina,Fisika,Khusus,180
Bayu,Fisika,Umum,160
Dedi,Biologi,Khusus,185

 

Menyimpan file csv dengan delimiter selain koma

Ada kalanya kita perlu menyimpan DataFrame ke dalam file csv dengan delimiter selain koma. Misalnya jika dalam data kita terdapat kolom bertipe string yang mengandung karakter koma. Kita bisa memilih karakter yang dianggap ‘aman’, seperti misalnya | (pipe).

Untuk menyimpan file dengan delimiter ‘|’, kita gunakan parameter separator="|" atau sep="|".
 

df.write.csv("mahasiswa_delim", sep="|")

 

Hasilnya berupa file teks dengan delimiter '|'

!cat mahasiswa_delim/*
Output :

Agus|Fisika|Umum|150
Windy|Fisika|Khusus|200
Budi|Biologi|Umum|170
Dina|Fisika|Khusus|180
Bayu|Fisika|Umum|160
Dedi|Biologi|Khusus|185

 

Menggunakan fungsi write.option()

Selain mengirimkan parameter melalui fungsi write.csv(), kita juga dapat menggunakan fungsi write.option(), dengan sintaks option(“param_name”,value)

Untuk lebih dari 1 parameter, gunakan rangkaian fungsi option seperti berikut ini :


df.write.option(“param_name1”,value1).option(“param_name2”,value2).csv(“nama_direktori”)

Perlu diingat bahwa rangkaian fungsi option() harus muncul di depan fungsi csv(). Misalnya jika kita ingin menyimpan DataFrame dengan header dan delimiter = “|”

df.write.option("sep","|") \
    .option("header", True) \
    .csv("mahasiswa_delim")

 

Menggunakan fungsi write.format()

Selain fungsi write.csv(), kita juga dapat menggunakan fungsi write.format(“csv”). Fungsi ini digunakan bersama dengan fungsi write.option() untuk menentukan parameternya, dan write.save(“dir_name”) untuk menentukan path dan nama direktorinya.

df.write.format("csv").save("mahasiswa_format")

 

Untuk menyimpan dengan header dan delimiter “|”, kita gunakan fungsi write.option(). Perlu diperhatikan bahwa rangkaian option() harus diletakkan sebelum save()

df.write.format("csv") \
    .option("header",True) \
    .option("sep","|") \
    .save("mahasiswa_format_delim")

 

!cat mahasiswa_format_delim/*
Output :

nama|jurusan|jalur|nilai
Agus|Fisika|Umum|150
Windy|Fisika|Khusus|200
Budi|Biologi|Umum|170
nama|jurusan|jalur|nilai
Dina|Fisika|Khusus|180
Bayu|Fisika|Umum|160
Dedi|Biologi|Khusus|185

 

Mode penulisan file csv : error, append, overwrite, ignore


Ada beberapa pilihan mode penulisan DataFrame ke file, yaitu :

  • error : mengembalikan pesan error jika direktori sudah ada sebelumnya
  • append : membuat file baru ke dalam direktori yang sudah ada (atau ke direktori baru, jika belum ada)
  • overwrite : jika direktori sudah ada, file lama akan dihapus dan diganti dengan data baru
  • ignore : jika direktori sudah ada, tidak dilakukan apa-apa. Jika belum, tulis data

Secara default pyspark menggunakan mode error. Untuk menggunakan mode yang lain, gunakan parameter mode.

Misalnya kita akan menulis data berikut ini ke direktori-direktori yang kita tulis di contoh sebelumnya

data_new = [['Citra','Fisika','Umum',170],
            ['Jaka','Biologi','Khusus',180]]
kolom = ["nama","jurusan","jalur","nilai"]

df_new = spark.createDataFrame(data_new, kolom)

df_new.show()
Output :

+-----+-------+------+-----+
| nama|jurusan| jalur|nilai|
+-----+-------+------+-----+
|Citra| Fisika|  Umum|  170|
| Jaka|Biologi|Khusus|  180|
+-----+-------+------+-----+

 

Menggunakan mode append dan fungsi write.csv

Jika menggunakan fungsi write.csv, kita dapat mengirimkan mode sebagai salah satu parameter dalam fungsi.

df_new.write.csv("mahasiswa_header", header=True, mode="append")

 

Terlihat bahwa pyspark membuat 2 file baru ke dalam direktori mahasiswa_header yang sebelumnya sudah ada.

!ls -l mahasiswa_header

total 16
-rw-r--r-- 1 root root 47 Feb 26 14:36 part-00000-5516c025-30a3-4228-9bd9-509da5a5e664-c000.csv
-rw-r--r-- 1 root root 92 Feb 26 13:55 part-00000-941fd598-218d-4dfd-81bd-e7844b796ace-c000.csv
-rw-r--r-- 1 root root 49 Feb 26 14:36 part-00001-5516c025-30a3-4228-9bd9-509da5a5e664-c000.csv
-rw-r--r-- 1 root root 93 Feb 26 13:55 part-00001-941fd598-218d-4dfd-81bd-e7844b796ace-c000.csv
-rw-r--r-- 1 root root  0 Feb 26 14:36 _SUCCESS

 

Datanya merupakan gabungan data yang lama dan data baru. Karena keduanya kita simpan dengan header=True, maka terdapat 4 header dalam gabungan outputnya, sesuai dengan jumlah file yang terbentuk.

!cat mahasiswa_header/*
Output :

nama,jurusan,jalur,nilai
Citra,Fisika,Umum,170
nama,jurusan,jalur,nilai
Agus,Fisika,Umum,150
Windy,Fisika,Khusus,200
Budi,Biologi,Umum,170
nama,jurusan,jalur,nilai
Jaka,Biologi,Khusus,180
nama,jurusan,jalur,nilai
Dina,Fisika,Khusus,180
Bayu,Fisika,Umum,160
Dedi,Biologi,Khusus,185

 

Menggunakan mode overwrite dan fungsi write.format

Pada mode overwrite, terlihat bahwa file lama akan dihapus dan digantikan dengan file yang baru :

df_new.write.format("csv") \
    .option("header",True) \
    .mode("overwrite") \
    .save("mahasiswa_header")

File yang terbentuk hanya 2, yaitu file dari DataFrame yang baru. Sedangkan data yang lama dihapus.
 

!ls -l mahasiswa_header
total 8
-rw-r--r-- 1 root root 47 Feb 26 14:41 part-00000-10bbca3c-38d7-42c2-8ba2-a244d01b82fe-c000.csv
-rw-r--r-- 1 root root 49 Feb 26 14:41 part-00001-10bbca3c-38d7-42c2-8ba2-a244d01b82fe-c000.csv
-rw-r--r-- 1 root root  0 Feb 26 14:41 _SUCCESS

 

Kontennya adalah

!cat mahasiswa_header/*
Output :

nama,jurusan,jalur,nilai
Citra,Fisika,Umum,170
nama,jurusan,jalur,nilai
Jaka,Biologi,Khusus,180


Menyimpan dengan kompresi

Ketika kita memproses data teks berukuran besar, seringkali akan lebih efisien jika file tersebut disimpan secara terkompresi. Untuk menyimpan file secara terkompresi, kita gunakan parameter compression. Pyspark mendukung berbagai metode kompresi, di antaranya yaitu bzip2, gzip, lz4, dan snappy.

df.write.csv("mahasiswa_gzip", header=True, compression="gzip")

 

Hasilnya berupa file-file csv yang terkompresi, sejumlah partisi DataFrame

!ls -ls mahasiswa_gzip
Output:

total 8
4 -rw-r--r-- 1 root root 99 Feb 27 03:45 part-00000-333b118a-afcb-4480-bde1-f595afc1bc64-c000.csv.gz
4 -rw-r--r-- 1 root root 98 Feb 27 03:45 part-00001-333b118a-afcb-4480-bde1-f595afc1bc64-c000.csv.gz
0 -rw-r--r-- 1 root root  0 Feb 27 03:45 _SUCCESS


Menyimpan DataFrame ke dalam 1 file csv

Jika DataFrame berukuran kecil, kita mungkin ingin menyimpannya ke dalam satu file csv agar informasi headernya tidak berulang.

Untuk menghindari spark memecah file, kita dapat menyatukan terlebih dahulu DataFrame ke dalam 1 partisi menggunakan fungsi coalesce(1), baru kemudian menuliskan ke dalam file.

df.coalesce(1) \
    .write.format("csv") \
    .option("header",True) \
    .save("mahasiswa_coalesce")

Output file dalam direktori

!ls -l mahasiswa_coalesce
total 4
-rw-r--r-- 1 root root 160 Feb 27 03:45 part-00000-50a23307-3e38-4ba3-ac22-28b52b04f4f3-c000.csv
-rw-r--r-- 1 root root   0 Feb 27 03:45 _SUCCESS

Dengan cara ini, kita dapat menyimpan file beserta header tanpa terjadi duplikasi header.
!cat mahasiswa_coalesce/*
Output:

nama,jurusan,jalur,nilai
Agus,Fisika,Umum,150
Windy,Fisika,Khusus,200
Budi,Biologi,Umum,170
Dina,Fisika,Khusus,180
Bayu,Fisika,Umum,160
Dedi,Biologi,Khusus,185

Perlu diingat bahwa perintah coalesce(1) mengumpulkan seluruh data ke dalam satu partisi, sehingga hal ini sebaiknya dihindari jika data yang akan disimpan berukuran sangat besar.


Parameter lain dalam DataFrameWriter

Selain parameter yang sudah dibahas di atas, yaitu path, separator, header, mode, dan compression, ada beberapa parameter lain yang dapat digunakan dalam penyimpanan data di pyspark. Beberapa di antaranya yaitu :

  • nullValue: menentukan representasi string dari nilai null dalam file output.
  • escape: menentukan karakter escape yang akan digunakan saat menulis data dalam format teks
  • quote: menentukan karakter quote yang akan digunakan saat menulis data teks
  • dateFormat: menentukan format tanggal yang akan digunakan
  • timestampFormat: menentukan format timestamp yang akan digunakan


Wrapping Up

Pyspark menyediakan berbagai opsi/parameter untuk menyimpan DataFrame ke dalam file csv. Parameter tersebut dapat digunakan untuk menentukan header, delimiter, kompresi, dan lain-lain.

Pyspark menulis file output ke dalam satu direktori, yang terdiri dari beberapa file, tergantung pada partisinya. Untuk menuliskan ke dalam 1 file, kita dapat menggabungkan DataFrame ke dalam satu partisi. Hal ini tidak disarankan untuk data yang berukuran besar.

Notebook untuk artikel ini dapat diakses di sini

Simak juga artikel-artikel sebelumnya