Issue with sharing state in CoFlatMapFunction


Issue with sharing state in CoFlatMapFunction

Vladimir Stoyak
I got stuck a bit with CoFlatMapFunction. It seems to work fine if I place it on the DataStream before the window, but fails if I place it after the window's “apply” function.
I was testing two streams: the main stream “Features” on flatMap1, constantly ingesting data, and the control stream “Model” on flatMap2, changing the model on request.
In flatMap2 I can set b0/b1 and see them updated properly, but flatMap1 always sees b0 and b1 at the 0 they were set to during initialization.
Am I missing something obvious here?
Thanks a lot, Vladimir
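import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

// Features, Model and EnrichedFeatures are the poster's own types.
public static class applyModel implements CoFlatMapFunction<Features, Model, EnrichedFeatures> {
    private static final long serialVersionUID = 1L;

    // Plain instance fields: every parallel instance of this function holds its own copy.
    Double b0;
    Double b1;

    public applyModel() {
        b0 = 0.0;
        b1 = 0.0;
    }

    // Main input: the session features produced by the window.
    @Override
    public void flatMap1(Features value, Collector<EnrichedFeatures> out) {
        System.out.println("Main: " + this);
    }

    // Control input: a new model replaces the coefficients of this instance only.
    @Override
    public void flatMap2(Model value, Collector<EnrichedFeatures> out) {
        System.out.println("Old Model: " + this);
        b0 = value.getB0();
        b1 = value.getB1();
        System.out.println("New Model: " + this);
    }

    @Override
    public String toString() {
        return "CoFlatMapFunction: {b0: " + b0 + ", b1: " + b1 + "}";
    }
}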

Re: Issue with sharing state in CoFlatMapFunction

Stephan Ewen
Hi!

Can you give us a bit more context? For example, could you share the structure of the program (which streams get windowed, and how they are connected)?

I would guess that the following is the problem:

When you connect one stream to another, partition n of the first stream is connected to partition n of the other stream.
When you do a keyBy().window(), the system reshuffles the data, so the records end up in different partitions, meaning that they arrive at other instances of the CoFlatMapFunction.

You can also call keyBy() on both inputs before connecting them, to make sure that the records are routed consistently.
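To make the routing argument concrete, here is a minimal plain-Java simulation (not Flink code; all class and method names are hypothetical). It assumes four parallel instances of the co-function, that the model record reaches instance 0 through the partition-n-to-partition-n connection, and that feature records are routed by a hash modulo of their key, which only stands in for whatever assignment keyBy() really performs. Only b0 is modelled, for brevity.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical simulation of one CoFlatMapFunction running with N parallel
 * instances. Each instance owns its own b0 field, like the applyModel class.
 */
public class ForwardVsKeyed {

    /** One parallel instance; its field is never shared with other instances. */
    static final class Instance {
        double b0 = 0.0;
    }

    /** Stand-in for keyBy(): the same key always maps to the same instance. */
    static int keyedTarget(int key, int parallelism) {
        return Math.floorMod(Integer.hashCode(key), parallelism);
    }

    /**
     * Delivers one model update to instance modelPartition (the forward
     * connection), then routes a feature by its key and returns the b0
     * value the receiving instance sees.
     */
    static double seenB0(int parallelism, int modelPartition, int featureKey, double newB0) {
        List<Instance> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            instances.add(new Instance());
        }
        // flatMap2 analogue: only the instance on the model's partition is updated.
        instances.get(modelPartition).b0 = newB0;
        // flatMap1 analogue: the feature lands wherever its key is routed.
        return instances.get(keyedTarget(featureKey, parallelism)).b0;
    }

    public static void main(String[] args) {
        // Key 3 is routed to instance 3, which never saw the model.
        System.out.println(seenB0(4, 0, 3, 1.5)); // prints 0.0
        // Key 4 is routed to instance 0, the one that did receive the model.
        System.out.println(seenB0(4, 0, 4, 1.5)); // prints 1.5
    }
}
```

Which keys happen to land on the model's instance depends on Flink's actual hashing, so the specific numbers are illustrative; the point is that a field written in flatMap2 on one instance is invisible to flatMap1 on every other instance.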

Greetings,
Stephan



On Tue, Nov 17, 2015 at 12:29 PM, Vladimir Stoyak wrote:

Re: Issue with sharing state in CoFlatMapFunction

Vladimir Stoyak
My model DataStream is not keyed and does not have any windows; only the main stream has windows and an apply function.

I have two Kafka streams, one for events and one for the model:

DataStream<Model> model_stream = env.addSource(new FlinkKafkaConsumer082<Model>(model_topic, new AvroDeserializationSchema(Model.class), properties));
DataStream<Raw> main_stream = env.addSource(new FlinkKafkaConsumer082<Raw>(raw_topic, new AvroDeserializationSchema(Raw.class), properties));


My topology looks like this:
main_stream
    .assignTimestamps(new myTimeExtractor())
    .keyBy("event_key")
    .window(GlobalWindows.create())
    .trigger(new sessionTrigger(session_timeout))
    .apply(new AggFunction())
    .connect(model_stream)
    .flatMap(new applyModel())
    .print();

AggFunction is a simple aggregate function (this is the body of its apply method):
Long start_ts = Long.MAX_VALUE;
Long end_ts = Long.MIN_VALUE;
Long dwell_time = 0L, last_event_ts = 0L;
// Lists is presumably Guava's Lists; the window is copied here only to count its elements.
int size = Lists.newArrayList(values).size();

for (Raw value : values) {
    // Track the earliest and the latest timestamp in the session.
    if (value.getTs() > end_ts) end_ts = value.getTs();
    if (value.getTs() < start_ts) start_ts = value.getTs();

    // Accumulate the gap to the previous event (0L doubles as "no previous event yet").
    if (last_event_ts == 0L) {
        last_event_ts = value.getTs();
    } else {
        dwell_time += value.getTs() - last_event_ts;
        last_event_ts = value.getTs();
    }
}

out.collect(new Features(tuple.getField(0), tuple.getField(2), tuple.getField(1), start_ts, end_ts, size, dwell_time, Boolean.FALSE));
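The aggregation above can be pulled out into a pure function over the event timestamps, which makes it testable without running a job. This is a sketch under assumptions: the class name SessionStats is hypothetical, it takes a plain List<Long> instead of the thread's Raw records, and like the original it measures dwell time as the sum of gaps between consecutive events in iteration order. It also swaps the 0L sentinel for a null check, because in the original a genuine timestamp of 0 would be mistaken for "no previous event".

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical pure-function version of the AggFunction loop above. */
public final class SessionStats {
    final long startTs;   // earliest timestamp in the session
    final long endTs;     // latest timestamp in the session
    final int size;       // number of events
    final long dwellTime; // sum of gaps between consecutive events

    private SessionStats(long startTs, long endTs, int size, long dwellTime) {
        this.startTs = startTs;
        this.endTs = endTs;
        this.size = size;
        this.dwellTime = dwellTime;
    }

    static SessionStats of(List<Long> timestamps) {
        long start = Long.MAX_VALUE;
        long end = Long.MIN_VALUE;
        long dwell = 0L;
        Long previous = null; // null means "no previous event", so a timestamp of 0 is safe
        for (long ts : timestamps) {
            start = Math.min(start, ts);
            end = Math.max(end, ts);
            if (previous != null) {
                dwell += ts - previous;
            }
            previous = ts;
        }
        return new SessionStats(start, end, timestamps.size(), dwell);
    }

    public static void main(String[] args) {
        SessionStats s = of(Arrays.asList(100L, 130L, 190L));
        // prints "100 190 3 90"
        System.out.println(s.startTs + " " + s.endTs + " " + s.size + " " + s.dwellTime);
    }
}
```

Because the gaps telescope, the dwell time equals the last timestamp minus the first one in iteration order; it matches endTs - startTs only if the window hands over its elements sorted by time, which this sketch does not assume.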



On Tuesday, November 17, 2015 12:59 PM, Stephan Ewen wrote:



Re: Issue with sharing state in CoFlatMapFunction

Stephan Ewen
Is the CoFlatMapFunction intended to be executed in parallel?

If yes, you need some way to deterministically assign which record goes to which parallel instance. In a way, the CoFlatMapFunction performs a parallel (partitioned) join between the model and the results of the session windows, so you need some form of key that selects which partition each element goes to. Does that make sense?

If not, try to set it to parallelism 1 explicitly.
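One way to read "some form of key that selects which partition the elements go to": give model updates and session results a shared key and route both inputs with the same function, so each instance only keeps the models for the keys it owns. The plain-Java sketch below assumes hypothetical string segment keys and class names, and its modulo router merely stands in for Flink's keyBy().

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: both inputs of a co-function routed by the same key. */
public class KeyedCoRouting {
    private final int parallelism;
    // One private model table per parallel instance.
    private final List<Map<String, Double>> modelPerInstance = new ArrayList<>();

    KeyedCoRouting(int parallelism) {
        this.parallelism = parallelism;
        for (int i = 0; i < parallelism; i++) {
            modelPerInstance.add(new HashMap<>());
        }
    }

    /** The single routing function used for BOTH streams (stand-in for keyBy). */
    int route(String key) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    /** flatMap2 analogue: store the model on the instance that owns the key. */
    void onModel(String key, double b0) {
        modelPerInstance.get(route(key)).put(key, b0);
    }

    /** flatMap1 analogue: read the model on the instance the feature lands on. */
    double onFeature(String key) {
        return modelPerInstance.get(route(key)).getOrDefault(key, 0.0);
    }

    public static void main(String[] args) {
        KeyedCoRouting op = new KeyedCoRouting(4);
        op.onModel("segment-a", 1.5);
        op.onModel("segment-b", 2.5);
        System.out.println(op.onFeature("segment-a")); // prints 1.5
        System.out.println(op.onFeature("segment-b")); // prints 2.5
        System.out.println(op.onFeature("segment-c")); // prints 0.0 (no model yet)
    }
}
```

With a parallelism of 1, route() always returns 0, so every record meets the single model table; that is the non-parallel alternative. The keyed variant only applies if the model can actually be split by key.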

Greetings,
Stephan


On Tue, Nov 17, 2015 at 1:11 PM, Vladimir Stoyak wrote:

Re: Issue with sharing state in CoFlatMapFunction

Vladimir Stoyak
Perfect! It does explain my problem.

Thanks a lot



On Tuesday, November 17, 2015 1:43 PM, Stephan Ewen wrote:



Re: Issue with sharing state in CoFlatMapFunction

Vladimir Stoyak
Not that I necessarily need it for this particular example, but is there such a thing as global state?

I.e., how can I make state available across all parallel instances of an operator?



On Tuesday, November 17, 2015 1:49 PM, Vladimir Stoyak wrote:



Re: Issue with sharing state in CoFlatMapFunction

Vladimir Stoyak
I know I can use broadcast(), but I was wondering if there is a better way:


DataStream<Model> control_stream = env.addSource(new FlinkKafkaConsumer082<Model>(control_topic, new AvroDeserializationSchema(Model.class), properties)).broadcast();
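For a single, unkeyed model, broadcast() fits: every model record is delivered to every parallel instance, so each instance's private b0/b1 copy stays in sync. A minimal plain-Java sketch of that fan-out follows; the names are hypothetical, the linear b0 + b1 * x scoring is only an assumption about what the model computes, and nothing here models Flink's networking.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of broadcast delivery into N private model copies. */
public class BroadcastModel {
    /** One parallel instance with its own copy of the coefficients. */
    static final class Instance {
        double b0 = 0.0;
        double b1 = 0.0;

        double apply(double x) {
            return b0 + b1 * x; // assumed linear model, purely illustrative
        }
    }

    final List<Instance> instances = new ArrayList<>();

    BroadcastModel(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            instances.add(new Instance());
        }
    }

    /** broadcast(): the same update reaches every instance. */
    void onModel(double b0, double b1) {
        for (Instance inst : instances) {
            inst.b0 = b0;
            inst.b1 = b1;
        }
    }

    /** A feature may land on any instance; each one now holds the same model. */
    double onFeature(int instanceIndex, double x) {
        return instances.get(instanceIndex).apply(x);
    }

    public static void main(String[] args) {
        BroadcastModel op = new BroadcastModel(4);
        op.onModel(1.0, 2.0);
        for (int i = 0; i < 4; i++) {
            System.out.println(op.onFeature(i, 3.0)); // prints 7.0 on every instance
        }
    }
}
```

This shares the model read-only: an update written inside one instance would still stay local to that instance.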




On Tuesday, November 17, 2015 2:45 PM, Vladimir Stoyak wrote:




Re: Issue with sharing state in CoFlatMapFunction

Stephan Ewen
In reply to this post by Vladimir Stoyak
Global state that all instances can read (but not write) is doable via broadcast().

Global state that all instances can both read and update is currently not available. Consistent operations on it would be quite costly, because they require some form of distributed communication/consensus.

Instead, I would encourage you to go with the following:

1) If you can partition the state, use keyBy().mapWithState(). That localizes state operations and makes them very fast.

2) If your state is not organized by key, it is probably very small, and you may be able to use a non-parallel operation.

3) If some operation updates the state and another one accesses it, you can often implement that with iterations and a CoFlatMapFunction (one side is the original input, the other the feedback input).

In the end, all of these approaches localize state access and modification, which is a good pattern to follow where possible.
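Option 1 relies on keyed state: after keyBy(), the state for a given key lives on exactly one instance, so reads and updates never cross instances. In the Scala DataStream API, mapWithState takes a function from the input and an Option of the previous state to the output and an Option of the new state. The plain-Java sketch below mimics that contract with java.util.Optional; the class name is hypothetical and this is not Flink's implementation. It models a single instance; with several instances, each would hold such a map only for the keys routed to it.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;

/** Hypothetical single-instance sketch of keyed state in the style of mapWithState. */
public class KeyedStateSketch<K, I, S, O> {
    // State for the keys routed to this instance; never read by other instances.
    private final Map<K, S> state = new HashMap<>();
    private final BiFunction<I, Optional<S>, Map.Entry<O, Optional<S>>> fn;

    KeyedStateSketch(BiFunction<I, Optional<S>, Map.Entry<O, Optional<S>>> fn) {
        this.fn = fn;
    }

    /** Reads this key's state, applies fn, then stores or clears the new state. */
    O process(K key, I input) {
        Map.Entry<O, Optional<S>> result = fn.apply(input, Optional.ofNullable(state.get(key)));
        if (result.getValue().isPresent()) {
            state.put(key, result.getValue().get());
        } else {
            state.remove(key); // in this sketch, an empty Optional clears the key's state
        }
        return result.getKey();
    }

    public static void main(String[] args) {
        // Running sum per key: the output is the new sum, which is also the new state.
        KeyedStateSketch<String, Long, Long, Long> sums = new KeyedStateSketch<String, Long, Long, Long>(
            (in, prev) -> {
                long next = prev.orElse(0L) + in;
                return new SimpleEntry<>(next, Optional.of(next));
            });
        System.out.println(sums.process("a", 5L)); // prints 5
        System.out.println(sums.process("b", 1L)); // prints 1
        System.out.println(sums.process("a", 2L)); // prints 7
    }
}
```

Keys "a" and "b" never see each other's running sums, which is the localization Stephan describes; in a real job the routing that puts each key on one instance would come from keyBy().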

Greetings,
Stephan



On Tue, Nov 17, 2015 at 2:44 PM, Vladimir Stoyak wrote:

Re: Issue with sharing state in CoFlatMapFunction

Anwar Rizal
Broadcast is indeed what we use for the same type of problem as your initial one.

In another thread, Stephan mentioned the possibility of using OperatorState in a ConnectedStream. I think the OperatorState approach would do the job as well.

As I understand it, the broadcast approach requires you to checkpoint somewhere upstream. I'm not sure whether OperatorState on a ConnectedStream solves this, though.

On Tue, Nov 17, 2015 at 2:55 PM, Stephan Ewen wrote:
A global state that all can access read-only is doable via broadcast().

A global state that is available to all for read and update is currently not available. Consistent operations on that would be quite costly, require some form of distributed communication/consensus.

Instead, I would encourage you to go with the following:

1) If you can partition the state, use a keyBy().mapWithState() - That localizes state operations and makes it very fast.

2) If your state is not organized by key, your state is probably very small, and you may be able to use a non-parallel operation.

3) If some operation updates the state and another one accesses it, you can often implement that with iterations and a CoFlatMapFunction (one side is the original input, the other the feedback input).
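Option 1 can be pictured with a small plain-Java sketch (not Flink's API): state lives in a per-key slot, and each record reads and writes only the slot for its own key, so no coordination between instances is needed. It is loosely modeled on the Scala mapWithState, which hands the function an Option of the current state and expects the output plus the new Option state back; here null stands in for None, and the running-sum function is a made-up example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

/**
 * Plain-Java sketch (not the Flink API) of the keyBy().mapWithState() idea:
 * one state slot per key, touched only by records with that key.
 */
public class KeyedStateSketch<K, I, O, S> {

    /** What the user function returns: an output value and the new state (null clears it). */
    public static final class Step<R, T> {
        final R out;
        final T state;

        public Step(R out, T state) {
            this.out = out;
            this.state = state;
        }
    }

    private final Map<K, S> stateByKey = new HashMap<>();
    private final BiFunction<I, S, Step<O, S>> fn;

    public KeyedStateSketch(BiFunction<I, S, Step<O, S>> fn) {
        this.fn = fn;
    }

    /** Processes one record for `key`; fn receives null if the key has no state yet. */
    public O process(K key, I value) {
        Step<O, S> step = fn.apply(value, stateByKey.get(key));
        if (step.state == null) {
            stateByKey.remove(key);
        } else {
            stateByKey.put(key, step.state);
        }
        return step.out;
    }

    public static void main(String[] args) {
        // Hypothetical example: running sum per key, emitted and kept as the new state.
        KeyedStateSketch<String, Integer, Integer, Integer> runningSum = new KeyedStateSketch<>(
                (value, sum) -> {
                    int next = (sum == null ? 0 : sum) + value;
                    return new Step<>(next, next);
                });
        List<Integer> outputs = new ArrayList<>();
        outputs.add(runningSum.process("a", 1));
        outputs.add(runningSum.process("b", 5));
        outputs.add(runningSum.process("a", 2));
        System.out.println(outputs); // prints [1, 5, 3]
    }
}
```

Because key "b" never reads or writes key "a"'s slot, the keys can be spread over any number of parallel instances without changing the result.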

In the end, all of these approaches localize state access and modification, which is a good pattern to follow where possible.

Greetings,
Stephan



On Tue, Nov 17, 2015 at 2:44 PM, Vladimir Stoyak <[hidden email]> wrote:
Not that I necessarily need it for this particular example, but is there a global state available?

I.e., how can I make a state available across all parallel instances of an operator?



On Tuesday, November 17, 2015 1:49 PM, Vladimir Stoyak <[hidden email]> wrote:


Perfect! It does explain my problem.

Thanks a lot



On Tuesday, November 17, 2015 1:43 PM, Stephan Ewen <[hidden email]> wrote:


Is the CoFlatMapFunction intended to be executed in parallel?

If yes, you need some way to deterministically assign which record goes to which parallel instance. In a sense, the CoFlatMapFunction performs a parallel (partitioned) join between the model and the results of the session windows, so you need some form of key that selects which partition the elements go to. Does that make sense?

If not, try explicitly setting its parallelism to 1.
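To make the "key that selects the partition" idea concrete, here is a plain-Java simulation (not Flink code). It assumes, hypothetically, that model records carry the same key as the features (one coefficient per key such as "sensor-7") and uses a made-up hash-modulo router for both inputs. Because both sides go through the same routing function, the model for a key and the features for that key always meet in the same instance, at parallelism 4 as well as in the parallelism-1 case.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java simulation (not Flink API) of a partitioned join: both inputs are
 * routed by the same key, so model and features for a key share an instance.
 */
public class KeyedConnectDemo {

    /** One parallel instance; keeps a model coefficient for each key it owns. */
    static final class Instance {
        final Map<String, Double> modelByKey = new HashMap<>();
    }

    /** Hypothetical hash-modulo partitioner, applied to BOTH inputs. */
    static int route(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    /** Creates independent instances, one per parallel subtask. */
    static List<Instance> instances(int parallelism) {
        List<Instance> list = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            list.add(new Instance());
        }
        return list;
    }

    /** Model side: store the coefficient on the instance that owns the key. */
    static void onModel(List<Instance> ops, String key, double coefficient) {
        ops.get(route(key, ops.size())).modelByKey.put(key, coefficient);
    }

    /** Feature side: use the coefficient on the owning instance (0.0 if none has arrived). */
    static double onFeature(List<Instance> ops, String key, double x) {
        Instance op = ops.get(route(key, ops.size()));
        return op.modelByKey.getOrDefault(key, 0.0) * x;
    }

    public static void main(String[] args) {
        for (int parallelism : new int[] {1, 4}) {
            List<Instance> ops = instances(parallelism);
            onModel(ops, "sensor-7", 2.0);
            // prints 6.0 for both parallelism 1 and 4
            System.out.println("parallelism " + parallelism + ": " + onFeature(ops, "sensor-7", 3.0));
        }
    }
}
```

If the model is genuinely global (one model for all keys), there is no key to join on, and broadcasting the model or running the operator with parallelism 1 remain the options.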

Greetings,
Stephan


On Tue, Nov 17, 2015 at 1:11 PM, Vladimir Stoyak <[hidden email]> wrote:
My model DataStream is not keyed and has no windows; only the main stream has windows and an apply function.

I have two Kafka streams, one for events and one for the model:

DataStream<Model> model_stream = env.addSource(new FlinkKafkaConsumer082<Model>(model_topic, new AvroDeserializationSchema(Model.class), properties));
DataStream<Raw> main_stream = env.addSource(new FlinkKafkaConsumer082<Raw>(raw_topic, new AvroDeserializationSchema(Raw.class), properties));


My topology looks like this:
main_stream
.assignTimestamps(new myTimeExtractor())
.keyBy("event_key")
.window(GlobalWindows.create())
.trigger(new sessionTrigger(session_timeout))
.apply(new AggFunction())
.connect(model_stream)
.flatMap(new applyModel())
.print();

AggFunction is a simple aggregate function (this is the body of its apply() method):

// Session start/end, initialized so that the first event replaces them
Long start_ts = Long.MAX_VALUE;
Long end_ts = Long.MIN_VALUE;
// dwell_time sums the gaps between consecutive events, in iteration order
Long dwell_time = 0L, last_event_ts = 0L;
int size = Lists.newArrayList(values).size(); // Guava: number of events in the window

for (Raw value : values) {
    if (value.getTs() > end_ts) end_ts = value.getTs();
    if (value.getTs() < start_ts) start_ts = value.getTs();

    if (last_event_ts == 0L) {
        // first event of the session: no gap to add yet
        last_event_ts = value.getTs();
    } else {
        dwell_time += value.getTs() - last_event_ts;
        last_event_ts = value.getTs();
    }
}

out.collect(new Features(tuple.getField(0), tuple.getField(2), tuple.getField(1), start_ts, end_ts, size, dwell_time, Boolean.FALSE));
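For comparison, a self-contained version of the same session statistics in plain Java, without Flink types (class and field names are illustrative). One difference is deliberate: it sorts the timestamps first, since window contents come in arrival order, which need not be timestamp order. Summing consecutive gaps telescopes to (last timestamp - first timestamp) in iteration order, so after sorting the dwell time equals end minus start. It also uses the list index instead of a 0L sentinel, so a timestamp of 0 is not mistaken for "no previous event".

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Plain-Java sketch of the per-session statistics computed in AggFunction. */
public class SessionStats {
    final long startTs;
    final long endTs;
    final int size;
    final long dwellTime;

    SessionStats(long startTs, long endTs, int size, long dwellTime) {
        this.startTs = startTs;
        this.endTs = endTs;
        this.size = size;
        this.dwellTime = dwellTime;
    }

    /** Computes start, end, event count and dwell time for one session's timestamps. */
    static SessionStats of(Iterable<Long> timestamps) {
        List<Long> sorted = new ArrayList<>();
        for (Long ts : timestamps) {
            sorted.add(ts);
        }
        if (sorted.isEmpty()) {
            throw new IllegalArgumentException("empty session");
        }
        Collections.sort(sorted); // arrival order is not necessarily timestamp order
        long dwell = 0L;
        for (int i = 1; i < sorted.size(); i++) {
            dwell += sorted.get(i) - sorted.get(i - 1); // gap between consecutive events
        }
        return new SessionStats(sorted.get(0), sorted.get(sorted.size() - 1), sorted.size(), dwell);
    }

    public static void main(String[] args) {
        SessionStats s = of(List.of(30L, 10L, 25L));
        // prints: start=10 end=30 size=3 dwell=20
        System.out.println("start=" + s.startTs + " end=" + s.endTs
                + " size=" + s.size + " dwell=" + s.dwellTime);
    }
}
```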



On Tuesday, November 17, 2015 12:59 PM, Stephan Ewen <[hidden email]> wrote:


Hi!

Can you give us a bit more context? For example, could you share the structure of the program (which streams get windowed and how they are connected)?

I would guess that the following is the problem:

When you connect one stream to another, then partition n of the first stream connects with partition n of the other stream.
When you do a keyBy().window() then the system reshuffles the data, and the records are in different partitions, meaning that they arrive in other instances of the CoFlatMapFunction.
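The effect described here can be reproduced with a plain-Java simulation (not Flink code). It assumes four parallel instances, a model update that arrives on only one source subtask (for example, a model topic with a single partition; an assumption), and a made-up hash-modulo router standing in for keyBy() on the feature side. Each instance owns its own b0/b1, so only keys routed to instance 0 see the new model.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java simulation (not Flink API): each parallel instance of the
 * connected operator owns its own b0/b1, and a model record updates only
 * the instance it is routed to.
 */
public class PartitionedModelDemo {

    /** Stand-in for one parallel instance of applyModel. */
    static final class Instance {
        double b0 = 0.0;
        double b1 = 0.0;
    }

    /** Hypothetical hash-modulo router standing in for keyBy() on the feature side. */
    static int route(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    /** Creates independent instances, one per parallel subtask. */
    static List<Instance> instances(int parallelism) {
        List<Instance> list = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            list.add(new Instance());
        }
        return list;
    }

    /** Forward (partition n to partition n): the update reaches only instance `subtask`. */
    static void applyModelForward(List<Instance> ops, int subtask, double b0, double b1) {
        Instance op = ops.get(subtask);
        op.b0 = b0;
        op.b1 = b1;
    }

    /** Counts feature keys that land on an instance still holding the initial 0.0/0.0 model. */
    static int keysSeeingDefaultModel(List<Instance> ops, List<String> featureKeys) {
        int count = 0;
        for (String key : featureKeys) {
            Instance op = ops.get(route(key, ops.size()));
            if (op.b0 == 0.0 && op.b1 == 0.0) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        List<Instance> ops = instances(4);
        applyModelForward(ops, 0, 1.5, 2.5); // model arrives on source subtask 0 only
        List<String> keys = List.of("a", "b", "c", "d");
        // prints: 3 of 4 keys still see b0 = b1 = 0.0
        System.out.println(keysSeeingDefaultModel(ops, keys) + " of " + keys.size()
                + " keys still see b0 = b1 = 0.0");
    }
}
```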

You can also call keyBy() on both inputs before connecting them, to make sure that the records are routed properly.

Greetings,
Stephan



On Tue, Nov 17, 2015 at 12:29 PM, Vladimir Stoyak <[hidden email]> wrote:
[original post trimmed; identical to the first message of this thread]