Pyspark - função principal

Pyspark - função principal
A função Lead () no Pyspark está disponível no módulo de janela que é usado para retornar os próximos valores das linhas às linhas atuais. Primeiro, a função Lead () retorna o nulo para a última linha em uma partição. É preciso um parâmetro de deslocamento que representa o número total de linhas, de modo que os próximos valores da linha sejam retornados às linhas reais. Os nulos são colocados para a primeira última linha/s (deslocamento).

É possível particionar as linhas no quadro de dados usando a função da janela. Está disponível no Pyspark.SQL.janela módulo.

Sintaxe:

dataframe_obj.WithColumn ("Lead_column", Lead ("Column", Offset).sobre (partição))

São necessários dois parâmetros:

  1. O Lead_column é o nome da coluna no quadro de dados Pyspark, no qual os valores da linha com chumbo são colocados com base nos valores nesta coluna.
  2. O deslocamento especifica o número inteiro para retornar esse número das próximas linhas aos valores atuais da linha.

Passos:

  1. Crie um quadro de dados Pyspark que tenha alguns valores semelhantes em pelo menos uma coluna.
  2. Partição Os dados usando o método Partionby () disponíveis na função da janela e encomende -os com base na coluna usando a função Orderby ().

Sintaxe:

partição = janela.Partionby ("coluna").ordem ("coluna")

Podemos solicitar os dados particionados com a coluna particionada ou qualquer outra coluna.

Agora, você pode usar a função Lead () nas linhas particionadas usando o sobre() função.

Adicionamos uma coluna para armazenar o número da linha usando o withcolumn () função.

Sintaxe:

dataframe_obj.WithColumn ("Lead_column", Lead ("Column", Offset).sobre (partição))

Aqui, o nome especifica o nome da linha e o dataframe_obj é o nosso pyspark dataframe.

Vamos implementar o código.

Exemplo 1:

Aqui, criamos um quadro de dados Pyspark que possui 5 colunas - ['sujeito_id', 'nome', 'idade', 'tecnologia1', 'Technology2'] com 10 linhas e particionando as linhas com base nas Technology1 Usando a função da janela. Depois disso, lideramos 1 fila.

Importar Pyspark
de Pyspark.Importação SQL *
Spark_App = SparkSession.construtor.nome do aplicativo('_').getorcreate ()
alunos = [(4, 'sravan', 23, 'php', 'testes'),
(4, 'sravan', 23, 'php', 'testing'),
(46, 'Mounika', 22, '.Net ',' html '),
(4, 'Deepika', 21, 'Oracle', 'html'),
(46, 'Mounika', 22, 'Oracle', 'Testing'),
(12, 'Chandrika', 22, 'Hadoop', 'C#'),
(12, 'Chandrika', 22, 'Oracle', 'Testing'),
(4, 'Sravan', 23, 'Oracle', 'C#'),
(4, 'Deepika', 21, 'Php', 'C#'),
(46, 'Mounika', 22, '.Net ',' teste ')
]
dataframe_obj = spark_app.CreatedataFrame (estudantes, ['sujeito_id', 'nome', 'idade', 'tecnologia1', 'tecnologia2']))
print ("---------- DataFrame real ----------")
dataframe_obj.mostrar()
# importe a função da janela
de Pyspark.SQL.Janela de importação de janela
#import a liderança do Pyspark.SQL.funções
de Pyspark.SQL.funções Importar chumbo
#Partition O DataFrame com base nos valores na coluna Technology1 e
#orde as linhas em cada partição com base na coluna Subjetion_id
partição = janela.partitionby ("tecnologia1").ordem ('sujeito_id')
print ("---------- Particionado DataFrame ----------")
#Agora menciona o lead com offset-1 com base em sujeito_id
dataframe_obj.WithColumn ("Lead", Lead ("Subjetion_id", 1).sobre (partição)).mostrar()

Saída:

Explicação:

A primeira saída representa os dados reais presentes no DataFrame. Na segunda saída, a partição é feita com base no Technology1 coluna.

O número total de partições é 4.

Partição 1:

O .A rede ocorreu duas vezes na primeira partição. Desde que especificamos o deslocamento de chumbo como 1, o último .O valor líquido é nulo e o primeiro .O valor líquido é o próximo valor da linha subject_id - 46.

Partição 2:

Hadoop ocorreu uma vez na segunda partição. Então, o chumbo é nulo.

Partição 3:

Oracle ocorreu quatro vezes na terceira partição.

  1. Para o último oráculo, o chumbo é nulo.
  2. Para o primeiro Oracle, o valor do chumbo é 4 (já que o próximo valor da linha sujeito_ID é 4).
  3. Para o terceiro oráculo, o valor do chumbo é 12 (já que o próximo valor da linha sujeito_ID é 12).

Para o quarto oráculo, o valor do chumbo é 46 (já que o próximo valor da linha sujeito_ID é 46).

Partição 4:

PHP ocorreu três vezes na quarta partição.

  1. O valor de chumbo para o 3º PHP é nulo.
  2. O valor do chumbo para o 1º PHP é 4 (já que o próximo valor da linha sujeito_ID é 4).
  3. O valor do chumbo para o 2º PHP é 4 (já que o próximo valor da linha sujeito_ID é 4).

Exemplo 2:

Lidere as linhas por 2. Certifique -se de criar o Pyspark DataFrame, como visto no Exemplo 1.

# importe a função da janela
de Pyspark.SQL.Janela de importação de janela
#import a liderança do Pyspark.SQL.funções
de Pyspark.SQL.funções Importar chumbo
#Partition O DataFrame com base nos valores na coluna Technology1 e
#orde as linhas em cada partição com base na coluna Subjetion_id
partição = janela.partitionby ("tecnologia1").ordem ('sujeito_id')
print ("---------- Particionado DataFrame ----------")
#Agora menciona o lead com offset-2 com base em sujeito_id
dataframe_obj.Withcolumn ("Lead", Lead ("Subvenço_id", 2).sobre (partição)).mostrar()

Saída:

Explicação:

A partição é feita com base no Technology1 coluna.

O número total de partições é 4.

Partição 1:

O .A rede ocorreu duas vezes na primeira partição. Como especificamos o deslocamento de chumbo como 2, o deslocamento é nulo para ambos os valores.

Partição 2:

Hadoop ocorreu uma vez na segunda partição. Então, o chumbo é nulo.

Partição 3:

Oracle ocorreu quatro vezes na terceira partição.

  • Nos dois últimos Oracle, a liderança é nula.
  • Para o primeiro oráculo, o valor do chumbo é 12 (já que as próximas 2 linhas sujeitos_id valor são 12).
  • Para o segundo Oracle, o valor do chumbo é 46 (já que as próximas 2 linhas sujeitos_id valor são 46).

Partição 4:

PHP ocorreu três vezes na quarta partição.

  • Nos dois últimos Oracle, a liderança é nula.
  • Para o primeiro PHP, o valor do chumbo é 4 (já que as próximas 2 linhas sujeitos_id valor são 4).

Exemplo 3:

Lidere as linhas por 2 com base na coluna de idade. Certifique -se de criar o Pyspark DataFrame, como visto no Exemplo 1.

# importe a função da janela
de Pyspark.SQL.Janela de importação de janela
#import a liderança do Pyspark.SQL.funções
de Pyspark.SQL.funções Importar chumbo
#Partition O DataFrame com base nos valores na coluna Technology1 e
#orde as linhas em cada partição com base na coluna Subjetion_id
partição = janela.partitionby ("tecnologia1").ordem ('sujeito_id')
print ("---------- Particionado DataFrame ----------")
#Agora menciona o chumbo com offset-2 com base na idade
dataframe_obj.WithColumn ("Lead", Lead ("Age", 2).sobre (partição)).mostrar()

Saída:

Explicação:

A partição é feita com base no Technology1 coluna e o chumbo é definido com base na coluna de idade.

O número total de partições é 4.

Partição 1:

O .A rede ocorreu duas vezes na primeira partição. Como especificamos o deslocamento de chumbo como 2, o deslocamento é nulo para ambos os valores.

Partição 2:

Hadoop ocorreu uma vez na segunda partição. Então, a liderança é nula.

Partição 3:

Oracle ocorreu quatro vezes na terceira partição.

  • Para os dois últimos Oracle, a liderança é nula.
  • Para o primeiro Oracle, o valor do chumbo é 22 (já que as próximas 2 linhas de idade são 22).
  • Para o segundo Oracle, o valor do chumbo é 22 (já que as próximas 2 linhas de idade são 22).

Partição 4:

PHP ocorreu três vezes na quarta partição.

  • Para os dois últimos Oracle, a liderança é nula.
  • Para o primeiro PHP, o valor do chumbo é 21 (já que as próximas 2 linhas de idade são 21).

Conclusão

Aprendemos como obter os valores de chumbo no Pyspark Dataframe nas linhas particionadas. A função Lead () no Pyspark está disponível no módulo de janela que é usado para retornar os próximos valores da linha às linhas atuais. É preciso um parâmetro de deslocamento que representa o número total de linhas, de modo que os próximos valores da linha sejam retornados às linhas reais. Para a primeira última fila, os nulos (deslocados) são colocados. Aprendemos os diferentes exemplos definindo as diferentes compensações.