logo

PySparkSQL

Apache Spark to najbardziej udane oprogramowanie Apache Software Foundation zaprojektowane z myślą o szybkich obliczeniach. Kilka branż korzysta z Apache Spark w celu znalezienia swoich rozwiązań. PySpark SQL to moduł w platformie Spark, który integruje przetwarzanie relacyjne z funkcjonalnym interfejsem API programowania platformy Spark. Możemy wyodrębnić dane za pomocą języka zapytań SQL. Możemy używać zapytań identycznie jak w języku SQL.

Jeśli masz podstawową wiedzę na temat RDBMS, PySpark SQL będzie łatwy w użyciu, gdzie możesz rozszerzyć ograniczenia tradycyjnego relacyjnego przetwarzania danych. Spark obsługuje również język zapytań Hive, ale istnieją ograniczenia bazy danych Hive. Spark SQL został opracowany w celu usunięcia wad bazy danych Hive. Przyjrzyjmy się następującym wadom Hive:

Wady Hive

  • Nie może wznowić przetwarzania, co oznacza, że ​​jeśli wykonanie nie powiedzie się w środku przepływu pracy, nie będzie można wznowić od miejsca, w którym utknęło.
  • Nie możemy usuwać zaszyfrowanych baz danych kaskadowo, gdy włączony jest kosz. Prowadzi to do błędu wykonania. Aby usunąć tego typu bazę danych, użytkownicy muszą skorzystać z opcji Usuń.
  • Zapytania ad-hoc są wykonywane przy użyciu narzędzia MapReduce, które jest uruchamiane przez Hive, ale gdy analizujemy bazę danych średniej wielkości, opóźnia to działanie.
  • Hive nie obsługuje operacji aktualizacji ani usuwania.
  • Ogranicza się do obsługi podzapytań.

Te wady są powodem opracowania Apache SQL.

Krótkie wprowadzenie do PySpark SQL

PySpark obsługuje zintegrowane przetwarzanie relacyjne za pomocą programowania funkcjonalnego Spark. Zapewnia obsługę różnych źródeł danych, umożliwiając łączenie zapytań SQL z transformacjami kodu, tworząc w ten sposób bardzo potężne narzędzie.

PySpark SQL ustanawia połączenie pomiędzy RDD a tabelą relacyjną. Zapewnia znacznie bliższą integrację przetwarzania relacyjnego i proceduralnego poprzez deklaratywne API Dataframe, które jest zintegrowane z kodem Spark.

Dzięki SQL może być łatwo dostępny dla większej liczby użytkowników i poprawiać optymalizację dla obecnych. Obsługuje także szeroką gamę źródeł danych i algorytmów w Big-Data.

Funkcja PySpark SQL

Poniżej podano funkcje PySpark SQL:

gry za pomocą wiadomości tekstowych na Androida

1) Dostęp do danych dotyczących spójności

Zapewnia spójny dostęp do danych, co oznacza, że ​​SQL obsługuje wspólny sposób dostępu do różnych źródeł danych, takich jak Hive, Avro, Parquet, JSON i JDBC. Odgrywa znaczącą rolę w dostosowaniu wszystkich istniejących użytkowników do Spark SQL.

2) Włączenie do Spark

Zapytania SQL PySpark są zintegrowane z programami Spark. Zapytań możemy używać wewnątrz programów Spark.

Jedną z jego największych zalet jest to, że programiści nie muszą ręcznie zarządzać awariami stanu ani synchronizować aplikacji z zadaniami wsadowymi.

3) Standardowa łączność

Zapewnia połączenie poprzez JDBC lub ODBC i te dwa standardy stanowią branżowe standardy łączności dla narzędzi Business Intelligence.

4) Funkcje zdefiniowane przez użytkownika

PySpark SQL ma połączoną funkcję zdefiniowaną przez użytkownika (UDF). UDF służy do definiowania nowej funkcji opartej na kolumnach, która rozszerza słownictwo DSL Spark SQL na potrzeby przekształcania DataFrame.

5) Zgodność z ulem

PySpark SQL uruchamia niezmodyfikowane zapytania Hive na bieżących danych. Umożliwia pełną kompatybilność z aktualnymi danymi Hive.

Moduł SQL PySpark

Oto kilka ważnych klas Spark SQL i DataFrames:

    pyspark.sql.SparkSession:Stanowi główny punkt wejścia dla Ramka danych i funkcjonalność SQL.pyspark.sql.DataFrame:Reprezentuje rozproszony zbiór danych pogrupowanych w nazwane kolumny.Kolumna pyspark.sql.:Reprezentuje wyrażenie kolumnowe w a Ramka danych. pyspark.sql.Wiersz:Reprezentuje wiersz danych w a Ramka danych. pyspark.sql.GroupedData:Metody agregacji zwracane przez DataFrame.groupBy(). Funkcje pyspark.sql.DataFrameNa:Reprezentuje metody obsługi brakujących danych (wartości null).Funkcje pyspark.sql.DataFrameStat:Reprezentuje metody funkcjonalności statystyki.Funkcje pysark.sql.:Reprezentuje listę wbudowanych funkcji dostępnych dla Ramka danych. typy pyspark.sql.:Reprezentuje listę dostępnych typów danych.pyspark.sql.Window:Służy do pracy z funkcjami okna.

Rozważmy następujący przykład PySpark SQL.

 import findspark findspark.init() import pyspark # only run after findspark.init() from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate() df = spark.sql('''select 'spark' as hello ''') df.show() 

Wyjście:

 +-----+ |hello| +-----+ |spark| +-----+ 

Wyjaśnienie kodu:

W powyższym kodzie zaimportowaliśmy plik znajdź iskrę moduł i wywołany findspark.init() konstruktor; następnie zaimportowaliśmy moduł SparkSession, aby utworzyć sesję Spark.

z pyspark.sql zaimportuj SparkSession

Sesja iskrowa może służyć do tworzenia interfejsu API zestawu danych i ramki DataFrame. Sesji SparkSession można także używać do tworzenia ramki DataFrame, rejestrowania ramki DataFrame jako tabeli, wykonywania kodu SQL na tabelach, tabeli pamięci podręcznej i odczytywania pliku parkietu.

budowniczy klas

Jest konstruktorem sesji Spark.

getOrCreate()

Służy do uzyskania istniejącego sesja Spark, lub jeśli nie istnieje, utwórz nowy w oparciu o opcje ustawione w kreatorze.

Kilka innych metod

Oto kilka metod PySpark SQL:

1. nazwa aplikacji(nazwa)

Służy do ustawienia nazwy aplikacji, która będzie wyświetlana w interfejsie internetowym Spark. Parametr nazwa akceptuje nazwę parametru.

2. config(key=Brak, wartość = Brak, conf = Brak)

Służy do ustawiania opcji konfiguracyjnych. Opcje ustawione przy użyciu tej metody są automatycznie propagowane do obu SparkConf I Sesja Spark konfiguracja.

 from pyspark.conf import SparkConfSparkSession.builder.config(conf=SparkConf()) 

Parametry:

    klucz-Ciąg nazwy klucza właściwości konfiguracyjnej.wartość-Reprezentuje wartość właściwości konfiguracyjnej.konf -Instancja SparkConf.

3. mistrz(mistrz)

Ustawia adres URL mastera Sparka, z którym można się połączyć, np. „local”, aby działać lokalnie, „local[4]”, aby działać lokalnie z 4 rdzeniami.

Parametry:

    gospodarz:adres URL mistrza iskier.

4. Katalog sesji SparkSession

Jest to interfejs, za pomocą którego użytkownik może tworzyć, usuwać, zmieniać lub wysyłać zapytania do baz danych, tabel, funkcji itp.

5. SparkSession.conf

Jest to interfejs konfiguracyjny środowiska uruchomieniowego dla Sparka. Jest to interfejs, za pomocą którego użytkownik może uzyskać i ustawić wszystkie konfiguracje platformy Spark i Hadoop istotne dla platformy Spark SQL.

klasa pyspark.sql.DataFrame

Jest to rozproszony zbiór danych pogrupowanych w nazwane kolumny. DataFrame jest podobna do tabeli relacyjnej w Spark SQL i może być utworzona przy użyciu różnych funkcji w SQLContext.

 student = sqlContext.read.csv('...') 

Po utworzeniu ramki danych możemy nią manipulować, korzystając z kilku języków specyficznych dla domeny (DSL), które są predefiniowanymi funkcjami DataFrame. Rozważ następujący przykład.

 # To create DataFrame using SQLContext student = sqlContext.read.parquet('...') department = sqlContext.read.parquet('...') student.filter(marks > 55).join(department, student.student_Id == department.id)  .groupBy(student.name, 'gender').({'name': 'student_Id', 'mark': 'department'}) 

Rozważmy następujący przykład:

Zapytania przy użyciu Spark SQL

W poniższym kodzie najpierw tworzymy DataFrame i wykonujemy zapytania SQL w celu pobrania danych. Rozważ następujący kod:

wiersz i kolumna
 from pyspark.sql import * #Create DataFrame songdf = spark.read.csv(r'C:UsersDEVANSH SHARMA	op50.csv', inferSchema = True, header = True) #Perform SQL queries songdf.select('Genre').show() songdf.filter(songdf['Genre']=='pop').show() 

Wyjście:

 +----------------+ | Genre| +----------------+ | canadian pop| | reggaeton flow| | dance pop| | pop| | dfw rap| | pop| | trap music| | pop| | country rap| | electropop| | reggaeton| | dance pop| | pop| | panamanian pop| |canadian hip hop| | dance pop| | latin| | dfw rap| |canadian hip hop| | escape room| +----------------+ only showing top 20 rows +---+--------------------+-------------+-----+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ |_c0| Track.Name| Artist.Name|Genre|Beats.Per.Minute|Energy|Danceability|Loudness..dB..|Liveness|Valence.|Length.|Acousticness..|Speechiness.|Popularity| +---+--------------------+-------------+-----+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ | 4|Beautiful People ...| Ed Sheeran| pop| 93| 65| 64| -8| 8| 55| 198| 12| 19| 86| | 6|I Don't Care (wit...| Ed Sheeran| pop| 102| 68| 80| -5| 9| 84| 220| 9| 4| 84| | 8| How Do You Sleep?| Sam Smith| pop| 111| 68| 48| -5| 8| 35| 202| 15| 9| 90| | 13| Someone You Loved|Lewis Capaldi| pop| 110| 41| 50| -6| 11| 45| 182| 75| 3| 88| | 38|Antisocial (with ...| Ed Sheeran| pop| 152| 82| 72| -5| 36| 91| 162| 13| 5| 87| | 44| Talk| Khalid| pop| 136| 40| 90| -9| 6| 35| 198| 5| 13| 84| | 50|Cross Me (feat. C...| Ed Sheeran| pop| 95| 79| 75| -6| 7| 61| 206| 21| 12| 82| +---+--------------------+-------------+-----+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ 

Korzystanie z funkcji groupBy().

Funkcja groupBy() zbiera dane o podobnych kategoriach.

 songdf.groupBy('Genre').count().show() 

Wyjście:

 +----------------+-----+ | Genre|count| +----------------+-----+ | boy band| 1| | electropop| 2| | pop| 7| | brostep| 2| | big room| 1| | pop house| 1| | australian pop| 1| | edm| 3| | r&b en espanol| 1| | dance pop| 8| | reggaeton| 2| | canadian pop| 2| | trap music| 1| | escape room| 1| | reggaeton flow| 2| | panamanian pop| 2| | atl hip hop| 1| | country rap| 2| |canadian hip hop| 3| | dfw rap| 2| +----------------+-----+ 

dystrybucja(numpartitions, *cols)

The dystrybucja() zwraca nową ramkę DataFrame, która jest wyrażeniem partycjonującym. Ta funkcja akceptuje dwa parametry liczba partycji I *przełęcz. The liczba partycji Parametr określa docelową liczbę kolumn.

 song_spotify.repartition(10).rdd.getNumPartitions() data = song_spotify.union(song_spotify).repartition('Energy') data.show(5) 

Wyjście:

 +---+--------------------+-----------+-------+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ |_c0| Track.Name|Artist.Name| Genre|Beats.Per.Minute|Energy|Danceability|Loudness..dB..|Liveness|Valence.|Length.|Acousticness..|Speechiness.|Popularity| +---+--------------------+-----------+-------+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ | 4|Beautiful People ...| Ed Sheeran| pop| 93| 65| 64| -8| 8| 55| 198| 12| 19| 86| | 5|Goodbyes (Feat. Y...|Post Malone|dfw rap| 150| 65| 58| -4| 11| 18| 175| 45| 7| 94| | 17| LA CANCI?N| J Balvin| latin| 176| 65| 75| -6| 11| 43| 243| 15| 32| 90| | 4|Beautiful People ...| Ed Sheeran| pop| 93| 65| 64| -8| 8| 55| 198| 12| 19| 86| | 5|Goodbyes (Feat. Y...|Post Malone|dfw rap| 150| 65| 58| -4| 11| 18| 175| 45| 7| 94| +---+--------------------+-----------+-------+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ only showing top 5 rows