Broadcast Config through Connected Stream

Navneeth Krishnan
Hi All,

I looked into an earlier email thread on broadcasting config through a connected stream, but I couldn't find a conclusion there.

I can't use the approach below: keying the config stream sends each config message only to the operator instance that owns its key, but I need the config published to all operator instances while still keeping keyed state for external querying.

streamToBeConfigured.connect(configMessageStream)
            .keyBy(new SomeIdKeySelector(), new ConfigMessageKeySelector())
            .flatMap(new FunctionWithConfigurableState())
            .addSink(...);
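To see why keying the connected stream falls short, here is a small plain-Java model (not Flink API code) of how a config message is routed to parallel operator instances. The hash-modulo routing in `keyedTarget` is a hypothetical simplification: Flink actually assigns keys to key groups and key groups to instances, but the consequence is the same, namely that every record with a given key, including each config message, lands on exactly one instance. `RoutingModel` and its methods are illustrative names, not part of any library.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of how a config message is routed to parallel operator
// instances. This is NOT Flink API code; all names are hypothetical.
final class RoutingModel {

    private RoutingModel() {}

    // Hypothetical simplification of keyed partitioning. Flink really maps keys to
    // key groups and key groups to instances, but each key still ends up on exactly one instance.
    static int keyedTarget(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // connect(...).keyBy(...): the config message reaches only the instance owning its key.
    static Map<Integer, String> deliverKeyed(String configKey, String config, int parallelism) {
        Map<Integer, String> configPerInstance = new HashMap<>();
        configPerInstance.put(keyedTarget(configKey, parallelism), config);
        return configPerInstance;
    }

    // broadcast(): the config message is copied to every parallel instance.
    static Map<Integer, String> deliverBroadcast(String config, int parallelism) {
        Map<Integer, String> configPerInstance = new HashMap<>();
        for (int instance = 0; instance < parallelism; instance++) {
            configPerInstance.put(instance, config);
        }
        return configPerInstance;
    }

    // Instances that would process data records without ever having seen the config.
    static List<Integer> instancesMissingConfig(Map<Integer, String> configPerInstance, int parallelism) {
        List<Integer> missing = new ArrayList<>();
        for (int instance = 0; instance < parallelism; instance++) {
            if (!configPerInstance.containsKey(instance)) {
                missing.add(instance);
            }
        }
        return missing;
    }
}
```

With a parallelism of 4, the keyed delivery leaves three instances without the config, whereas the broadcast delivery leaves none; that gap is the problem described above.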

One of the resolutions I found in that thread is below. I can use it to solve my issue, but is it still the recommended approach?

stream1.connect(stream2.broadcast())                 // broadcast() delivers every ConfigMessage to all parallel instances of the next operator
            .flatMap(new MergeStreamsMapFunction())  // CoFlatMapFunction: holds the last ConfigMessage in a transient field and emits a Tuple2<Stream1Data, ConfigMessage> for each Stream1 element (nothing for config messages)
            .keyBy(new SomeIdKeySelector())          // keyBy id to allow ValueStateDescriptors and semantically correct partitioning according to business logic
            .flatMap(new StatefulFlatMapFunction())  // save the latest received ConfigMessage value in keyed ValueState here
            .addSink(...);
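The merge-then-key pipeline can be modeled in plain Java (not Flink API code) to show what each stage keeps. `MergeStage` stands in for `MergeStreamsMapFunction`: every parallel instance sees every config message and keeps the last one in a field. `KeyedConfigStage` stands in for `StatefulFlatMapFunction`, with a `HashMap` playing the role of keyed `ValueState`. Ids and configs are plain `String`s, all class names are hypothetical, and dropping data records that arrive before any config is an assumption (buffering them would be an alternative).

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Plain-Java model of connect(broadcast) -> flatMap -> keyBy -> flatMap.
// NOT Flink API code; class and method names are hypothetical.
final class MergeThenKeyModel {

    private MergeThenKeyModel() {}

    // Stand-in for MergeStreamsMapFunction. There is one instance per parallel subtask,
    // and because the config side is broadcast, every instance sees every config message.
    static final class MergeStage {
        private String lastConfig; // the transient field from the original snippet

        // Config side: remember the config, emit nothing.
        void onConfig(String config) {
            lastConfig = config;
        }

        // Data side: pair the record's id with the latest config.
        // Assumption: records arriving before any config are dropped.
        Optional<Map.Entry<String, String>> onData(String id) {
            if (lastConfig == null) {
                return Optional.empty();
            }
            Map.Entry<String, String> idWithConfig = new SimpleEntry<>(id, lastConfig);
            return Optional.of(idWithConfig);
        }
    }

    // Stand-in for StatefulFlatMapFunction after keyBy(id). The map plays the role of
    // keyed ValueState: one "latest config" value per id, available for external queries.
    static final class KeyedConfigStage {
        private final Map<String, String> latestConfigPerKey = new HashMap<>();

        void process(Map.Entry<String, String> idWithConfig) {
            latestConfigPerKey.put(idWithConfig.getKey(), idWithConfig.getValue());
        }

        String configFor(String id) {
            return latestConfigPerKey.get(id);
        }
    }
}
```

One consequence of this design: the keyed value for an id only changes when a data record with that id flows through. If config `v1` arrives, a record for `user-1` passes, then config `v2` arrives and only `user-2` records follow, `configFor("user-1")` still returns `v1`.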

Thanks,
Navneeth

Re: Broadcast Config through Connected Stream

Navneeth Krishnan
Hi All,

Any suggestions on this would really help. 

Thanks.

(In reply to Navneeth Krishnan, Tue, Sep 5, 2017 at 2:42 PM)

Re: Broadcast Config through Connected Stream

Navneeth Krishnan
Hi,

Any suggestions on how this could be achieved?

Thanks

(In reply to Navneeth Krishnan, Thu, Sep 7, 2017 at 8:02 AM)

Re: Broadcast Config through Connected Stream

Aljoscha Krettek
Hi,

I think this is a valid approach. You can even use "operator state" in your map function so that the broadcast config is checkpointed instead of living only in a transient field.
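Keeping the config in operator state can be sketched as follows. This is a plain-Java model, not Flink code: `ListCheckpointedModel` is a stand-in that only mirrors the method names and parameters of Flink's `ListCheckpointed<T>` interface (`snapshotState(long checkpointId, long timestamp)` and `restoreState(List<T> state)`), and `ConfigHoldingMapper` is a hypothetical name for the config-holding part of the merge function. The config is a plain `String` for brevity.

```java
import java.util.Collections;
import java.util.List;

// Plain-Java model of keeping the broadcast config in operator (non-keyed) state.
// NOT Flink API code: this interface only mirrors the method names and parameters
// of Flink's ListCheckpointed<T> so that the sketch runs without Flink.
interface ListCheckpointedModel<T> {
    List<T> snapshotState(long checkpointId, long timestamp);

    void restoreState(List<T> state);
}

// Hypothetical config-holding part of the merge function; configs are plain Strings here.
final class ConfigHoldingMapper implements ListCheckpointedModel<String> {
    private String lastConfig;

    void onConfig(String config) {
        lastConfig = config;
    }

    String currentConfig() {
        return lastConfig;
    }

    @Override
    public List<String> snapshotState(long checkpointId, long timestamp) {
        // Every parallel instance holds the same broadcast config, so one entry is enough.
        if (lastConfig == null) {
            return Collections.emptyList();
        }
        return Collections.singletonList(lastConfig);
    }

    @Override
    public void restoreState(List<String> state) {
        // After rescaling, an instance may get zero entries or several identical ones.
        lastConfig = state.isEmpty() ? null : state.get(state.size() - 1);
    }
}
```

Snapshotting a single entry relies on every instance holding the same broadcast config. Because list-style operator state is redistributed across instances when a job is rescaled, an instance can come back with no entry at all, which is why `restoreState` tolerates an empty list; Flink's `CheckpointedFunction` with union list state (`OperatorStateStore#getUnionListState`) hands every instance the full list instead.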

Another approach would be to use internal APIs to hack together an operator that has a keyed stream on one input and a broadcast stream on the other. You can see that approach in action in the Beam Flink Runner [1], but I would strongly recommend against it because it relies on internal APIs. If the other approach works for you, I would stay with that.

Best,
Aljoscha

(In reply to Navneeth Krishnan, 15 Sep 2017, 07:04)

Re: Broadcast Config through Connected Stream

Navneeth Krishnan
Thanks a lot Aljoscha. That helps.

(In reply to Aljoscha Krettek, Mon, Sep 25, 2017 at 4:47 AM)