Converter Pyspark RDD em DataFrame

Converter Pyspark RDD em DataFrame
Em Python, Pyspark é um módulo Spark usado para fornecer um tipo semelhante de processamento como o Spark.

RDD significa conjuntos de dados distribuídos resilientes. Podemos chamar RDD de estrutura de dados fundamental no Apache Spark.

Sintaxe

Spark_App.SparkContext.paralelize (dados)

Podemos relacionar os dados em um formato tabular. A estrutura de dados usada é dataframe.Formato tabular significa que armazena dados em linhas e colunas.

Em Pyspark, podemos criar um aplicativo de dados de dados do Spark com o método CreateDataFrame ().

Sintaxe

Spark_App.CreatedataFrame (input_data, colunas)

Onde input_data pode ser um dicionário ou uma lista para criar um quadro de dados a partir desses dados, e se o input_data for uma lista de dicionários, as colunas não serão necessárias. Se for uma lista aninhada, temos que fornecer os nomes das colunas.

Agora, vamos discutir como converter Pyspark RDD em DataFrame.

Criação de Pyspark RDD

Neste exemplo, criaremos um RDD chamado alunos e o exibiremos usando a ação colecionada ().

#import o módulo Pyspark
Importar Pyspark
#Import SparkSession para criar uma sessão
de Pyspark.SQL Import SparkSession
# importar rdd do Pyspark.rdd
de Pyspark.RDD Import rdd
#Crie um aplicativo chamado Linuxhint
Spark_App = SparkSession.construtor.AppName ('Linuxhint').getorcreate ()
# Crie dados dos alunos com 5 linhas e 6 atributos
alunos = spark_app.SparkContext.paralelize (['rollno': '001', 'nome': 'sravan', 'idade': 23, 'altura': 5.79, 'Peso': 67, 'Endereço': 'Guntur',
'rollno': '002', 'nome': 'ojaswi', 'idade': 16, 'altura': 3.79, 'Peso': 34, 'Endereço': 'Hyd',
'rollno': '003', 'nome': 'gnanesh chowdary', 'idade': 7, 'altura': 2.79, 'Peso': 17, 'Endereço': 'Patna',
'rollno': '004', 'nome': 'rohith', 'idade': 9, 'altura': 3.69, 'Peso': 28, 'Endereço': 'Hyd',
'rollno': '005', 'nome': 'sridevi', 'idade': 37, 'altura': 5.59, 'Peso': 54, 'Endereço': 'Hyd']))
#Display o RDD usando colecionamento ()
Imprimir (alunos.colet ()))

Saída

['rollno': '001', 'nome': 'sravan', 'idade': 23, 'altura': 5.79, 'Peso': 67, 'Endereço': 'Guntur',
'rollno': '002', 'nome': 'ojaswi', 'idade': 16, 'altura': 3.79, 'Peso': 34, 'Endereço': 'Hyd',
'rollno': '003', 'nome': 'gnanesh chowdary', 'idade': 7, 'altura': 2.79, 'Peso': 17, 'Endereço': 'Patna',
'rollno': '004', 'nome': 'rohith', 'idade': 9, 'altura': 3.69, 'Peso': 28, 'Endereço': 'Hyd',
'rollno': '005', 'nome': 'sridevi', 'idade': 37, 'altura': 5.59, 'Peso': 54, 'Endereço': 'Hyd']

Método 1: Usando CreateDataFrame ()

É possível converter RDD em DataFrame de um aplicativo Spark com o método CreateDataFrame (). Aqui precisamos passar RDD para este método.

Sintaxe

Spark_App.CreatedataFrame (input_rdd)

Onde input_rdd é o RDD.

Exemplo
Neste exemplo, estamos convertendo os alunos - RDD em dataframe usando o método CreateDataFrame ().

#import o módulo Pyspark
Importar Pyspark
#Import SparkSession para criar uma sessão
de Pyspark.SQL Import SparkSession
# importar rdd do Pyspark.rdd
de Pyspark.RDD Import rdd
#Crie um aplicativo chamado Linuxhint
Spark_App = SparkSession.construtor.AppName ('Linuxhint').getorcreate ()
# Crie dados dos alunos com 5 linhas e 6 atributos
alunos = spark_app.SparkContext.paralelize (['rollno': '001', 'nome': 'sravan', 'idade': 23, 'altura': 5.79, 'Peso': 67, 'Endereço': 'Guntur',
'rollno': '002', 'nome': 'ojaswi', 'idade': 16, 'altura': 3.79, 'Peso': 34, 'Endereço': 'Hyd',
'rollno': '003', 'nome': 'gnanesh chowdary', 'idade': 7, 'altura': 2.79, 'Peso': 17, 'Endereço': 'Patna',
'rollno': '004', 'nome': 'rohith', 'idade': 9, 'altura': 3.69, 'Peso': 28, 'Endereço': 'Hyd',
'rollno': '005', 'nome': 'sridevi', 'idade': 37, 'altura': 5.59, 'Peso': 54, 'Endereço': 'Hyd']))
#Verifique o tipo de estudante
Imprimir (Tipo (alunos))
#Convert RDD para DataFrame
df = spark_app.CreatedAtAframe (estudantes)
#Display O DataFrame
df.mostrar()
#verifique o tipo de df
Imprimir (tipo (df))

Saída

Na saída acima, podemos ver que os alunos são um RDD (exibido usando o tipo) e, depois de converter para o DataFrame, exibimos o método DataFrame usando Show () e, para a confirmação, retornamos o tipo de DataFrame.

Método 2: Usando CreateDataFrame () com esquema

Structype ()
Este método é usado para definir a estrutura do quadro de dados Pyspark. Ele aceitará uma lista de tipos de dados, juntamente com nomes de colunas para o DataFrame especificado. Isso é conhecido como o esquema do DataFrame. Ele armazena uma coleção de campos

Structfield ()
Este método é usado dentro do método structtype () do pyspark dataframe. Ele aceitará nomes de colunas com o tipo de dados.

Sintaxe

esquema = structtype ([
Structfield ("Coluna 1", Datatype, True/False),
Structfield ("Coluna 2", Datatype, True/False),
.. ,
Structfield ("Coluna N", Datatype, True/False)])

Onde o esquema se refere ao quadro de dados quando é criado.

Parâmetros

  1. Structype aceita uma lista de Structfields em uma lista separada por uma vírgula.
  2. Structfield () é usado para adicionar colunas ao DataFrame, que leva os nomes das colunas como o primeiro parâmetro e o tipo de dados das colunas específicas como o segundo parâmetro.

Temos que usar os tipos de dados dos métodos importados do Pyspark.SQL.Módulo de tipos.

Os tipos de dados suportados são:

  • Stringtype () - Usado para armazenar valores de string
  • Integertype () - Usado para armazenar valores inteiros ou inteiros longos
  • Floottype () - Usado para armazenar valores de flutuação
  • DoubleType () - Usado para armazenar valores duplos
  1. Valores booleanos como o terceiro parâmetro. Se for verdadeiro, o tipo de dados fornecido será usado, caso contrário, não quando for falso.

Temos que passar esse esquema para o método de dados de dados junto com os dados.

Sintaxe

CreatedAtAframe (dados, esquema = esquema)

Sintaxe

Spark_App.CreatedataFrame (input_rdd)

Onde, input_rdd é o RDD.

Exemplo
Neste exemplo, estamos convertendo os alunos - RDD em DataFrame usando o método CreateDataFrame () com os nomes das colunas - rollno, nome, idade, altura, peso e endereço

#import o módulo Pyspark
Importar Pyspark
#Import SparkSession para criar uma sessão
de Pyspark.SQL Import SparkSession
# importar rdd do Pyspark.rdd
de Pyspark.RDD Import rdd
#e importar tipos de estrutura e tipos de dados
de Pyspark.SQL.Tipos de importação estruttype, structfield, stringtype, integertype, floottype
#Crie um aplicativo chamado Linuxhint
Spark_App = SparkSession.construtor.AppName ('Linuxhint').getorcreate ()
# Crie dados dos alunos com 5 linhas e 6 atributos
alunos = spark_app.SparkContext.paralelize (['rollno': '001', 'nome': 'sravan', 'idade': 23, 'altura': 5.79, 'Peso': 67, 'Endereço': 'Guntur',
'rollno': '002', 'nome': 'ojaswi', 'idade': 16, 'altura': 3.79, 'Peso': 34, 'Endereço': 'Hyd',
'rollno': '003', 'nome': 'gnanesh chowdary', 'idade': 7, 'altura': 2.79, 'Peso': 17, 'Endereço': 'Patna',
'rollno': '004', 'nome': 'rohith', 'idade': 9, 'altura': 3.69, 'Peso': 28, 'Endereço': 'Hyd',
'rollno': '005', 'nome': 'sridevi', 'idade': 37, 'altura': 5.59, 'Peso': 54, 'Endereço': 'Hyd']))
#Verifique o tipo de estudante
Imprimir (Tipo (alunos))
#Define o StructType e Structfields
#para os nomes de colunas abaixo
esquema = structtype ([
Structfield ("rollno", stringtype (), true),
Structfield ("Nome", Stringtype (), True),
Structfield ("Age", Integertype (), verdadeiro),
Structfield ("Height", Floottype (), True),
Structfield ("Peso", Integertype (), True),
Structfield ("Endereço", Stringtype (), True)
]))
#Convert RDD para DataFrame
df = spark_app.CreatedataFrame (estudantes, esquema)
#Display O DataFrame
df.mostrar()
#verifique o tipo de df
Imprimir (tipo (df))

Saída

Na saída acima, podemos ver que os alunos são um RDD (exibido usando o tipo) e, depois de converter para o DataFrame, exibimos o método DataFrame usando Show () e, para a confirmação, retornamos o tipo de DataFrame.

Método 3: Usando Todf ()

Todf () não pega nenhum parâmetro e o converte diretamente no DataFrame.

Sintaxe

input_rdd.Todf ()

Onde, input_rdd é o RDD.

Exemplo
Neste exemplo, estamos convertendo os alunos - RDD em DataFrame usando o método Todf ().

#import o módulo Pyspark
Importar Pyspark
#Import SparkSession para criar uma sessão
de Pyspark.SQL Import SparkSession
# importar rdd do Pyspark.rdd
de Pyspark.RDD Import rdd
#Crie um aplicativo chamado Linuxhint
Spark_App = SparkSession.construtor.AppName ('Linuxhint').getorcreate ()
# Crie dados dos alunos com 5 linhas e 6 atributos
alunos = spark_app.SparkContext.paralelize (['rollno': '001', 'nome': 'sravan', 'idade': 23, 'altura': 5.79, 'Peso': 67, 'Endereço': 'Guntur',
'rollno': '002', 'nome': 'ojaswi', 'idade': 16, 'altura': 3.79, 'Peso': 34, 'Endereço': 'Hyd',
'rollno': '003', 'nome': 'gnanesh chowdary', 'idade': 7, 'altura': 2.79, 'Peso': 17, 'Endereço': 'Patna',
'rollno': '004', 'nome': 'rohith', 'idade': 9, 'altura': 3.69, 'Peso': 28, 'Endereço': 'Hyd',
'rollno': '005', 'nome': 'sridevi', 'idade': 37, 'altura': 5.59, 'Peso': 54, 'Endereço': 'Hyd']))
#Verifique o tipo de estudante
Imprimir (Tipo (alunos))
#Convert RDD para DataFrame
df = estudantes.Todf ()
#Display O DataFrame
df.mostrar()
#verifique o tipo de df
Imprimir (tipo (df))

Saída

Na saída acima, podemos ver que os alunos são um RDD (exibido usando o tipo) e, depois de converter para o DataFrame, exibimos o método DataFrame usando Show () e, para a confirmação, retornamos o tipo de DataFrame.

Conclusão

Neste tutorial de Pyspark, vimos como converter o Pyspark RDD para Pyspark DataFrame usando os métodos CreateTAframe () e Todf (). Se você deseja fornecer nomes de colunas explicitamente, pode usar o segundo método neste tutorial.