Hello Everybody,
I'm currently trying to change the state of a CoFlatMapFunction with the help of a connected configuration stream. The code looks something like this:

streamToBeConfigured.connect(configMessageStream)
    .keyBy(new SomeIdKeySelector(), new ConfigMessageKeySelector())
    .flatMap(new FunctionWithConfigurableState())
    .addSink(...);

The stream with the actual functionality is keyed by an id, but the ConfigMessages don't contain any id to key them by. They are just "key=value" strings that should be broadcast to all instances of the CoFlatMapFunction, regardless of which id they are keyed by.

Is there any way to do that?

Best Regards,

Julian
Does the following work?
stream1.keyBy().connect(stream2.broadcast())
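For illustration, a minimal, untested sketch of how that could be wired up. The (id, payload) tuple type, the placeholder sources, and the job setup are assumptions of mine, not taken from the job described above:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public class BroadcastConfigSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // placeholder sources: (id, payload) events and "key=value" config strings
        DataStream<Tuple2<String, String>> events = env.fromElements(
                Tuple2.of("id-1", "payload-a"),
                Tuple2.of("id-2", "payload-b"));
        DataStream<String> configs = env.fromElements("threshold=5");

        events
            .keyBy(0)                      // key the data stream by its id field
            .connect(configs.broadcast())  // broadcast every config message to all subtasks
            .flatMap(new CoFlatMapFunction<Tuple2<String, String>, String, String>() {

                // plain operator field, not checkpointed -- just to show the wiring
                private String lastConfig;

                @Override
                public void flatMap1(Tuple2<String, String> event, Collector<String> out) {
                    out.collect(event.f1 + " [config=" + lastConfig + "]");
                }

                @Override
                public void flatMap2(String config, Collector<String> out) {
                    lastConfig = config;   // every subtask sees every config message
                }
            })
            .print();

        env.execute("broadcast-config-sketch");
    }
}

Note that this sketch only keeps the config in a plain, non-checkpointed field and never touches keyed state.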
Hi Ufuk,

Thanks for your response. Unfortunately, that does not work. Using ValueStateDescriptors in the CoFlatMapFunction on the connected stream requires a keyBy on the connected stream.

Another solution I can think of would be this:

stream1.connect(stream2)
    .map(new MergeStreamsMapFunction())     // holds transient state of the last ConfigMessage and maps stream1's data to a Tuple2<Stream1Data, ConfigMessage>
    .keyBy(new SomeIdKeySelector())         // key by id to allow ValueStateDescriptors and semantically correct partitioning according to business logic
    .flatMap(new StatefulFlatMapFunction()) // save the latest received ConfigMessage value in a ValueStateDescriptor here
    .addSink(...);

I have yet to test this. It seems a little complicated, but it might work?

Best Regards,

Julian
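A rough, untested sketch of that merge-then-key idea. The types are placeholders of mine ((id, payload) tuples for stream1, plain strings for the config messages), and the merge step is written as a CoFlatMapFunction rather than a CoMapFunction so that config messages don't have to emit an element of their own:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public class MergeThenKeySketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // placeholder sources
        DataStream<Tuple2<String, String>> stream1 = env.fromElements(
                Tuple2.of("id-1", "payload-a"),
                Tuple2.of("id-2", "payload-b"));
        DataStream<String> stream2 = env.fromElements("threshold=5");

        stream1.connect(stream2)
            // merge step: attach the last seen config (held in a plain operator field) to every element
            .flatMap(new CoFlatMapFunction<Tuple2<String, String>, String, Tuple3<String, String, String>>() {
                private String lastConfig;

                @Override
                public void flatMap1(Tuple2<String, String> event, Collector<Tuple3<String, String, String>> out) {
                    out.collect(Tuple3.of(event.f0, event.f1, lastConfig));
                }

                @Override
                public void flatMap2(String config, Collector<Tuple3<String, String, String>> out) {
                    lastConfig = config;   // not forwarded, only remembered
                }
            })
            .keyBy(0)   // key by id after merging, so keyed ValueState is available downstream
            .flatMap(new RichFlatMapFunction<Tuple3<String, String, String>, String>() {
                private transient ValueState<String> configState;

                @Override
                public void open(Configuration parameters) {
                    configState = getRuntimeContext().getState(
                            new ValueStateDescriptor<>("lastConfig", String.class, ""));
                }

                @Override
                public void flatMap(Tuple3<String, String, String> value, Collector<String> out) throws Exception {
                    if (value.f2 != null) {
                        configState.update(value.f2);  // remember the latest config per key
                    }
                    out.collect(value.f1 + " [config=" + configState.value() + "]");
                }
            })
            .print();

        env.execute("merge-then-key-sketch");
    }
}

One caveat: the lastConfig field in the merge step is not checkpointed, so after a recovery the attached config is lost until a new config message arrives.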
Hi Julian,

I think it's currently not possible to do that in a fault-tolerant way. (The problem is that the state that results from the broadcast input also needs to be checkpointed, which is not possible right now.) A while back, I created an issue for that: https://issues.apache.org/jira/browse/FLINK-3659. I'm hoping we can still get this in, in some form, for Flink 1.2.

Cheers,
Aljoscha
Hi,
I agree with Aljoscha that this needs to be solved properly, but it is technically possible to do it now as well (he actually had a PR a while back doing this). You need to manually change how the transform method works on the connected stream to allow setting the key on only one input. You need some reflection magic to create the output operator if you don't want to recompile your own Flink version, but it's definitely doable. (I use this technique in several of my production jobs.)

As for fault tolerance, you need to make sure to checkpoint the broadcast state using the Checkpointed interface.

Cheers,
Gyula
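For the fault-tolerance part, here is an untested sketch of what I understand by "checkpoint the broadcast state using the Checkpointed interface". The class and field names are made up, and the reflection-based rewiring of the two-input transform is not shown:

import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public class ConfigurableCoFlatMap
        implements CoFlatMapFunction<String, String, String>, Checkpointed<String> {

    // latest broadcast config, held as the same value on every parallel subtask
    private String currentConfig = "";

    @Override
    public void flatMap1(String event, Collector<String> out) {
        out.collect(event + " [config=" + currentConfig + "]");
    }

    @Override
    public void flatMap2(String config, Collector<String> out) {
        currentConfig = config;  // broadcast input: just remember the latest value
    }

    @Override
    public String snapshotState(long checkpointId, long checkpointTimestamp) {
        return currentConfig;    // snapshot the broadcast state with each checkpoint
    }

    @Override
    public void restoreState(String state) {
        currentConfig = state;   // restore after a failure
    }
}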
Hi,

yes, it can be done. In fact, I have code like this in the Beam-on-Flink runner:

// we have to manually construct the two-input transform because we're not
// allowed to have only one input keyed, normally.
TwoInputTransformation<
    WindowedValue<SingletonKeyedWorkItem<K, InputT>>,
    RawUnionValue,
    WindowedValue<KV<K, OutputT>>> rawFlinkTransform = new TwoInputTransformation<>(
        keyedWorkItemStream.getTransformation(),
        transformSideInputs.f1.broadcast().getTransformation(),
        transform.getName(),
        (TwoInputStreamOperator) doFnOperator,
        outputTypeInfo,
        keyedWorkItemStream.getParallelism());

rawFlinkTransform.setStateKeyType(keyedWorkItemStream.getKeyType());
rawFlinkTransform.setStateKeySelectors(keyedWorkItemStream.getKeySelector(), null);

// we have to cheat around the ctor being protected
@SuppressWarnings({ "unchecked", "rawtypes" })
SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> outDataStream =
    new SingleOutputStreamOperator(
        keyedWorkItemStream.getExecutionEnvironment(),
        rawFlinkTransform) {};

keyedWorkItemStream.getExecutionEnvironment().addOperator(rawFlinkTransform);

Here I manually create a TwoInputTransformation and a SingleOutputStreamOperator. I would absolutely advise against doing that, however. Using the Checkpointed interface works, but it will lead to a program that cannot be rescaled, i.e. whose parallelism cannot be changed, once we have that feature in Flink 1.2.

Cheers,
Aljoscha