A função Lead () no Pyspark está disponível no módulo de janela que é usado para retornar os próximos valores das linhas às linhas atuais. Primeiro, a função Lead () retorna o nulo para a última linha em uma partição. É preciso um parâmetro de deslocamento que representa o número total de linhas, de modo que os próximos valores da linha sejam retornados às linhas reais. Os nulos são colocados para a primeira última linha/s (deslocamento).
É possível particionar as linhas no quadro de dados usando a função da janela. Está disponível no Pyspark.SQL.janela módulo.
Sintaxe:
dataframe_obj.WithColumn ("Lead_column", Lead ("Column", Offset).sobre (partição))
São necessários dois parâmetros:
- O Lead_column é o nome da coluna no quadro de dados Pyspark, no qual os valores da linha com chumbo são colocados com base nos valores nesta coluna.
- O deslocamento especifica o número inteiro para retornar esse número das próximas linhas aos valores atuais da linha.
Passos:
- Crie um quadro de dados Pyspark que tenha alguns valores semelhantes em pelo menos uma coluna.
- Partição Os dados usando o método Partionby () disponíveis na função da janela e encomende -os com base na coluna usando a função Orderby ().
Sintaxe:
partição = janela.Partionby ("coluna").ordem ("coluna")
Podemos solicitar os dados particionados com a coluna particionada ou qualquer outra coluna.
Agora, você pode usar a função Lead () nas linhas particionadas usando o sobre() função.
Adicionamos uma coluna para armazenar o número da linha usando o withcolumn () função.
Sintaxe:
dataframe_obj.WithColumn ("Lead_column", Lead ("Column", Offset).sobre (partição))
Aqui, o nome especifica o nome da linha e o dataframe_obj é o nosso pyspark dataframe.
Vamos implementar o código.
Exemplo 1:
Aqui, criamos um quadro de dados Pyspark que possui 5 colunas - ['sujeito_id', 'nome', 'idade', 'tecnologia1', 'Technology2'] com 10 linhas e particionando as linhas com base nas Technology1 Usando a função da janela. Depois disso, lideramos 1 fila.
Importar Pyspark
de Pyspark.Importação SQL *
Spark_App = SparkSession.construtor.nome do aplicativo('_').getorcreate ()
alunos = [(4, 'sravan', 23, 'php', 'testes'),
(4, 'sravan', 23, 'php', 'testing'),
(46, 'Mounika', 22, '.Net ',' html '),
(4, 'Deepika', 21, 'Oracle', 'html'),
(46, 'Mounika', 22, 'Oracle', 'Testing'),
(12, 'Chandrika', 22, 'Hadoop', 'C#'),
(12, 'Chandrika', 22, 'Oracle', 'Testing'),
(4, 'Sravan', 23, 'Oracle', 'C#'),
(4, 'Deepika', 21, 'Php', 'C#'),
(46, 'Mounika', 22, '.Net ',' teste ')
]
dataframe_obj = spark_app.CreatedataFrame (estudantes, ['sujeito_id', 'nome', 'idade', 'tecnologia1', 'tecnologia2']))
print ("---------- DataFrame real ----------")
dataframe_obj.mostrar()
# importe a função da janela
de Pyspark.SQL.Janela de importação de janela
#import a liderança do Pyspark.SQL.funções
de Pyspark.SQL.funções Importar chumbo
#Partition O DataFrame com base nos valores na coluna Technology1 e
#orde as linhas em cada partição com base na coluna Subjetion_id
partição = janela.partitionby ("tecnologia1").ordem ('sujeito_id')
print ("---------- Particionado DataFrame ----------")
#Agora menciona o lead com offset-1 com base em sujeito_id
dataframe_obj.WithColumn ("Lead", Lead ("Subjetion_id", 1).sobre (partição)).mostrar()
Saída:
Explicação:
A primeira saída representa os dados reais presentes no DataFrame. Na segunda saída, a partição é feita com base no Technology1 coluna.
O número total de partições é 4.
Partição 1:
O .A rede ocorreu duas vezes na primeira partição. Desde que especificamos o deslocamento de chumbo como 1, o último .O valor líquido é nulo e o primeiro .O valor líquido é o próximo valor da linha subject_id - 46.
Partição 2:
Hadoop ocorreu uma vez na segunda partição. Então, o chumbo é nulo.
Partição 3:
Oracle ocorreu quatro vezes na terceira partição.
- Para o último oráculo, o chumbo é nulo.
- Para o primeiro Oracle, o valor do chumbo é 4 (já que o próximo valor da linha sujeito_ID é 4).
- Para o terceiro oráculo, o valor do chumbo é 12 (já que o próximo valor da linha sujeito_ID é 12).
Para o quarto oráculo, o valor do chumbo é 46 (já que o próximo valor da linha sujeito_ID é 46).
Partição 4:
PHP ocorreu três vezes na quarta partição.
- O valor de chumbo para o 3º PHP é nulo.
- O valor do chumbo para o 1º PHP é 4 (já que o próximo valor da linha sujeito_ID é 4).
- O valor do chumbo para o 2º PHP é 4 (já que o próximo valor da linha sujeito_ID é 4).
Exemplo 2:
Lidere as linhas por 2. Certifique -se de criar o Pyspark DataFrame, como visto no Exemplo 1.
# importe a função da janela
de Pyspark.SQL.Janela de importação de janela
#import a liderança do Pyspark.SQL.funções
de Pyspark.SQL.funções Importar chumbo
#Partition O DataFrame com base nos valores na coluna Technology1 e
#orde as linhas em cada partição com base na coluna Subjetion_id
partição = janela.partitionby ("tecnologia1").ordem ('sujeito_id')
print ("---------- Particionado DataFrame ----------")
#Agora menciona o lead com offset-2 com base em sujeito_id
dataframe_obj.Withcolumn ("Lead", Lead ("Subvenço_id", 2).sobre (partição)).mostrar()
Saída:
Explicação:
A partição é feita com base no Technology1 coluna.
O número total de partições é 4.
Partição 1:
O .A rede ocorreu duas vezes na primeira partição. Como especificamos o deslocamento de chumbo como 2, o deslocamento é nulo para ambos os valores.
Partição 2:
Hadoop ocorreu uma vez na segunda partição. Então, o chumbo é nulo.
Partição 3:
Oracle ocorreu quatro vezes na terceira partição.
- Nos dois últimos Oracle, a liderança é nula.
- Para o primeiro oráculo, o valor do chumbo é 12 (já que as próximas 2 linhas sujeitos_id valor são 12).
- Para o segundo Oracle, o valor do chumbo é 46 (já que as próximas 2 linhas sujeitos_id valor são 46).
Partição 4:
PHP ocorreu três vezes na quarta partição.
- Nos dois últimos Oracle, a liderança é nula.
- Para o primeiro PHP, o valor do chumbo é 4 (já que as próximas 2 linhas sujeitos_id valor são 4).
Exemplo 3:
Lidere as linhas por 2 com base na coluna de idade. Certifique -se de criar o Pyspark DataFrame, como visto no Exemplo 1.
# importe a função da janela
de Pyspark.SQL.Janela de importação de janela
#import a liderança do Pyspark.SQL.funções
de Pyspark.SQL.funções Importar chumbo
#Partition O DataFrame com base nos valores na coluna Technology1 e
#orde as linhas em cada partição com base na coluna Subjetion_id
partição = janela.partitionby ("tecnologia1").ordem ('sujeito_id')
print ("---------- Particionado DataFrame ----------")
#Agora menciona o chumbo com offset-2 com base na idade
dataframe_obj.WithColumn ("Lead", Lead ("Age", 2).sobre (partição)).mostrar()
Saída:
Explicação:
A partição é feita com base no Technology1 coluna e o chumbo é definido com base na coluna de idade.
O número total de partições é 4.
Partição 1:
O .A rede ocorreu duas vezes na primeira partição. Como especificamos o deslocamento de chumbo como 2, o deslocamento é nulo para ambos os valores.
Partição 2:
Hadoop ocorreu uma vez na segunda partição. Então, a liderança é nula.
Partição 3:
Oracle ocorreu quatro vezes na terceira partição.
- Para os dois últimos Oracle, a liderança é nula.
- Para o primeiro Oracle, o valor do chumbo é 22 (já que as próximas 2 linhas de idade são 22).
- Para o segundo Oracle, o valor do chumbo é 22 (já que as próximas 2 linhas de idade são 22).
Partição 4:
PHP ocorreu três vezes na quarta partição.
- Para os dois últimos Oracle, a liderança é nula.
- Para o primeiro PHP, o valor do chumbo é 21 (já que as próximas 2 linhas de idade são 21).
Conclusão
Aprendemos como obter os valores de chumbo no Pyspark Dataframe nas linhas particionadas. A função Lead () no Pyspark está disponível no módulo de janela que é usado para retornar os próximos valores da linha às linhas atuais. É preciso um parâmetro de deslocamento que representa o número total de linhas, de modo que os próximos valores da linha sejam retornados às linhas reais. Para a primeira última fila, os nulos (deslocados) são colocados. Aprendemos os diferentes exemplos definindo as diferentes compensações.