Difficulty managing keyed streams


Gabriel Pelielo | Stone

Hello everyone.


I'm currently developing a Flink program to aggregate information about my company's clients' credit card transactions. Each transaction has a clientId and a transactionDate. What I want to do is use a sliding time window of size 21 days that slides every hour, comparing a client's hourly number of transactions with their number of transactions in the same hour of the previous weeks, for example:


  • Monday, August 13th 10:00 to 11:00 => 42 transactions. 
  • Monday, August 20th 10:00 to 11:00 => 20 transactions.
  • Monday, August 27th 10:00 to 11:00 => 29 transactions.


My problem is that if a client does not make a single transaction for a whole hour, I don't get a 0 appended to my resulting list of transaction counts. What I need as the final result is the start and the end of the window plus a list of 3 elements, each representing a count of transactions, for example:


TransactionProfile(ClientId1, 2018-02-26T14:00, 2018-03-19T14:00, 15, 0, 10).
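
For illustration, the desired zero-filled result can be sketched in plain Scala, outside Flink. This is only a minimal sketch under assumptions: the field names of `TransactionProfile`, the `profile` helper, and the choice of the last hour of each week as the compared slot are hypothetical, inferred from the example above rather than taken from the actual job.

```scala
import java.time.LocalDateTime

object ZeroFilledProfile {
  // Hypothetical shape, modelled on the TransactionProfile example in the post.
  final case class TransactionProfile(
      clientId: String,
      windowStart: LocalDateTime,
      windowEnd: LocalDateTime,
      counts: List[Long])

  // Counts the transactions that fall in the last hour of each week of the
  // window, oldest week first. A week without transactions yields 0 instead
  // of being skipped.
  def profile(
      clientId: String,
      windowEnd: LocalDateTime,
      transactionDates: Seq[LocalDateTime],
      weeks: Int = 3): TransactionProfile = {
    val windowStart = windowEnd.minusWeeks(weeks.toLong)
    val counts = (weeks - 1 to 0 by -1).toList.map { weeksAgo =>
      val slotEnd = windowEnd.minusWeeks(weeksAgo.toLong)
      val slotStart = slotEnd.minusHours(1)
      // Half-open slot [slotStart, slotEnd).
      transactionDates.count(d => !d.isBefore(slotStart) && d.isBefore(slotEnd)).toLong
    }
    TransactionProfile(clientId, windowStart, windowEnd, counts)
  }
}
```

Because the counts are derived from the fixed list of weekly slots rather than from the transactions that happen to exist, an empty hour naturally shows up as 0.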


I have a keyed stream with the key being the clientId. My idea to solve this problem was to append a 0 to the lists of the clientIds that did not make any transactions in that hour whenever another clientId has finished its window, but I don't know how to achieve this. Any help would be appreciated.


I'm coding in Scala with Flink 1.4 and a piece of my code is the following:

val profileStream: DataStream[Profile] = streamFromKafka
  .map(openTransaction => Transaction(openTransaction.clientId, openTransaction.transactionDate))
  .keyBy(trx => trx.clientId)
  .window(SlidingEventWeekTimeWindows.of(Time.days(21), Time.hours(1)))
  .aggregate(new CountAggregate(), new TransactionProcessWindowFunction())


Best Regards,

Gabriel Pelielo
Fraud Detection

Re: Difficulty managing keyed streams

vino yang
Hi Gabriel,

In your scenario, I assume you should be working in event time.
In that case, I think you can make the window trigger itself by customizing the window's trigger, and then combine that with a ProcessWindowFunction[1] to define your calculation logic.
That is because most of your timing is driven by the watermark, and only a few scenarios require timer-based triggers.

A similar example is here.[2]
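
As a rough illustration of the self-triggering idea, the timer arithmetic can be sketched in plain Scala without any Flink dependency. It assumes a custom trigger would register an event-time timer at the end of the hour containing an element and register the next one an hour later each time it fires; `nextHourBoundary` and `fireTimes` are hypothetical helper names, not Flink API.

```scala
object HourlyTimerSketch {
  val HourMs: Long = 60L * 60L * 1000L

  // End of the hour bucket that contains `timestampMs`
  // (epoch milliseconds, assumed non-negative).
  def nextHourBoundary(timestampMs: Long): Long =
    timestampMs - (timestampMs % HourMs) + HourMs

  // Timer timestamps that would have fired once the watermark reaches
  // `watermarkMs`, for a key whose first element arrived at `firstElementMs`,
  // assuming every firing registers the next timer one hour later.
  // Hours without any element are included, which is the point of the sketch.
  def fireTimes(firstElementMs: Long, watermarkMs: Long): Vector[Long] =
    Iterator
      .iterate(nextHourBoundary(firstElementMs))(_ + HourMs)
      .takeWhile(_ <= watermarkMs)
      .toVector
}
```

In an actual trigger these timestamps would presumably be handed to the trigger context's registerEventTimeTimer. Note that Flink only creates a window for a key once that key has received at least one element, so such timers can cover quiet hours only inside windows that already exist.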


Thanks, vino.

