How we monitor thousands of Spark data pipelines (TH version)

Thanan Sapthamrong
Life@LINE MAN Wongnai
8 min readApr 25, 2023

--

สวัสดีครับ เราชื่ออาร์ท ตอนนี้เป็น Senior Data Platform Engineer อยู่ที่ LINE MAN Wongnai วันนี้เราจะมาเล่าว่าเรา monitor Spark pipeline กันยังไง แต่ก่อนอื่นขอแนะนำตำแหน่งตัวเองซักนิดนึง ….. ต้องเท้าความก่อนว่าเมื่อก่อน Data Engineer คือคนที่ดูแลทุกอย่างตั้งแต่ Data Pipeline, Data Infrastructure, Apache Airflow, Presto Cluster, Monitoring Tools, และอื่น ๆ อีกมากมาย ซึ่งมันเยอะมากจนทำให้การดูแลของเหล่านี้มันไม่ทั่วถึง ด้วยเหตุผลนี้บริษัทเลยเปิดตำแหน่งใหม่ขึ้นมาเป็น Data Platform Engineer ที่จะมาเข้ามาดู​ data platform โดยเฉพาะเลย หน้าที่หลักคือคล้าย ๆ Site Reliability Engineer แต่จะดูแค่ data platform and related services เท่านั้น และก็เราจะต้องทำยังไงก็ได้ให้ platform ไม่ล่ม น่าเชื่อถือ และทำ improvement ใน platform ให้ดียิ่งขึ้น วันนี้เราขอหยิบหนึ่งตัวอย่างของ improvement มาให้ดูกัน

Overview of Batch Data Pipelines

ขอเกริ่นถึงภาพรวมในปัจจุบันของ data pipeline ก่อน ตอนนี้เราเลือกใช้ Apache Airflow เป็น workflow orchestrator ตัวหลักในการช่วยทำ data pipeline ซึ่งตอนนี้เรามี Airflow ทั้งหมด 3 ตัว ได้แก่

  1. DAP Airflow เป็น Airflow ที่ Data Engineer กับ Data Scientist ใช้ทำ ETL pipeline จากฐานข้อมูลต่าง ๆ ในบริษัท ใช้ดึงข้อมูลจาก third party api ใช้ประมวลผลไฟล์จาก cloud object storage จาก GCP และ AWS รวมถึงการดึงข้อมูลจาก Google Sheets ด้วย นอกเหนือจากการทำ ETL จากข้อมูลภายนอก เรามี pipeline ที่ซัพพอร์ตการทำ precomputed table อีกด้วย (table ที่ aggregate ข้อมูลจากหลาย ๆ table เข้าด้วยกัน)
  2. DS Airflow เป็น Airflow ที่ Data Scientist ใช้ในการ process data เพื่องานของ data scientist โดยเฉพาะ เช่น การทำ feature engineering, model training & prediction, หรือการทำ analysis ทั่ว ๆ ไป
  3. DBT Airflow เป็น Airflow ที่ Analytics Engineer ใช้ในการทำ precomputed table โดยเฉพาะ โดยหลังบ้านจะใช้ DBT (Data Build Tool) ในการช่วยจัดการ data modeling ทั้งหมด
Overall Batch Data Pipelines of LINE MAN Wongnai in 2023

จาก Airflow ทั้ง 3 ตัวนี้ เรามี DAG (Direct Acyclic Graph) ที่ใช้ Apache Spark ในการเข้ามาช่วยประมวลผลข้อมูลเป็นจำนวนทั้งสิ้น 83.9% จากจำนวน DAG ทั้งหมด หรือตีเป็น 386 DAGs ถ้านับเป็นจำนวน Task Instance ก็จะได้ทั้งหมด 2,560 Task Instances นั่นเอง พูดง่าย ๆ คือเรามี data pipelines ที่ทำงานด้วย Apache Spark ทั้งหมด 2,560 pipelines นั่นเอง

So, What’s the problem (s) ?

ดังนั้นเมื่อจำนวน pipeline มันมีมากขนาดนี้ การดูแล pipeline มันก็เริ่มยากขึ้นด้วย และถ้าเราดูแลไม่ดีหรือ take action ไม่ไวหรือไม่มีวิธีการ monitor มันก็กระทบไปถึงฝั่ง business ได้เลยในเวลาแค่ไม่กี่ชั่วโมง ตัวอย่างเช่น เรามี pipeline ที่เริ่มทำงานช่วงเที่ยงคืนเวลาไทย แล้วถ้ามี pipeline ซัก 1 ตัวทำงานช้ากว่าปกติหรือมีพังด้วยสาเหตุต่าง ๆ และมีอีกหลาย ๆ downstream pipeline ที่มารอข้อมูลจาก pipeline ตัวนี้เพื่อนำไปคำนวณต่อ ผลที่จะเกิดขึ้นก็คือ ข้อมูลจะถูกดีเลย์หรือไม่ก็ข้อมูลผิดไปเลย ซึ่งมันก็จะส่งผลกับฝั่ง business ที่แต่ละทีมก็จะ take action ได้ช้าหรือตัดสินใจผิดเพราะข้อมูลไม่ครบก็ได้

ดังนั้นเรามาดูภาพปัจจุบันกันต่อว่าสิ่งที่เกิดขึ้นใน Data Pipeline ระหว่าง Airflow กับ Spark มีอะไรเกิดขึ้นบ้าง โดยการทำงานคร่าว ๆ มีดังนี้

  1. Airflow ส่ง job เข้าไปหา Spark cluster ด้วย spark-submit cli หรือ Livy REST API
  2. Airflow รอ Spark รันไปเรื่อย ๆ
  3. พอ Spark รันเสร็จ ก็จะเก็บ log ต่าง ๆ กลับมา แต่ถ้าไม่เสร็จก็จะเก็บ error กลับมาแทน
Pipeline Flows Between Apache Airflow and Apache Spark

จาก flow นี้ ปัญหาจริง ๆ ที่เกิดขึ้นคือเราขาด Pipeline Observability เราไม่สามารถรับรู้ได้เลยว่า Spark pipeline ต่าง ๆ ทำงานดีไหม หรือแค่ทำงานได้ สิ่งที่เราจะรับรู้จาก Airflow จะมีแค่ error ที่ส่งเข้ามาใน Slack ว่า pipeline พัง แบบในรูปด้านล่าง พอปัญหาแบบนี้เกิดขึ้นปุ๊บ เราก็จะเข้าไปแก้ไขบัคหรือปรับ Spark Configuration ใน pipeline ให้มันทำงานต่อได้ แต่เราก็ยังไม่สามารถรับรู้ได้อยู่ดีว่า มี pipeline ไหนที่ใกล้จะมีปัญหาอีก หรือมี pipeline ไหนที่เราสามารถปรับปรุงเพื่อให้มันทำงานได้แบบไม่ต้องไปเสี่ยงให้มันเกิดปัญหาอีก หรือทำให้มันเร็วขึ้นกว่าเดิมได้ไหมเพื่อที่จะลดความเสี่ยงที่จะเกิดข้อมูลดีเลย์

Airflow Error when Spark Job Failed

ซึ่งการจะเข้าไปปรับปรุงให้ pipeline มันทำงานเร็วขึ้น เราจะต้องเข้าไปดูที่ Spark Web UI ของแต่ละ pipeline เท่านั้น และตัว Spark Web UI เองก็มี retention ถึงแค่ 14 วัน และอย่างที่ได้เล่ามาว่าเรามี pipeline จำนวนถึงหลักพัน ดังนั้นการจะเข้าไปหาว่า pipeline ไหนควรได้รับการปรับปรุงบ้างจึงเป็นเรื่องที่ใช้เวลาเยอะมาก

ซึ่งปัญหาหลัก ๆ ที่เราพบบน Spark pipeline โดนส่วนมากมีดังนี้ ได้แก่

Data Skew — ในเมื่อการทำงานของ Apache Spark มันทำงานกันเป็น cluster มี worker หลาย ๆ ตัวช่วยกันประมวลผลพร้อม ๆ กัน ถ้า worker ตัวใดตัวหนึ่งใช้เวลาทำงานเยอะกว่าเพื่อน แปลว่าเราอาจจะกำลังเจอปัญหา Data Skew เข้าให้แล้ว ดังตัวอย่างในรูป เรามี task ทั้งหมด 200 tasks ซึ่ง 199 tasks อาจจะทำงานเสร็จไปแล้วภายใน 10 นาที แต่มีอีก 1 task ที่ยังทำงานอยู่แล้วใช้เวลาไปแล้ว 47 ชั่วโมง

Data Skew in Apache Spark ref: https://www.clairvoyant.ai/blog/optimizing-the-skew-in-spark

Data Spill — เป็นปัญหาที่จะเกิดขึ้นเมื่อ execution memory ของ Spark worker ไม่พอ จนต้องใช้ memory ส่วนอื่น ๆ หรือถ้าไม่พออีก มันก็จะไปใช้ disk ในการเก็บ data แทน ซึ่งมันจะทำให้เสียเวลามากขึ้นในการที่ Spark จะไปเขียน/อ่านข้อมูลที่ disk อย่างที่เรารู้กันว่า RAM จะทำงานเร็วกว่า disk เสมอ ดังนั้นนี่จึงเป็นอีกหนึ่งปัญหาที่อาจจะทำให้ pipeline เราทำงานช้าลง

Memory Spill and Disk Spill in Spark Web UI

Over/Under Resource Utilization — พอเรามี job จำนวนมาก เราเองไม่มีทางรู้ได้เลยว่า job ไหนขอ CPU core กับ memory เยอะไปหรือน้อยไป บาง job ขอ CPU ไปเยอะมาก แต่จริง ๆ แล้วอาจจะใช้แค่ 10 CPU cores ก็เพียงพอต่อการรันงานให้จบภายในเวลาที่กำหนดแล้ว ซึ่งเราจะรู้ก็ต่อเมื่อไปดูในโค้ดของเรา หรือ Spark Web UI อีกทั้งการขอ resource เยอะมาก ๆ มันก็ส่งผลกับ capacity ที่มีอยู่ อาจจะส่งผลให้มันไม่พอจนเกิด job ดีเลย์

จากปัญหาที่เล่ามาจะเห็นว่ามันค่อนข้างกระทบกับ pipeline โดยตรง และจากที่ทีมได้เจอมา บาง pipeline ใช้เวลาถึง 2 ชั่วโมง แต่พอหลังเจอและแก้ไป ใช้เวลาราว ๆ 20 นาที ดังนั้นขอนำไปสู่วิธีการที่เราใช้ตรวจเจอปัญหาที่เล่ามาเลย

The Solution, Spark Listener

เราจะใช้ Spark Listener มาแก้ปัญหาที่ว่า ก่อนอื่นเลย Spark Listener คืออะไร

Spark Listener คือเป็น Developer API ของ Spark ตัวนึงที่โดยปกติแล้ว Spark จะส่ง callback event มาที่ Interface นี้ แล้วเราสามารถหยิบ event นี้ไปทำอะไรต่อก็ได้ ซึ่งตัว Spark Web UI เองก็ใช้ Spark Listener ในการช่วย export ข้อมูลต่าง ๆ ออกมาแสดงผลที่หน้า Web UI นั่นเอง

ดังนั้นเราจะใช้เจ้านี่แหละ ช่วย export ข้อมูลต่าง ๆ ไม่ว่าจะเป็น job นี้ใช้ Spark Configuration อะไรบ้าง job เริ่มกี่โมง มี memory / disk spill หรือ data skew ไหม หรือ job นี้เขียน data ไปทั้งหมดกี่ไบต์

ซึ่งตัว Spark Listener ที่เราทำขึ้นมาเขียนด้วยภาษา Java และจะโฟกัสที่การคำนวณ Spark Task กับ Spark Stage เพื่อให้ได้ข้อมูลที่ต้องการ กับขออธิบายคร่าว ๆ ก่อนว่า Job, Stage, และ Task ใน Spark คืออะไร

  1. Spark Job — ทุกครั้งที่เราเรียกคำสั่ง save, collect, count หรืออะไรที่ทำการเรียก result จาก Spark ตัว Spark Driver จะสร้าง Job ขึ้นมา ซึ่งแต่ละ Job สามารถถูกประมวลแบบ parallel ได้พร้อมกัน
  2. Spark Stage — ในหนึ่ง Spark Job สามารถประกอบไปด้วยหลาย ๆ Stage และในแต่ละ Stage จะเก็บ logic ของการ transformation ไว้ เช่น filter, join, sort, หรือทำการ exchange Spark partition กันใน Spark cluster
  3. Spark Task — เป็น unit ที่เล็กที่สุดใน Spark ที่จะอยู่ใน Spark Stage อีกที ซึ่ง unit นี้จะเป็นส่วนที่ Spark worker จะนำไปประมวลผลจริง ๆ แล้ว
Relationship of Spark Job, Spark Stage, and Stage Task (ref: https://stackoverflow.com/questions/42263270/what-is-the-concept-of-application-job-stage-and-task-in-spark)

การทำงานของ Spark Listener ตัวนี้จะเริ่มจากการไปเก็บข้อมูลตาม Spark event ต่าง ๆ นำไป pre-process และย่อยให้เหลือแต่สถิติของแต่ละ Spark Stage เช่น เราเก็บจำนวน memory / disk spill, Percentile (P50/P75/P90) ของเวลาที่ใช้ไปในแต่ละ Spark Task, จำนวน byte / record ที่เขียนลงไปที่ HDFS, Error ต่าง ๆ (ถ้าเกิด failed) รวมถึงคำนวณ skewness ของ Spark Task ในแต่ละ Stage ด้วย และพอ pipeline ทำงานเสร็จ เราก็จะให้ Spark Listener ของเราส่งข้อมูลทั้งหมดออกไปหา server ตัวนึงที่ชื่อว่า DTP Internal Server ผ่าน REST API

DTP Internal Server เป็นอีกหนึ่ง Service ที่ภายในทีมจะใช้เพื่อเก็บ log ต่าง ๆ ผ่าน REST API และพอมันได้รับ request มาจาก client มันจะทำการส่งข้อมูลขึ้น Apache Kafka อีกทีนึง และข้อมูลจะถูก sync ลง Hive table ทุก ๆ 2 ชั่วโมง

ขอย้อนกลับไปที่วิธีที่เราใช้คำนวณ skewness เราใช้ทั้งหมด 3 วิธีในการวัด skewness ของ Spark stage ได้แก่

  1. Standard Deviation — ถ้าเวลาที่ใช้ไปของ task นั่นเกิน 2*SD หรือ 3*SD ก็จะบอกว่ามัน skew
  2. Skewness Formula — ใช้สูตรทางสถิติเข้ามาช่วยเลย อันนี้สุดท้ายเราจะได้ค่ามาหนึ่งค่าเพื่อบอกว่า distribution ของเวลาที่ใช้ไปในแต่ละ Spark task เป็นอย่างไร
  3. Max(Spark Task Runtime) — P90(Spark Task Runtime) > 10 minutes — พูดง่าย ๆ คือ ถ้าเกิด runtime ของ task สุดท้ายมันห่างกับ runtime ที่ P90 เกิน 10 นาที เราก็จะบอกว่ามัน skew นั่นเอง

ที่เราเลือกใช้วิธีวัดทั้งหมด 3 วิธีเพราะเรายังไม่เห็น data จริง ๆ ว่าวิธีการไหนเหมาะสมสำหรับ data ที่เราเก็บมา เราเลยต้อง implement ทั้ง 3 วิธีไปก่อน และอย่างวิธีที่ 3 มันก็มีจุดบอดเช่นกัน ถ้าเกิด task ที่ P90 ใช้เวลาใกล้เคียงกับ task ที่ใช้เวลาเยอะสุด มันก็จะบอกว่าไม่ skew (ขอเคลมก่อนว่าผู้เขียนไม่ได้เชี่ยวชาญด้านสถิติมาก ถ้าเกิดผิดพลาดตรงนี้ ขออภัยไว้​ ณ ​ที่นี่ด้วย)

พอเรามีข้อมูลมาเก็บที่ Hive table แล้ว เราก็พร้อมที่จะ query ข้อมูลขึ้นมาแล้วเอาไปหา insight ต่าง ๆ รวมถึงสามารถนำไปคำนวณ SLO ได้ด้วย กับเราไม่ต้องห่วงเรื่อง retention ด้วย สามารถเก็บได้เป็นหลักปี (ใช้แค่ 1–3 เดือนก็เพียงพอต่อการใช้งานแล้ว)

Conceptual Flow of Our Spark Listener

เมื่อเรามี Spark Listener แล้ว เราจะเอาตัว Spark Listener ไปติดกับทุก ๆ pipeline ที่อยู่ตาม Airflow รวมถึงเราจะใส่ Spark Configuration เหล่านี้เพิ่มไปด้วย เพื่อเก็บไว้ใช้ทำ tracking กลับไปที่ตัว pipeline

  1. spark.dap.source — ใช้บอกที่มาของ Spark job นี้ว่าโดนส่งมาจาก data platform หรือ client ไหน
  2. spark.dap.platformEnv — ใช้บอก environment ของ data platform ที่ส่ง job เข้ามา
  3. spark.dap.appId — ใช้ระบุที่มาของ pipeline logic ซึ่งเรานำข้อมูลจาก Airflow macro มาใช้ และใช้เป็น format นี้ dag_id — task_id — run_id

ตัวอย่างใน spark-submit command ในการใช้ Spark Listener

spark-submit \
--conf \"spark.extraListeners=dap.LMWNSparkListener\" \
--conf \"spark.jars=dap-spark-listener-1.0.0.jar\" \
--conf \"spark.dap.source=dap-airflow\" \
--conf \"spark.dap.platformEnv=production\" \
--conf \"spark.dap.appId=wongnai_import--transform_and_update_wn_restaurant--scheduled__2023-04-13T02:00:00+00:00\" \
script.py

Let’s Get into Monitoring

มาถึงส่วนสุดท้ายแล้ว แล้วเรา monitor อะไรบ้าง อย่างแรกคือเราตั้ง SLO ขึ้นมาก่อนเลยโดย SLO ของเรามีทั้งหมด 4 ข้อ

  1. Runtime — มี pipeline จำนวนเท่าไหร่ที่มี runtime รันเกิดกว่า time limit ที่ตั้งไว้ ซึ่ง time limit เราคิดมาจาก cron ของ Airflow เช่น runtime ของ @hourly pipeline จะต้องไม่เกิน 70% ของ 1 ชั่วโมง หรือคิดเป็น 42 นาที หรือ runtime ของ 30 */2 * * * pipeline จะต้องไม่เกิน 1 ชั่วโมงเป็นต้น
  2. Skew — สำหรับ skew เราวัดมาจาก คำตอบที่ได้จาก skewness calculation จากทั้งหมด 3 วิธีที่ได้กล่าวไว้ และเอาคำตอบมารวมกัน ถ้ามีคำตอบใดคำตอบหนึ่งบอกว่ามัน skew เราก็จะนับว่า pipeline นี้ไม่ meet SLO (ทำคล้าย ๆ กับ ensemble method)
  3. Spill — อันนี้ค่อนข้างตรงไปตรงมา ถ้า pipeline ไหนมี spill ไม่ว่าจะเกิดที่ memory หรือ disk เราก็จะนับว่า pipeline นี้ไม่ meet SLO ทันที
  4. Failed Apps — SLO ตัวสุดท้ายคือ pipeline เรา failed เมื่อไหร่ ก็จะนับว่าไม่ meet SLO ทันที

และนี่คือตัวอย่าง SLO จะเห็นว่ามันแบ่งเป็นตาม tier อีก ใช่แล้ว ในทุก ๆ pipeline จะมีการระบุความสำคัญเป็น tier อยู่ ไล่ความสำคัญตั้งแต่ tier-1, tier-2, tier-3, tier-4, tier-no (ไม่มี tier) เราเลยเอา tier ตรงนี้มาช่วย grouping ในแต่ละ SLO ด้วย ดังนั้นเราจะสามารถให้ความสำคัญกับการทำ pipeline tuning ได้ดีขึ้น

SLOs of Spark Batch Data Pipelines

จากรูป SLO จะเห็นไว้ว่า Runtime เราค่อนข้างไม่เขียว และมี Spill ค่อนข้างเยอะมาก กลับกับในส่วนของ Skew จะเห็นว่ามันค่อนข้างจะเขียวเลย และก็เคสของ Failed Apps มันควรเป็น 100% ซึ่งหมายถึงไม่มี pipeline ไหนพังเลย แต่กลับมาที่โลกของความจริง มันก็ไม่ได้เป็นอย่างนั้น ดังนั้นคำถามคือ เราจะทำให้ตัวเลขเหล่านี้มันเพิ่มขึ้นยังไงได้บ้าง จึงเข้ามาสู่การ monitor อีกส่วนนึงคือเราสามารถดูเป็นราย pipeline ได้เลยว่า pipeline ไหนมีปัญหาบ้าง

ตัวอย่างแรกเลยคือเราทำ pivot table ที่ลิสต์ DAG ที่ใช้ runtime เกินกว่า time limit และเอามานับเป็นจำนวนครั้งต่อวัน จะเห็นว่ามีสีแดง ๆ อยู่อันนึงที่ runtime เกินตลอดเวลา (เป็น pipeline ที่ทำงานทุก ๆ 30 นาที) แต่เป็น pipeline tier-3 ซึ่งอาจจะไม่สำคัญมาก ยังปล่อยให้ delay ได้อยู่ (runtime จริงอาจจะใกล้กับรอบ DAG run มาก ๆ เช่น runtime 28–29 นาที) ถ้าเคสนี้เราก็ควรโฟกัสที่ pipeline tier-1 ก่อน เพราะ impact ต่อ business มากกว่า

Pivot Table of Pipelines Exceed Time Limit per Day

ตัวอย่างที่สองยังเป็น pivot table เหมือนเดิม แต่เป็น pivot table ที่โชว์ลิสต์ของ DAG กับ Task Instance ที่เกิดปัญหา spill ทั้ง memory และ disk

Pivot Table of Pipelines have the Spill Issue

ตัวอย่างที่สามเป็น pivot table ที่สรุป error จาก Spark ว่าในแต่ละ pipeline ที่พังมันพังด้วยเหตุผลอะไร และก็สามารถดูเพิ่มเติมได้ว่า error เต็ม ๆ คืออะไร และ มี YARN Application Id ให้สามารถกลับไป search หาใน Spark History Server ได้เพื่อการ debugging เพิ่มเติม

Pivot Table of Failure Reason Summary per DAG and Task Instance
A Table of Failure Reason

สำหรับ Skew pipeline เราสามารถดูได้เลยว่า Spark Stage ไหนที่มันเกิด Skew และสามารถย้อนกลับที่ไปดูที่ Spark Web UI ที่อยู่ใน Spark History Server ได้อีกด้วย

A Table of Pipelines that have Spark Skew Stages

นอกเหนือจากนี้เรายังมีกราฟที่จะช่วยเราหาอีกว่าตอนนี้มี pipeline ไหนบ้างที่ใช้ resource เยอะ ๆ แต่เขียน data ออกไปนิดเดียว ดังในรูปข้างล่างนี้ ดังในรูปจะเห็นว่ามี pipeline อีกหลาย ๆ ตัวเลยที่ใช้จำนวน Spark executor ถึง 100 ตัว แต่เขียน data ออกไปไม่ถึง 5 กิกะไบต์ด้วยซ้ำ ใช่ครับทุกคนฟังไม่ผิด มีจำนวนเยอะประมาณนึงเลย TT ซึ่งจริง ๆ แล้ว pipeline เหล่านี้อาจจะใช้แค่ Spark executor แค่ 5 ตัวก็เพียงพอแล้วก็ได้

Scatter Plot between the number of executors and written bytes

นอกเหนือจากนี้เรายังสามารถหา pipeline ที่เขียนไฟล์ ลง HDFS แบบไม่ optimal ได้ด้วย อย่างเช่น มี pipeline เขียนไฟล์ทั้งหมด 1 ไฟล์มีขนาด 2 กิกะไบต์ แบบนี้เรียกว่าไม่ optimal ซึ่งควรปรับให้เขียนทั้งหมด 8 ไฟล์แทน (1 ไฟล์ควรมีขนาดใกล้เคียงกับ HDFS Block Size ~256MB) ซึ่งตรงนี้ก็เป็นอีกหนึ่งจุดที่มักจะทำให้เกิดปัญหา spill นั่นเอง เพราะเราเขียนข้อมูลใหญ่เกินไปกว่าที่ execution memory ของ Spark worker จะรับไหว อีกทั้งยังทำให้ runtime สูงขึ้นด้วย เพราะ Spark รวบรวม data จากหลาย ๆ ที่เข้ามารวมที่เดียวกันก่อนเขียนไฟล์

List of DAGs and Task Instances that produce non-optimal file sizes

Example Results after We have Monitoring

เราลองมาดูตัวอย่างเคสจริงที่เราเจอจากการ monitoring และได้ทำการปรับปรุงแก้ไขไปแล้วกัน

เคสที่ 1 — เป็นเคสที่ pipeline ที่รันวันละครั้ง โดย pipeline จะรัน query ผ่าน SparkSQL เพื่อทำ precomputed table ซึ่ง pipeline นี้ย้ำว่าเป็น pipeline ที่รันผ่านทุกวันและไม่มีการ Retry แต่อย่างใด แต่มีปัญหา Data Skew เลยทำให้ runtime สูงเกินสองชั่วโมง ซึ่งเรามาตรวจเจอสิ่งนี้จาก pivot table ด้านบน เลยทำการปรับแก้ query โดยเปลี่ยน Join Strategy (เปลี่ยนโค้ดแค่ 1 บรรทัด) ไปใช้ Broadcast Hash Join แทน สุดท้าย runtime เลยเหลือไม่ถึง 25 นาที

Case#1 — Before and After We Have Monitoring

เคสที่ 2 — เคสนี้เป็น pipeline ที่จะทำงานทุก ๆ ชั่วโมง โดยที่ปัญหามันคือ มันเกิดการ retry ที่ Airflow หลาย ๆ รอบ (ไม่ได้เจอทุกรอบ) สาเหตุเป็นเพราะ Spark worker มี resource ไม่พอ และ pipeline นี้เขียนไฟล์ลง HDFS แบบไม่ optimal มันเลยทำให้ Spark worker ตายจน Airflow ต้อง retry ตัวเองใหม่เรื่อย ๆ ถึง 2–3 รอบ จนมันผ่านเอง ซึ่งที่ผ่านยังไม่มี error จาก pipeline นี้ส่งลงไปที่ Slack ของทีมเลยเพราะ Airflow ยัง retry ตัวเองไม่ถึง threshold ที่ตั้งไว้ หลังจากการปรับ tuning ไป โดยได้ปรับ resource และได้ทำการ repartition ใน Spark ไปเพื่อปรับจำนวนไฟล์ก่อนจะเขียนลง HDFS ผลที่ได้ก็คือไม่มี Airflow retry เกิดขึ้นอีกเลย

Case#2 — Before and After We Have Monitoring

Final Thought

มาถึงสรุปแล้ว ก็หลังจากที่ทีมเราได้มี Spark Listener เข้ามาช่วย export ข้อมูลและมี Dashboard / Graph ในการช่วยชี้เป้าปัญหาต่าง ๆ เราก็เริ่มทำการแก้ไขปัญหาได้อย่างตรงจุดมากขึ้น รวดเร็ว และขุดปัญหาใต้พรมต่าง ๆ ที่เราไม่เคยรู้ และก็ยังสามารถ support ทุก ๆ ทีม ทุก ๆ platform ที่ใช้ Apache Spark ได้ในเวลาเดียวกันอีกทั้งเราก็มี SLO ด้วยที่ได้มาจากข้อมูลตรงนี้ ทำให้เราสามารถมั่นใจได้ว่า pipeline ทั้งหมดของเรายังทำงานได้ดี หรือถ้ายังไม่ meet SLO ก็ต้องทำ improvement กันไป ฮ่า ๆ ซึ่งทั้งหมดมันก็ส่งผลให้ pipeline เรามีคุณภาพ มี observability ที่ดีขึ้น และถ้าฝั่ง pipeline ดีขึ้น มันก็ส่งผลไปถึงปลายทางฝั่ง business ด้วยเช่นกัน business ก็จะมั่นใจการใช้ข้อมูลประกอบการตัดสินใจมากขึ้น

สุดท้ายนี้ขอแถมอีกนิดนึง ขายของกันรัว ๆ ในปัจจุบัน pipeline ของทีม Data ที่ทำงานอยู่บน Apache Spark มีจำนวนรวมราว ๆ 8,000 pipeline runs ต่อวัน นับเป็นรายชั่วโมงคือราว ๆ 600 pipeline runs ต่อชั่วโมง และมีข้อมูลถูกเขียนลง HDFS เฉลี่ยแล้ว 16 Terabytes ต่อวัน หรือตีเป็น record ได้ 50 พันล้านแถว ก็ด้วย scale งานปัจจุบันเราจึงยังต้องการคนเก่ง ๆ เข้ามาร่วมทีมอีก ใครสนใจเข้ามาร่วมงานด้วยกันไม่ว่าจะเป็น Data Engineer ที่จะเข้ามาพัฒนา pipeline เหล่านี้ให้ดีขึ้น หรือ Data Platform Engineer ที่จะเข้ามาทำให้ data platform ของ LINE MAN Wongnai เสถียรขึ้น scalable มากขึ้น (เราไม่ได้มีแต่ ETL Infrastructure อย่างเดียวที่ต้องดูแล ยังมีอีกหลาย services ที่ยังไม่ได้พูดถึงในนี้ต้องดูแลอีกมากมาย) หรืออยากเข้ามาซัพพอร์ตฝั่ง Business Intelligence มาเขียน query เพื่อหา insight ของธุรกิจก็สามารถส่งใบสมัครงานเข้ามาได้ที่ https://careers.lmwn.com/data-and-analytics

ขอบคุณครับ

--

--