Pyspark RDD - Transformações

Pyspark RDD - Transformações
Em Python, Pyspark é um módulo Spark usado para fornecer um tipo semelhante de processamento como o Spark.

RDD significa conjuntos de dados distribuídos resilientes. Podemos chamar RDD de estrutura de dados fundamental no Apache Spark.

Precisamos importar RDD do Pyspark.Módulo RDD.

Assim, no Pyspark para criar um RDD, podemos usar o método parallelize ().

Sintaxe:

Spark_App.SparkContext.paralelize (dados)

Onde,

Os dados podem ser um dados unidimensional (dados lineares) ou bidimensionais (dados da coluna de linha).

Transformações RDD:

Um RDD de transformação é uma operação aplicada a um RDD para criar novos dados do RDD existente. Usando transformações, somos capazes de filtrar o RDD aplicando algumas transformações.

Vamos ver as transformações que são realizadas no RDD fornecido.

Vamos discuti -los um por um.

1. mapa()

A transformação do map () é usada para mapear um valor para os elementos presentes no RDD. É preciso uma função anônima como um parâmetro, como Lambda e transforma os elementos em um RDD.

Sintaxe:

Rdd_data.mapa (anonymous_function)

Parâmetros:

Anonymous_function parece:

Elemento Lambda: operação

Por exemplo, a operação é adicionar/subtrair todos os elementos com algum novo elemento.

Vamos ver os exemplos para entender melhor esta transformação.

Exemplo 1:

Neste exemplo, criamos um RDD chamado Student_Marks com 20 elementos e aplica a transformação de map () adicionando cada elemento com 20 e exibindo -os usando colecion () ação.

#import o módulo Pyspark
Importar Pyspark
#Import SparkSession para criar uma sessão
de Pyspark.SQL Import SparkSession
# importar rdd do Pyspark.rdd
de Pyspark.RDD Import rdd
#Crie um aplicativo chamado Linuxhint
Spark_App = SparkSession.construtor.AppName ('Linuxhint').getorcreate ()
# Crie o aluno marca dados com 20 elementos
Student_Marks = Spark_App.SparkContext.paralelize ([89,76,78,89,90,100,34,56,54,22,45,43,23,56,78,21,34,34,56,34]))
#Display Data em RDD
print ("Dados reais em RDD:", Student_marks.mapa (elemento lambda: elemento).colet ()))
#Apply map () transformação adicionando 20 a cada elemento em RDD
print ("Depois de adicionar 20 a cada elemento em RDD:", Student_marks.mapa (elemento lambda: elemento+ 20).colet ()))

Saída:

Dados reais em RDD: [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]
Depois de adicionar 20 a cada elemento em RDD: [109, 96, 98, 109, 110, 120, 54, 76, 74, 42, 65, 63, 43, 76, 98, 41, 54, 54, 76, 54]

A partir da saída acima, podemos ver que o elemento 20 é adicionado a todos os elementos em RDD através da função lambda usando a transformação do map ().

Exemplo 2:

Neste exemplo, criamos um RDD chamado Student_Marks com 20 elementos e aplica a transformação do map () subtraindo cada elemento por 15 e exibindo -os usando colecionamento () ação.

#import o módulo Pyspark
Importar Pyspark
#Import SparkSession para criar uma sessão
de Pyspark.SQL Import SparkSession
# importar rdd do Pyspark.rdd
de Pyspark.RDD Import rdd
#Crie um aplicativo chamado Linuxhint
Spark_App = SparkSession.construtor.AppName ('Linuxhint').getorcreate ()
# Crie o aluno marca dados com 20 elementos
Student_Marks = Spark_App.SparkContext.paralelize ([89,76,78,89,90,100,34,56,54,22,45,43,23,56,78,21,34,34,56,34]))
#Display Data em RDD
print ("Dados reais em RDD:", Student_marks.mapa (elemento lambda: elemento).colet ()))
#Apply map () transformação subtraindo 15 de cada elemento em RDD
print ("Depois de subtrair 15 de cada elemento em RDD:", Student_marks.Mapa (elemento Lambda: Element-15).colet ()))

Saída:

Dados reais em RDD: [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]
Depois de subtrair 15 de cada elemento em RDD: [74, 61, 63, 74, 75, 85, 19, 41, 39, 7, 30, 28, 8, 41, 63, 6, 19, 19, 41, 19]

A partir da saída acima, podemos ver que o elemento 15 é subtraído a todos os elementos em RDD através da função Lambda usando a transformação MAP ().

2. filtro()

Filter () a transformação é usada para filtrar valores do RDD. É preciso uma função anônima como Lambda e retorna os elementos filtrando elementos de um RDD.

Sintaxe:

Rdd_data.filtro (anonymous_function)

Parâmetros:

Anonymous_function parece:

Elemento Lambda: Condição/Expressão

Por exemplo, a condição é usada para especificar as declarações expressivas para filtrar o RDD.

Vamos ver exemplos para entender melhor esta transformação.

Exemplo 1:

Neste exemplo, criamos um RDD chamado Student_Marks com 20 elementos e aplica a transformação filtro () filtrando apenas múltiplos de 5 e exibindo -os usando colecionamento () ação.

#import o módulo Pyspark
Importar Pyspark
#Import SparkSession para criar uma sessão
de Pyspark.SQL Import SparkSession
# importar rdd do Pyspark.rdd
de Pyspark.RDD Import rdd
#Crie um aplicativo chamado Linuxhint
Spark_App = SparkSession.construtor.AppName ('Linuxhint').getorcreate ()
# Crie o aluno marca dados com 20 elementos
Student_Marks = Spark_App.SparkContext.paralelize ([89,76,78,89,90,100,34,56,54,22,45,43,23,56,78,21,34,34,56,34]))
#Display Data em RDD
print ("Dados reais em RDD:", Student_marks.mapa (elemento lambda: elemento).colet ()))
#Apply filter () transformação retornando múltiplos inly de 5.
Imprima ("Multipulares de 5 de um RDD:", Student_marks.filtro (elemento lambda: elemento%5 == 0).colet ()))
)

Saída:

Dados reais em RDD: [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]
Múltiplos de 5 de um RDD: [90, 100, 45]

Na saída acima, podemos ver que múltiplos de 5 elementos são filtrados do RDD.

Exemplo 2:

Neste exemplo, criamos um RDD chamado Student_Marks com 20 elementos e aplica a transformação filtro () filtrando elementos maiores que 45 e exibindo -os usando colecionamento () ação.

#import o módulo Pyspark
Importar Pyspark
#Import SparkSession para criar uma sessão
de Pyspark.SQL Import SparkSession
# importar rdd do Pyspark.rdd
de Pyspark.RDD Import rdd
#Crie um aplicativo chamado Linuxhint
Spark_App = SparkSession.construtor.AppName ('Linuxhint').getorcreate ()
# Crie o aluno marca dados com 20 elementos
Student_Marks = Spark_App.SparkContext.paralelize ([89,76,78,89,90,100,34,56,54,22,45,43,23,56,78,21,34,34,56,34]))
#Display Data em RDD
print ("Dados reais em RDD:", Student_marks.mapa (elemento lambda: elemento).colet ()))
#Apply filter () transformação filtrando valores maiores que 45
print ("Valores maiores que 45:", Student_marks.filtro (elemento lambda: elemento> 45).colet ()))

Saída:

Dados reais em RDD: [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]
Valores maiores que 45: [89, 76, 78, 89, 90, 100, 56, 54, 56, 78, 56]

Na saída acima, podemos ver esses elementos maiores que 45 são filtrados do RDD.

3. União()

A transformação da Union () é usada para combinar dois RDDs. Podemos realizar essa transformação em dois RDDs…

Sintaxe:

Rdd_data1.Union (rdd_data2)

Vamos ver exemplos para entender melhor esta transformação.

Exemplo 1:

Neste exemplo, criaremos um único RDD com dados de marcas do aluno e geraremos dois RDD a partir do único RDD filtrando alguns valores usando a transformação do filtro (). Depois disso, podemos realizar a transformação Union () nos dois RDDs filtrados.

#import o módulo Pyspark
Importar Pyspark
#Import SparkSession para criar uma sessão
de Pyspark.SQL Import SparkSession
# importar rdd do Pyspark.rdd
de Pyspark.RDD Import rdd
#Crie um aplicativo chamado Linuxhint
Spark_App = SparkSession.construtor.AppName ('Linuxhint').getorcreate ()
# Crie o aluno marca dados com 20 elementos
Student_Marks = Spark_App.SparkContext.paralelize ([89,76,78,89,90,100,34,56,54,22,45,43,23,56,78,21,34,34,56,34]))
#Display Data em RDD
print ("Dados reais em RDD:", Student_marks.mapa (elemento lambda: elemento).colet ()))
primeiro_filter = student_marks.filtro (elemento lambda: elemento> 90)
Second_filter = student_marks.filtro (elemento lambda: elemento <40)
#Display Primeira transformação filtrada
print ("Elementos em RDD maior que 90", First_Filter.colet ()))
#Display Segunda transformação filtrada
print ("Elementos em RDD menor que 40", Second_Filter.colet ()))
#Apply Union () Transformação realizando Union nos 2 filtros acima
Print ("Transformação da União em dois dados filtrados", First_Filter.Union (Second_Filter).colet ()))

Saída:

Dados reais em RDD: [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]
Elementos em RDD maiores que 90 [100]
Elementos em RDD menor que 40 [34, 22, 23, 21, 34, 34, 34]
Transformação da União em dois dados filtrados [100, 34, 22, 23, 21, 34, 34, 34]

A partir da saída acima, você pode ver que realizamos Union em First_Filter e Second_Filter.

O First_Filter é obtido pela obtenção de elementos do Studentmarks RDD maiores que 90 e o segundo_filter é obtido pela obtenção de elementos do Studentmarks RDD menor que 40 usando o filtro () transformação.

Exemplo 2:

Neste exemplo, criaremos dois RDDs para que o primeiro RDD tenha 20 elementos e o segundo RDD tenha 10 elementos. Depois disso, podemos aplicar uma transformação da Union () a esses dois RDDs.

#import o módulo Pyspark
Importar Pyspark
#Import SparkSession para criar uma sessão
de Pyspark.SQL Import SparkSession
# importar rdd do Pyspark.rdd
de Pyspark.RDD Import rdd
#Crie um aplicativo chamado Linuxhint
Spark_App = SparkSession.construtor.AppName ('Linuxhint').getorcreate ()
# Crie o aluno marca dados com 20 elementos
Student_Marks1 = Spark_App.SparkContext.paralelize ([89,76,78,89,90,100,34,56,54,22,45,43,23,56,78,21,34,34,56,34]))
# Crie o aluno marca dados com 10 elementos
Student_Marks2 = Spark_App.SparkContext.paralelize ([45,43,23,56,78,21,34,34,56,34])
#Display Data em RDD
Print ("Dados reais nas marcas do aluno 1 RDD:", Student_marks1.mapa (elemento lambda: elemento).colet ()))
#Display Data em RDD
Print ("Dados reais nas marcas do aluno 2 RDD:", Student_marks2.mapa (elemento lambda: elemento).colet ()))
#Apply Union () Transformação executando a Union nos 2 RDDs acima
Print ("Transformação da União em Two RDD", Student_Marks1.Union (Student_Marks2).colet ()))

Saída:

Dados reais nas marcas de estudante 1 rdd: [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]
Dados reais nas marcas do aluno 2 RDD: [45, 43, 23, 56, 78, 21, 34, 34, 56, 34]
Transformação da União em dois RDD [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]

Podemos ver que dois RDDs são combinados usando a transformação Union ().

Conclusão

A partir deste tutorial Pyspark, vemos três transformações aplicadas ao RDD. A transformação do map () é usada para mapear, transformando elementos em um RDD, filtro () é usado para executar operações de filtro e criar um novo RDD filtrado a partir do RDD existente. Finalmente, discutimos o Union () RDD que é usado para combinar dois RDDs.