A question on the Flink "rolling" FoldFunction


A question on the Flink "rolling" FoldFunction

min.tan

Hi,

 

I am new to Flink. I have a question on this "rolling" fold function.

 

If its parallelism is larger than one, does the "rolling" order remain the same, i.e. does it always keep "1-2-3-4-5" as an increasing sequence?

 

Regards,

 

Min

 

------------------------------------------------------------------- FoldFunction ---------------------------------------------------------------------------------------------------------------

A "rolling" fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value. 

A fold function that, when applied on the sequence (1,2,3,4,5), emits the sequence "start-1", "start-1-2", "start-1-2-3", ...

DataStream<String> result =
  keyedStream.fold("start", new FoldFunction<Integer, String>() {
    @Override
    public String fold(String current, Integer value) {
        return current + "-" + value;
    }
  });

-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
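For completeness, a minimal, self-contained version of the snippet above might look like the sketch below (Flink 1.7 DataStream API; the fromElements source and the single constant key are assumptions made only for illustration):

import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RollingFoldExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // All five elements share one artificial key, so they end up in a single
        // parallel instance of the fold and are folded in arrival order.
        DataStream<String> result = env
            .fromElements(1, 2, 3, 4, 5)
            .keyBy(new KeySelector<Integer, String>() {
                @Override
                public String getKey(Integer value) {
                    return "same-key";
                }
            })
            .fold("start", new FoldFunction<Integer, String>() {
                @Override
                public String fold(String current, Integer value) {
                    return current + "-" + value;
                }
            });

        result.print(); // start-1, start-1-2, ..., start-1-2-3-4-5
        env.execute("Rolling fold example");
    }
}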

 

From: jincheng sun [mailto:[hidden email]]
Sent: Friday, 7 December 2018 02:24
To: [hidden email]
Cc: [hidden email]; [hidden email]
Subject: [External] Re: delay one of the datastream when performing join operation on event-time and watermark

 

Hi Rakesh Kumar,

I think you can use the interval join, e.g.:

orderStream
    .keyBy(<KeySelector>)
    .intervalJoin(invoiceStream.keyBy(<KeySelector>))
    .between(Time.minutes(-5), Time.minutes(5))

For the semantics of the interval join and a detailed usage description, please refer to https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/joining.html#interval-join
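A slightly fuller, hedged sketch of what this could look like end to end (the Tuple2<orderId, payload> element type, the wrapping method and the ProcessJoinFunction body are assumptions; both inputs must be keyed on the same order id and run on event time with watermarks assigned):

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class IntervalJoinSketch {

    // Keys both streams by order id (field f0) and joins every order with the
    // invoices whose timestamps lie within +/- 5 minutes of the order's timestamp.
    public static DataStream<String> joinOrdersWithInvoices(
            DataStream<Tuple2<String, String>> orderStream,
            DataStream<Tuple2<String, String>> invoiceStream) {

        KeySelector<Tuple2<String, String>, String> byOrderId =
            new KeySelector<Tuple2<String, String>, String>() {
                @Override
                public String getKey(Tuple2<String, String> element) {
                    return element.f0;
                }
            };

        return orderStream
            .keyBy(byOrderId)
            .intervalJoin(invoiceStream.keyBy(byOrderId))
            .between(Time.minutes(-5), Time.minutes(5))
            .process(new ProcessJoinFunction<Tuple2<String, String>, Tuple2<String, String>, String>() {
                @Override
                public void processElement(Tuple2<String, String> order,
                                           Tuple2<String, String> invoice,
                                           Context ctx,
                                           Collector<String> out) {
                    out.collect(order.f0 + ": " + order.f1 + " / " + invoice.f1);
                }
            });
    }
}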

 

Hope this helps, and any feedback is welcome!

 

Bests,

Jincheng

 

 

Rakesh Kumar <[hidden email]> wrote on Thursday, 6 December 2018 at 19:10:

Hi,

I have two data sources, one for order data and another for invoice data, and I push both into Kafka topics in JSON form. I want to delay the order data by 5 minutes, because invoice data arrives only after the order data has been generated. For that I have written a Flink program that reads both streams from Kafka, applies watermarks, and delays the order data by 5 minutes. After applying the watermarks I want to join the two streams on the order_id field, which is present in both the order and the invoice data, and then push the joined result to a different Kafka topic.

 

However, I am not able to join these data streams with the 5-minute delay, and I cannot figure out why.

 

I am attaching my Flink program and its dependencies below.
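A minimal sketch of how the source and watermark side of such a job might look (the topic name, the Kafka properties, the 5-minute out-of-orderness bound and the extractEventTime stub are all assumptions; the invoice stream would be set up the same way):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class OrderSourceSketch {

    public static DataStream<String> orderStream(StreamExecutionEnvironment env) {
        // Event time is required for an event-time (interval) join.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "order-invoice-join");

        return env
            .addSource(new FlinkKafkaConsumer<>("orders", new SimpleStringSchema(), props))
            // Allow events to arrive up to 5 minutes out of order before the
            // watermark passes them.
            .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<String>(Time.minutes(5)) {
                    @Override
                    public long extractTimestamp(String orderJson) {
                        return extractEventTime(orderJson);
                    }
                });
    }

    // Hypothetical stub: a real job would parse the event time out of the JSON payload.
    private static long extractEventTime(String json) {
        return System.currentTimeMillis();
    }
}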




Re: A question on the Flink "rolling" FoldFunction

Piotr Nowojski
Hi Min,

Welcome to the Flink community. One small remark: the dev mailing list is for the developers of Flink and the issues/discussions that arise in that process (discussing how to implement a new feature etc.), so the user mailing list is the right one for questions about using Flink :)

Regarding your question: Flink does not guarantee the order of elements, especially not when multiple parallel operators are involved. The order can break whenever there is a network exchange/transfer between operators/tasks, for example after using `keyBy` or `rebalance`, or at the boundary where the parallelism changes. Locally, within one instance of your fold function, the order is preserved. In other words, if you need the elements in some order to process records, you have to make sure that order is there yourself, for example:


env.addSource(….)
.keyBy(….)
.process(new MyElementsSorter())
.fold(…)    

However, keep in mind that sorting records in an infinite data stream is quite a tricky concept unless it is only a "best effort". Usually you would rather group records into windows and process those windows, as sketched below.
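For example, a minimal sketch of that window-based approach could look like this (the 5-minute tumbling event-time window, the Integer element type and the in-window sort are illustrative assumptions, not a recommendation for any particular job):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WindowedSortSketch {

    // Collects the elements of each key into 5-minute event-time windows, sorts
    // them inside the window, and folds them into one "start-..." string per window.
    public static DataStream<String> sortedFoldPerWindow(KeyedStream<Integer, String> keyedStream) {
        return keyedStream
            .window(TumblingEventTimeWindows.of(Time.minutes(5)))
            .process(new ProcessWindowFunction<Integer, String, String, TimeWindow>() {
                @Override
                public void process(String key,
                                    Context ctx,
                                    Iterable<Integer> elements,
                                    Collector<String> out) {
                    List<Integer> sorted = new ArrayList<>();
                    for (Integer element : elements) {
                        sorted.add(element);
                    }
                    Collections.sort(sorted);

                    StringBuilder acc = new StringBuilder("start");
                    for (Integer value : sorted) {
                        acc.append("-").append(value);
                    }
                    out.collect(acc.toString());
                }
            });
    }
}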

Piotrek
