Pyspark - função de atraso

Pyspark - função de atraso
A função lag () no Pyspark está disponível no módulo de janela que é usado para retornar os valores das linhas anteriores às linhas atuais. Primeiro, a função lag () retorna nulo para as linhas superiores. É preciso um parâmetro de deslocamento que representa o número total de linhas, de modo que os valores da linha anterior sejam retornados às próximas linhas. Para as primeiras linhas superiores, os nulos (deslocados) são colocados.

É possível particionar as linhas no quadro de dados usando a função da janela. Está disponível no Pyspark.SQL.janela módulo.

Sintaxe:

dataframe_obj.withcolumn ("lag_column", lag ("coluna", deslocamento).sobre (partição))

São necessários dois parâmetros:

  1. A coluna é o nome da coluna no quadro de dados Pyspark, no qual os valores de linha atrasados ​​são colocados com base nos valores nesta coluna.
  2. O deslocamento especifica o número inteiro para retornar esse número de linhas anteriores aos valores atuais da linha.

Passos:

  1. Crie um quadro de dados Pyspark que tenha alguns valores semelhantes em pelo menos uma coluna.
  2. Partição Os dados usando o método Partionby () disponíveis na função da janela e encomende -os com base na coluna usando a função Orderby ().

Sintaxe:

partição = janela.Partionby ("coluna").ordem ("coluna")

Podemos solicitar os dados particionados com a coluna particionada ou qualquer outra coluna.

Agora, você pode usar a função lag () nas linhas particionadas usando o sobre() função.

Adicionamos uma coluna para armazenar o número da linha usando o withcolumn () função.

Sintaxe:

dataframe_obj.withcolumn ("lag_column", lag ("coluna", deslocamento).sobre (partição))

Aqui, o nome especifica o nome da linha e o dataframe_obj é o nosso pyspark dataframe.

Vamos implementar o código.

Exemplo 1:

Aqui, criamos um quadro de dados Pyspark que possui 5 colunas - ['sujeito_id', 'nome', 'idade', 'tecnologia1', 'tecnologia2'] com 10 linhas e particionando as linhas com base em Technology1 Usando a função da janela. Depois disso, atrasamos 1 fila.

Importar Pyspark
de Pyspark.Importação SQL *
Spark_App = SparkSession.construtor.nome do aplicativo('_').getorcreate ()
alunos = [(4, 'sravan', 23, 'php', 'testes'),
(4, 'sravan', 23, 'php', 'testing'),
(46, 'Mounika', 22, '.Net ',' html '),
(4, 'Deepika', 21, 'Oracle', 'html'),
(46, 'Mounika', 22, 'Oracle', 'Testing'),
(12, 'Chandrika', 22, 'Hadoop', 'C#'),
(12, 'Chandrika', 22, 'Oracle', 'Testing'),
(4, 'Sravan', 23, 'Oracle', 'C#'),
(4, 'Deepika', 21, 'Php', 'C#'),
(46, 'Mounika', 22, '.Net ',' teste ')
]
dataframe_obj = spark_app.CreatedataFrame (estudantes, ['sujeito_id', 'nome', 'idade', 'tecnologia1', 'tecnologia2']))
print ("---------- DataFrame real ----------")
dataframe_obj.mostrar()
# importe a função da janela
de Pyspark.SQL.Janela de importação de janela
#import o atraso do Pyspark.SQL.funções
de Pyspark.SQL.Funções Importar lag
#Partition O DataFrame com base nos valores na coluna Technology1 e
#orde as linhas em cada partição com base na coluna Subjetion_id
partição = janela.partitionby ("tecnologia1").ordem ('sujeito_id')
print ("---------- Particionado DataFrame ----------")
#Agora mencione o atraso com offset-1 com base em sujeito_id
dataframe_obj.Withcolumn ("Lag", Lag ("Subvenço_id", 1).sobre (partição)).mostrar()

Saída:

Explicação:

Na primeira saída, representa os dados reais presentes no quadro de dados. Na segunda saída, a partição é feita com base no Technology1 coluna.

O número total de partições é 4.

Partição 1:

O .A rede ocorreu duas vezes na primeira partição. Desde que especificamos o deslocamento de lag como 1, o primeiro .O valor líquido é nulo e o próximo .O valor líquido é o valor anterior da linha subject_id - 46.

Partição 2:

Hadoop ocorreu uma vez na segunda partição. Então, lag é nulo.

Partição 3:

Oracle ocorreu quatro vezes na terceira partição.

Para o primeiro oráculo, o lag é nulo.

Para o segundo Oracle, o valor do atraso é 4 (já que o valor anterior da linha sujeito_id é 4).

Para o terceiro oráculo, o valor do atraso é 4 (já que o valor anterior da linha sujeito_ID é 4).

Para o quarto oráculo, o valor do atraso é 12 (já que o valor anterior da linha sujeito_ID é 12).

Partição 4:

PHP ocorreu três vezes na quarta partição.

O valor de atraso para o 1º PHP é nulo.

O valor de atraso para o 2º PHP é 4 (já que o valor da linha anterior_ID é 4).

O valor de atraso para o 3º PHP é 4 (já que o valor da linha anterior_id é 4).

Exemplo 2:

Atrasar as linhas por 2. Certifique -se de criar o Pyspark DataFrame, como visto no Exemplo 1.

# importe a função da janela
de Pyspark.SQL.Janela de importação de janela
#import o atraso do Pyspark.SQL.funções
de Pyspark.SQL.Funções Importar lag
#Partition O DataFrame com base nos valores na coluna Technology1 e
#orde as linhas em cada partição com base na coluna Subjetion_id
partição = janela.partitionby ("tecnologia1").ordem ('sujeito_id')
print ("---------- Particionado DataFrame ----------")
#Agora mencione o atraso com offset-2 com base em sujeito_id
dataframe_obj.Withcolumn ("Lag", Lag ("Subvenço_id", 2).sobre (partição)).mostrar()

Saída:

Explicação:

A partição é feita com base no Technology1 coluna. O número total de partições é 4.

Partição 1:

O .A rede ocorreu duas vezes na primeira partição. Como especificamos o deslocamento de lag como 2, o deslocamento é nulo para ambos os valores.

Partição 2:

Hadoop ocorreu uma vez na segunda partição. Então, lag é nulo.

Partição 3:

Oracle ocorreu quatro vezes na terceira partição.

Para o primeiro e o segundo oráculo, o lag é nulo.

Para o terceiro oráculo, o valor do atraso é 4 (já que as 2 linhas anteriores devem ser 4).

Para o quarto oráculo, o valor do atraso é 4 (já que as 2 linhas anteriores, sujeito_id, o valor é 4).

Partição 4:

PHP ocorreu três vezes na quarta partição.

O valor de atraso para o 1º e o 2º PHP é nulo.

O valor de atraso para o 3º PHP é 4 (já que as 2 linhas anteriores, sujeito_id, o valor é 4).

Exemplo 3:

Fique as linhas em 2 com base na coluna de idade. Certifique -se de criar o Pyspark DataFrame, como visto no Exemplo 1.

# importe a função da janela
de Pyspark.SQL.Janela de importação de janela
#import o atraso do Pyspark.SQL.funções
de Pyspark.SQL.Funções Importar lag
#Partition O DataFrame com base nos valores na coluna Technology1 e
#Order as linhas em cada partição com base na coluna de idade
partição = janela.partitionby ("tecnologia1").ordem ('idade')
print ("---------- Particionado DataFrame ----------")
#Agora mencione o atraso com o deslocamento-2 com base na idade
dataframe_obj.Withcolumn ("Lag", Lag ("Age", 2).sobre (partição)).mostrar()

Saída:

Explicação:

A partição é feita com base no Technology1 A coluna e o atraso são definidos com base na coluna AGE. O número total de partições é 4.

Partição 1:

O .A rede ocorreu duas vezes na primeira partição. Como especificamos o deslocamento de lag como 2, o deslocamento é nulo para ambos os valores.

Partição 2:

Hadoop ocorreu uma vez na segunda partição. Então, lag é nulo.

Partição 3:

Oracle ocorreu quatro vezes na terceira partição.

Para o primeiro e o segundo oráculo, o lag é nulo.

Para o terceiro oráculo, o valor do atraso é 21 (o valor da idade das duas linhas anteriores é 21).

Para o quarto oráculo, o valor do atraso é 22 (o valor da idade das duas linhas anteriores é 22).

Partição 4:

PHP ocorreu três vezes na quarta partição.

O valor de atraso para o 1º e o 2º PHP é nulo.

O valor de atraso para o 3º HP é 21 (o valor da idade das duas linhas anteriores é 21).

Conclusão

Aprendemos como obter os valores de atraso no quadro de dados do Pyspark em linhas particionadas. A função lag () no Pyspark está disponível no módulo de janela que é usado para retornar os valores das linhas anteriores às linhas atuais. Aprendemos os diferentes exemplos definindo as diferentes compensações.