Hi everybody,
I finally reached streaming territory. For a student project I want to implement CluStream for Flink. I guess this is especially interesting for trying out queryable state :) But I am having problems with the first steps. My input data is a CSV file of records. To start, I just want to window this CSV. I don't want to use AllWindows because it isn't parallelizable. So my first question is: can I window by processing time, like this?

connectionRecordsT.keyBy(processing_time).timeWindow(Time.milliseconds(1000L))

I didn't find a way, so I added an index column to the CSV and tried to use a countWindow:

DataStreamSource<String> source = env.readTextFile(file.getPath());
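For illustration, here is a minimal plain-Java sketch (not Flink API, and not the thread's actual code) of what keying by the index column and cutting count windows amounts to: each record is keyed by index modulo the parallelism, and a window is emitted whenever a key has collected `size` records. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration (not Flink API) of the count-window idea:
// key each record by index % parallelism, then cut windows of `size`
// records per key, roughly what keyBy(index).countWindow(size) would do.
public class CountWindowSketch {
    static Map<Long, List<List<String>>> countWindows(List<String> records, int parallelism, int size) {
        Map<Long, List<String>> pending = new HashMap<>();
        Map<Long, List<List<String>>> windows = new HashMap<>();
        long index = 0;
        for (String r : records) {
            long key = index++ % parallelism;          // key derived from the index column
            List<String> buf = pending.computeIfAbsent(key, k -> new ArrayList<>());
            buf.add(r);
            if (buf.size() == size) {                  // window is full: emit it
                windows.computeIfAbsent(key, k -> new ArrayList<>()).add(new ArrayList<>(buf));
                buf.clear();
            }
        }
        return windows;
    }
}
```

With parallelism 2 and window size 2, records r0..r7 produce two windows per key, e.g. [r0, r2] and [r4, r6] for key 0.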
Hi Felix,

I'm not sure whether grouping/keyBy by processing time makes semantic sense: the result can be anything depending on the execution order. Therefore, there is no built-in mechanism to group by processing time. But you can always write a mapper which assigns the current processing time to each stream record and use this field for grouping.

Concerning your second problem, could you check the path of the file? At the moment Flink fails silently if the path is not valid, so it might be that you have a simple typo in the path. I've opened an issue to fix this [1].

Cheers,
Till

On Sun, Nov 6, 2016 at 12:36 PM, Felix Neutatz <[hidden email]> wrote:
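A minimal plain-Java sketch of the mapper Till describes (not Flink API; the names and the 1-second bucketing are assumptions for illustration): a map step attaches the current processing time to each record, and the attached timestamp then serves as the grouping key.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;

// Plain-Java sketch of the suggested mapper (not Flink API): attach the
// current processing time to each record so it can be used as a key.
public class ProcessingTimeMapper {
    static List<Map.Entry<Long, String>> attachProcessingTime(List<String> records, LongSupplier clock) {
        List<Map.Entry<Long, String>> out = new ArrayList<>();
        for (String r : records) {
            long bucket = clock.getAsLong() / 1000L;   // 1-second processing-time bucket
            out.add(new SimpleEntry<>(bucket, r));
        }
        return out;
    }
}
```

In Flink this would be a MapFunction emitting a Tuple2 of (timestamp, record), followed by keyBy on the timestamp field; the `LongSupplier` stands in for `System.currentTimeMillis()` so the sketch is deterministic.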
Hi Till,

the mapper solution makes sense :) Unfortunately, in my case it was not a typo in the path; I checked and saw that the records are read. You can find the whole program here: https://github.com/FelixNeutatz/CluStream/blob/master/flink-java-project/src/main/java/org/apache/flink/clustream/StreamingJobIndex.java

I am happy about any ideas.

Best regards,
Felix

2016-11-07 16:15 GMT+01:00 Till Rohrmann <[hidden email]>:
Hi Felix,

As I see in kddcup.newtestdata_small_un

Could you please go into more detail about what the expected output is? Then we might be able to figure out the proper way to achieve it.

Best,
Yassine

2016-11-07 19:18 GMT+01:00 Felix Neutatz <[hidden email]>:
Hi Yassine,

thanks, that explains it :)

Best regards,

On Nov 7, 2016 21:28, "Yassine MARZOUGUI" <[hidden email]> wrote:
Hi everybody,

I ran into a new problem. The algorithm I want to implement needs a global ReducingState. What I mean by that: I want to calculate a local aggregate for each task, then combine all these local aggregates into one global aggregate, push this global aggregate to all nodes, and continue processing the data stream. If my description is unclear, I also made some drawings of what I mean: https://docs.google.com/presentation/d/13ei6pzhwNKqNShhdNWXqJaYCG1z0Hsrxfy5sRnqun5M/edit?usp=sharing

I found out that the ReducingState described here: https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html in the CountWindowAverage example only has a degree of parallelism of 1, and when I use more keys I get a higher degree of parallelism, but no global synchronization. I am really new to streaming, so maybe I am working from some bad assumptions. You can also point me to some reading :)

Thank you for your help.

Best regards,
Felix

2016-11-08 10:17 GMT+01:00 Felix Neutatz <[hidden email]>:
I think this is independent of streaming. If you want to compute the aggregate over all keys and all data, you need to do this in a single task, e.g. use a (flat)map with parallelism 1, do the aggregation there, and then broadcast the result to the downstream operators. Does this make sense, or am I overlooking something?
On 12 November 2016 at 12:18:04, Felix Neutatz ([hidden email]) wrote:
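A plain-Java sketch of the pattern Ufuk describes (not Flink API; the class and method names are hypothetical): each parallel task computes a local aggregate, a single parallelism-1 step combines them into a global aggregate, and the global value is then handed ("broadcast") to every task.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch (not Flink API): local aggregates per task, a
// single-task global combine, and a broadcast of the global result.
public class GlobalAggregateSketch {
    static long localSum(List<Long> partition) {         // per-task local aggregate
        return partition.stream().mapToLong(Long::longValue).sum();
    }
    static long combine(List<Long> localAggregates) {    // parallelism-1 global combine
        return localAggregates.stream().mapToLong(Long::longValue).sum();
    }
    static long[] broadcast(long global, int parallelism) { // one copy per task
        long[] copies = new long[parallelism];
        Arrays.fill(copies, global);
        return copies;
    }
}
```

In Flink terms, `localSum` corresponds to the parallel tasks, `combine` to the (flat)map with parallelism 1, and `broadcast` to shipping the combined result to all downstream operator instances.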
I found the solution here: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-Broadcast-of-datasets-in-streaming-context-td6456.html

2016-11-14 9:41 GMT+01:00 Ufuk Celebi <[hidden email]>: