Apache Spark: budowa niezawodnego pipeline'u do generowania plików CSV

W ostatnim czasie Apache Spark stał się jednym z wiodących silników do rozproszonego przetwarzania i analizy danych, działający bezproblemowo zarówno w chmurze, jak i w środowiskach lokalnych. Za sprawą wysoce skalowalnej architektura oraz możliwości przetwarzania danych w pamięci stał się kluczowym narzędziem do obsługi dużych zbiorów danych w czasie rzeczywistym, umożliwiając użytkownikom uzyskiwanie wniosków w niespotykanym dotąd tempie.

Choć Spark jest powszechnie ceniony za wykonywanie złożonych zadań, takich jak uczenie maszynowe na dużą skalę, przetwarzanie grafów czy strumieniowanie danych w czasie rzeczywistym, świetnie sprawdza się również w bardziej rutynowych operacjach. Od budowy ETL'i i czyszczenia danych po raportowanie i bieżące analizy – wszechstronność Sparka sprawia, że radzi sobie z zadaniami o różnym poziomie skomplikowania.

Poniżej chciałbym przedstawić przykład wykorzystania Apache Sparka do stworzenia potoku przetwarzania danych, którego celem jest generowanie plików CSV zawierających informacje o cenach i produktach dla firm z branży detalicznej.

Proces ten zakłada, że finalny wynik będzie wymagał integracji danych z dwóch różnych źródeł. Spark obsługuje odczyt danych z wielu formatów, w tym CSV, JSON, Parquet i plików tekstowych, a także umożliwia dostęp do danych ustrukturyzowanych i częściowo ustrukturyzowanych przechowywanych w relacyjnych bazach danych, tabelach Hive oraz systemach NoSQL, takich jak MongoDB czy Cassandra. API DataFrame w Sparku upraszcza pobieranie danych, oferując ujednolicony interfejs do obsługi tych różnorodnych źródeł danych, co umożliwia sprawne budowanie przepływów typu ETL (extract, transform, load).

W naszym przykładzie wcielimy się w rolę magazynu, który przygotowuje pliki produktowe i cenniki dla wielu detalistów. Załóżmy, że dane wejściowe będą pochodziły z tabeli relacyjnej PostgreSQL o nazwie "retailers" oraz z dwóch tabel Hive – "product_pricing" i "product_information", przechowywanych w klastrze Hadoop.

Spark potrafi bezproblemowo korzystać z danych zawartych w tabelach Hive'owych, pod warunkiem, że SparkSession włączona jest obsługa Hive i poprawnie skonfigurowano Hive metastore. Wówczas możemy utworzyć DataFrame z zawartości wybranej tabeli w sposób następujący:

By dostać się do danych z baz relacyjnych, takich jak PostgreSQL, możemy wykorzystać metodę jdbc. Zakładając, że sterownik JDBC dla PostgreSQL jest dostępny, możemy skonfigurować właściwości połączenia w następujący sposób:

Następnie możemy utworzyć dataframe z danymi z tabelki:

W tych kilku prostych krokach mogliśmy otrzymać dane wymagane do wygenerowania pików wyjściowych.

Mając już załadowane i przygotowane dane, możemy w pełni wykorzystać możliwości Apache Spark, a w szczególności Spark SQL, do zbudowania finalnego zbioru danych. Dzięki rozproszonej i równoległej architekturze przetwarzania Spark pozwala efektywnie wykonywać złożone transformacje, agregacje oraz łączenia na dużych zbiorach danych.

W naszym przykładzie najpierw wykonamy proste łączenie (join) pomiędzy "productPricingDF" a "productInformationDF", aby utworzyć DataFrame zawierający wszystkie niezbędne informacje produktowe potrzebne do przygotowania pliku.

Na początek nadamy aliasy łączonym tabelom, aby uniknąć niejednoznaczności:

To pozwoli nam wykonać joina między DataFrame'ami:

Następnie będziemy musieli wykonać cross join pomiedzy tym DataFramem i naszym Dataframem "retailers" by uzyskać zbiór w którym będzie zawarte zarówno "retailer_id" oraz "product_id" przyporządkowane każdemu kupcowi:

To prowadzi nas do ostatniego etapu potoku przetwarzania, w którym możemy wykorzystać potężne możliwości partycjonowania Apache Sparka, aby efektywnie wygenerować osobny plik CSV dla każdego detalisty. Dzięki użyciu metody partitionBy podczas zapisu danych Spark automatycznie organizuje dane wyjściowe w oddzielne katalogi, w oparciu o wskazaną kolumnę partycjonującą – w naszym przypadku będzie to "retailer_id".

Takie podejście nie tylko upraszcza dalsze przetwarzanie danych, ale także zwiększa wydajność zapytań podczas późniejszego odczytu, ponieważ dane każdego detalisty są przechowywane w osobnej partycji. Dodatkowo, rozproszona architektura Sparka sprawia, że proces partycjonowania jest zarówno skalowalny, jak i wydajny, nawet w przypadku pracy z bardzo dużymi zbiorami danych, dzięki równomiernemu rozłożeniu obciążenia między węzły klastra. Powstałe w ten sposób pliki są dobrze zorganizowane, co czyni je idealnymi do bezpośredniego wykorzystania w raportowaniu, analizach lub do dalszej integracji z innymi systemami.