Understanding connected streams use without timestamps

Theodore Vasiloudis
Hello all,

I was playing around with the IncrementalLearningSkeleton example and I have a couple of questions about the behavior of connected streams.

In the example, the elements are assigned timestamps, and a stream, model, produces
Double[] elements by ingesting and processing a stream of Integer training data points.

DataStream<Double[]> model = trainingData
        .assignTimestampsAndWatermarks(new LinearTimestamp())
        .timeWindowAll(Time.of(5000, TimeUnit.MILLISECONDS))
        .apply(new PartialModelBuilder());


The model stream is then connected to a newData stream. This lets us use the continuously
updated model stream to make predictions for the incoming newData elements, because the
CoMap class keeps a model variable that is shared between its two map functions.
The shared model variable starts out as null and is updated every time an element arrives
from the model stream.

DataStream<Integer> prediction = newData.connect(model).map(new Predictor());
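The shared-variable pattern described above can be sketched in plain Java. To keep it runnable without Flink, the sketch uses a hypothetical `CoMap` interface that only mirrors the `map1`/`map2` shape of Flink's `CoMapFunction`; `SharedModelPredictor` is not the example's actual `Predictor`, and its prediction rule and the `-1` "no model yet" marker are assumptions made for illustration.

```java
// Hypothetical stand-in for the map1/map2 shape of Flink's CoMapFunction;
// it is NOT the Flink interface, just enough to run the sketch without Flink.
interface CoMap<IN1, IN2, OUT> {
    OUT map1(IN1 value); // called for each element of the first input (newData)
    OUT map2(IN2 value); // called for each element of the second input (model)
}

// Sketch of a predictor whose two map methods share one model field.
class SharedModelPredictor implements CoMap<Integer, Double[], Integer> {
    private Double[] model = null; // starts out as null, as described above

    @Override
    public Integer map1(Integer value) {
        if (model == null) {
            return -1; // assumption: -1 marks "no model seen yet"
        }
        // Hypothetical prediction rule: scale the input by the first model weight.
        return (int) Math.round(model[0] * value);
    }

    @Override
    public Integer map2(Double[] newModel) {
        model = newModel; // every model element replaces the shared model
        // A co-map returns exactly one value per input, so return a placeholder;
        // a CoFlatMapFunction would let a model update emit nothing instead.
        return 0;
    }
}

public class PredictorSketch {
    public static void main(String[] args) {
        SharedModelPredictor p = new SharedModelPredictor();
        System.out.println(p.map1(10));  // -1: no model yet
        p.map2(new Double[] {0.5});      // a model element arrives
        System.out.println(p.map1(10));  // 5: uses the shared model
    }
}
```

Whatever map1 returns depends entirely on whether map2 has already run, which is exactly why the arrival order of the two inputs matters.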

My confusion arose when I tried a slightly different approach [2], without using timestamps
or watermarks. In my version I simply create count windows of, say, 100 elements,
and I use readTextFile to read in both trainingData and newData:

DataStream<ArrayList<Double>> model = trainingData
        .countWindowAll(100)
        .apply(new PartialModelBuilder());
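Conceptually, `countWindowAll(100)` splits the non-keyed trainingData stream into tumbling windows of 100 consecutive elements and applies the window function to each full window, so the model stream gets one element per 100 training elements. The plain-Java sketch below mimics only that grouping; `buildModel` is a hypothetical stand-in for `PartialModelBuilder` (it just averages the batch), and the sketch drops a trailing partial batch without claiming anything about how Flink treats one at the end of a bounded input.

```java
import java.util.ArrayList;
import java.util.List;

public class CountWindowSketch {
    // Hypothetical stand-in for PartialModelBuilder: averages one batch.
    static ArrayList<Double> buildModel(List<Double> batch) {
        double sum = 0.0;
        for (double x : batch) sum += x;
        ArrayList<Double> model = new ArrayList<>();
        model.add(sum / batch.size());
        return model;
    }

    // Groups consecutive elements into tumbling batches of `size`
    // and builds one model per full batch.
    static List<ArrayList<Double>> countWindowAll(List<Double> input, int size) {
        List<ArrayList<Double>> models = new ArrayList<>();
        List<Double> batch = new ArrayList<>();
        for (Double x : input) {
            batch.add(x);
            if (batch.size() == size) {     // window is full: fire it
                models.add(buildModel(batch));
                batch = new ArrayList<>();  // start the next tumbling window
            }
        }
        return models; // a trailing partial batch is simply dropped in this sketch
    }

    public static void main(String[] args) {
        List<Double> trainingData = new ArrayList<>();
        for (int i = 1; i <= 250; i++) trainingData.add((double) i);
        // 250 elements with size 100: two full windows, the last 50 are dropped here
        System.out.println(countWindowAll(trainingData, 100).size()); // 2
    }
}
```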


When I then connect the model stream to the newData stream, map1 of the CoMap never sees a
non-null model. It seems that the map functions are executed in order: first map1 is
executed for all the newData elements, and only then is map2 executed for all the model
elements.
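The behavior described above follows directly from the shared-variable design: if every newData element reaches map1 before the first model element reaches map2, every prediction sees null. The plain-Java sketch below replays the same two inputs in two different orders against a minimal co-map with a shared field; the class and method names and the `-1` "no model" marker are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class OrderingSketch {
    // Minimal co-map with a shared model field (hypothetical, mirrors map1/map2 only).
    static class SharedModelCoMap {
        private Double model = null;

        int map1(int value) {        // newData side
            return model == null ? -1 : (int) Math.round(model * value);
        }

        void map2(double newModel) { // model side
            model = newModel;
        }
    }

    // Order A: all newData first, then all model updates (what the thread observed).
    static List<Integer> newDataFirst(int[] newData, double[] models) {
        SharedModelCoMap f = new SharedModelCoMap();
        List<Integer> out = new ArrayList<>();
        for (int x : newData) out.add(f.map1(x));
        for (double m : models) f.map2(m);
        return out;
    }

    // Order B: model updates first, then newData.
    static List<Integer> modelFirst(int[] newData, double[] models) {
        SharedModelCoMap f = new SharedModelCoMap();
        List<Integer> out = new ArrayList<>();
        for (double m : models) f.map2(m);
        for (int x : newData) out.add(f.map1(x));
        return out;
    }

    public static void main(String[] args) {
        int[] newData = {1, 2, 3};
        double[] models = {2.0};
        System.out.println(newDataFirst(newData, models)); // [-1, -1, -1]
        System.out.println(modelFirst(newData, models));   // [2, 4, 6]
    }
}
```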

So how does having or not having timestamps affect the behavior of the connected stream?

How would I handle such a case if the notion of timestamps does not apply to my data?
(i.e., here I'm interested in streaming historical data, where I assume the order does not matter)

Re: Understanding connected streams use without timestamps

Gyula Fóra
Hi :)

The execution of the connected functions (map1/map2 in this case) is not affected by the timestamps. In other words, which input arrives at the CoMapFunction first is pretty much arbitrary.

So I think you did everything correctly.

Gyula

Theodore Vasiloudis <[hidden email]> wrote on Mon, Nov 21, 2016 at 12:07:

Re: Understanding connected streams use without timestamps

Theodore Vasiloudis
Thanks for the clarification, Gyula!

In that case, is it currently possible to make one of the two connected streams stall until the other stream has produced at least one output, and only then start producing as well?
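Without such a feature, one common workaround is to buffer the newData elements inside the co-function until the first model arrives, and then flush the buffer. In Flink this logic would typically live in a `CoFlatMapFunction`, whose `flatMap1`/`flatMap2` methods receive a `Collector` and may emit zero or more elements; in the plain-Java sketch below a `Consumer<Integer>` stands in for the collector so it runs without Flink. The names and the prediction rule are hypothetical, and the buffer is an ordinary in-memory field: it is not part of any checkpointed state and can grow without bound if the model never arrives.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

public class BufferUntilModelSketch {
    // Hypothetical co-flat-map: flatMap1 = newData side, flatMap2 = model side.
    static class BufferingPredictor {
        private Double model = null;
        // In-memory only; not fault tolerant and unbounded in this sketch.
        private final Deque<Integer> pending = new ArrayDeque<>();

        void flatMap1(int value, Consumer<Integer> out) {
            if (model == null) {
                pending.add(value);         // no model yet: hold the element back
            } else {
                out.accept(predict(value)); // model present: predict right away
            }
        }

        void flatMap2(double newModel, Consumer<Integer> out) {
            model = newModel;               // later models replace earlier ones
            while (!pending.isEmpty()) {    // drain anything buffered so far
                out.accept(predict(pending.poll()));
            }
        }

        private int predict(int value) {    // hypothetical prediction rule
            return (int) Math.round(model * value);
        }
    }

    public static void main(String[] args) {
        List<Integer> predictions = new ArrayList<>();
        BufferingPredictor p = new BufferingPredictor();
        // Same unlucky order as in the thread: all newData first, then the model.
        p.flatMap1(1, predictions::add);
        p.flatMap1(2, predictions::add);
        p.flatMap1(3, predictions::add);
        p.flatMap2(2.0, predictions::add);
        System.out.println(predictions); // [2, 4, 6]
    }
}
```

The design choice is to trade latency and memory for correctness: no prediction is ever made with a null model, at the cost of holding newData back until the first model exists.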


On Mon, Nov 21, 2016 at 3:16 PM, Gyula Fóra <[hidden email]> wrote:

Re: Understanding connected streams use without timestamps

Aljoscha Krettek
Nope, not right now, but this is pretty much what we're trying to solve with side inputs: https://docs.google.com/document/d/1hIgxi2Zchww_5fWUHLoYiXwSBXjv-M5eOv-MKQYN3m4/edit

On Mon, 21 Nov 2016 at 16:11 Theodore Vasiloudis <[hidden email]> wrote: