Csv to windows?

Csv to windows?

Felix Neutatz
Hi everybody,

I finally reached streaming territory. For a student project I want to implement CluStream for Flink. I guess this is an especially interesting case for trying out queryable state :)

But I am having problems with the first steps. My input data is a CSV file of records. To start with, I just want to window this CSV. I don't want to use AllWindows because it is not parallelizable.

So my first question is: can I window by processing time, like this?
connectionRecordsT.keyBy(processing_time).timeWindow(Time.milliseconds(1000L))
I didn't find a way to do this, so I added an index column to the CSV and tried to use a countWindow instead:
DataStreamSource<String> source = env.readTextFile(file.getPath());

DataStream<Tuple2<Long, Vector>> connectionRecords =
        source.map(new MapToVector()).setParallelism(4);

connectionRecords.keyBy(0).countWindow(10).apply(
        new WindowFunction<Tuple2<Long, Vector>, Tuple1<Integer>, Tuple, GlobalWindow>() {
            public void apply(Tuple tuple,
                              GlobalWindow window,
                              Iterable<Tuple2<Long, Vector>> values,
                              Collector<Tuple1<Integer>> out) throws Exception {
                // Count the elements in the window and emit the count.
                int sum = 0;
                for (Tuple2<Long, Vector> value : values) {
                    sum += 1;
                }
                out.collect(new Tuple1<>(sum));
            }
        }).writeAsCsv("text");
To check whether everything works, I just count the elements per window and write the counts to a CSV file.
Flink generates the files, but they are all empty. Can you tell me what I did wrong?
Best regards,
Felix

Re: Csv to windows?

Till Rohrmann
Hi Felix,

I'm not sure whether grouping/keyBy by processing time makes any semantic sense. The processing time can be anything, depending on the execution order. Therefore, there is no built-in mechanism to group by processing time. But you can always write a mapper which assigns the current processing time to each stream record and then use this field for grouping.
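Roughly like this (just a minimal sketch, assuming the Tuple2<Long, Vector> records from your snippet; the field layout is only illustrative):

// Sketch (illustrative only): attach the current processing time to each
// record as an extra field, so it can later be used with keyBy.
DataStream<Tuple3<Long, Long, Vector>> timestamped = connectionRecords
        .map(new MapFunction<Tuple2<Long, Vector>, Tuple3<Long, Long, Vector>>() {
            @Override
            public Tuple3<Long, Long, Vector> map(Tuple2<Long, Vector> value) {
                // System.currentTimeMillis() is the current processing time
                return new Tuple3<>(System.currentTimeMillis(), value.f0, value.f1);
            }
        });

// the timestamp is now field 0 and can be used for grouping
timestamped.keyBy(0).countWindow(10);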

Concerning your second problem, could you check the path of the output file? At the moment Flink fails silently if the path is not valid. It might be that you have a simple typo in the path. I've opened an issue to fix this behaviour [1].
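For example, you could try writing to an absolute path with an explicit overwrite mode (a sketch only; "result" stands for the DataStream produced by your window apply, and the path is just a placeholder):

// Sketch: write to an absolute, explicit path and overwrite existing files.
result.writeAsCsv("file:///tmp/window-counts", FileSystem.WriteMode.OVERWRITE)
      .setParallelism(1);  // optional: one output file instead of one per subtask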


Cheers,
Till

Re: Csv to windows?

Felix Neutatz
Hi Till,

the mapper solution makes sense :)

Unfortunately, in my case it was not a typo in the path. I checked and saw that the records are being read.

I would be happy about any ideas.

Best regards,
Felix


Re: Csv to windows?

Yassine MARZOUGUI
Hi Felix,

As I can see in kddcup.newtestdata_small_unlabeled_index, the first field of connectionRecords (splits[0]) is unique for each record. Therefore, when you apply keyBy(0), it logically partitions your stream by that field, and each partition contains only one element. So the countWindow(2) actually never fires, because it never reaches 2 elements. That's why your files stay empty.
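Just to illustrate the point (a sketch only, not necessarily what you want; the modulo constant is an arbitrary assumption), keying on a coarser, non-unique value would let each key group collect enough elements for the count window to fire:

// Sketch (illustrative only): derive a non-unique key, e.g. the index modulo
// a small constant, so each key group receives several elements and the
// count window can actually reach its size.
connectionRecords
        .keyBy(new KeySelector<Tuple2<Long, Vector>, Long>() {
            @Override
            public Long getKey(Tuple2<Long, Vector> record) {
                return record.f0 % 4;  // 4 key groups; the constant is an assumption
            }
        })
        .countWindow(10);  // then apply the same WindowFunction as before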

Could you please go into more detail about what the expected output is? Then we might be able to figure out the proper way to achieve it.

Best,
Yassine


Re: Csv to windows?

Felix Neutatz

Hi Yassine,

thanks, that explains it :)

Best regards,
Felix



Re: Csv to windows?

Felix Neutatz
Hi everybody,

I ran into a new problem. The algorithm I want to implement needs a global ReducingState. What I mean by that is the following:
I want to calculate a local aggregate for each task, then combine all these local aggregates into one global aggregate, push this global aggregate back to all nodes, and continue processing the data stream. In case my description is unclear, I also made some drawings of what I mean: https://docs.google.com/presentation/d/13ei6pzhwNKqNShhdNWXqJaYCG1z0Hsrxfy5sRnqun5M/edit?usp=sharing

I found out that the ReducingState used in the CountWindowAverage example described here: https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html only has a degree of parallelism of 1, and when I use more keys, I get a higher degree of parallelism but no global synchronization.

I am really new to streaming, so maybe I am working from some wrong assumptions. Feel free to point me to some reading :)

Thank you for your help.

Best regards,
Felix


Re: Csv to windows?

Ufuk Celebi
I think this is independent of streaming. If you want to compute the aggregate over all keys and all data, you need to do this in a single task, e.g. use a (flat)map with parallelism 1, do the aggregation there and then broadcast the result to the downstream operators. Does this make sense or am I overlooking something?
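Roughly, the pattern could look like this (a minimal sketch only; the running record count stands in for your real aggregate, and the names, reusing connectionRecords from your first snippet, are purely illustrative):

// Sketch (illustrative only): compute one global aggregate in a single task
// by giving the operator parallelism 1, then broadcast it so every parallel
// instance of the next operator receives the same global value.
// Note: the mutable field below is not fault tolerant; it is just a sketch.
DataStream<Long> globalCount = connectionRecords
        .flatMap(new FlatMapFunction<Tuple2<Long, Vector>, Long>() {
            private long count = 0;

            @Override
            public void flatMap(Tuple2<Long, Vector> record, Collector<Long> out) {
                count++;             // global aggregate over everything seen so far
                out.collect(count);  // emit the updated global value
            }
        })
        .setParallelism(1);          // a single task sees all records

// broadcast() sends each emitted value to every parallel downstream instance
DataStream<Long> broadcasted = globalCount.broadcast();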

On 12 November 2016 at 12:18:04, Felix Neutatz ([hidden email]) wrote:
> > want to calculate a local aggregation for each task and then
> combine all these local aggregates to one global aggregate and
> push this global aggregate to all nodes and continue processing
> the data stream. If you don't understand my description, I also
> made some drawings of what I mean: https://docs.google.com/presentation/d/13ei6pzhwNKqNShhdNWXqJaYCG1z0Hsrxfy5sRnqun5M/edit?usp=sharing 
>


Re: Csv to windows?

Felix Neutatz

2016-11-14 9:41 GMT+01:00 Ufuk Celebi <[hidden email]>:
I think this is independent of streaming. If you want to compute the aggregate over all keys and data you need to do this in a single task, e.g. use a (flat)map with parallelism 1, do the aggregation there and then broadcast to downstream operators. Does this make sense or am I overlooking something?

On 12 November 2016 at 12:18:04, Felix Neutatz ([hidden email]) wrote:
> > want to calculate a local aggregation for each task and then
> combine all these local aggregates to one global aggregate and
> push this global aggregate to all nodes and continue processing
> the data stream. If you don't understand my description, I also
> made some drawings of what I mean: https://docs.google.com/presentation/d/13ei6pzhwNKqNShhdNWXqJaYCG1z0Hsrxfy5sRnqun5M/edit?usp=sharing
>