Dear community, I have a use case where sources are keyed. For example, there is a source function with parallelism 10, and each instance has its own key. I used reinterpretAsKeyedStream to convert the source DataStream to a KeyedStream, but I get an IllegalArgumentException. Can reinterpretAsKeyedStream be used with source operators as well, or does the stream it is used on have to be partitioned already (by keyBy(..))? Thanks, Adrienne
Hi Adrienne, I think you should be able to use reinterpretAsKeyedStream by passing in a DataStreamSource, based on the ITCase example [1]. Can you share the full code / error logs for the IAE? -- Rong On Fri, Mar 29, 2019 at 6:09 AM Adrienne Kole <[hidden email]> wrote:
Hi Adrienne, you can only use DataStreamUtils#reinterpretAsKeyedStream on a stream that has previously been keyed/partitioned by Flink with exactly the same KeySelector as the one given to reinterpretAsKeyedStream. It does not work with a key-partitioned stream that has been partitioned by any other process. Best, Konstantin On Fri, Mar 29, 2019 at 11:47 PM Rong Rong <[hidden email]> wrote:
--
Konstantin Knauf | Solutions Architect
+49 160 91394525
Follow us @VervericaData
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
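A minimal sketch of the pattern Konstantin describes (Event, EventSource and getId() are made-up placeholder names, not from the thread): the stream is first partitioned by Flink itself via keyBy with a KeySelector, and only afterwards, with unchanged parallelism, reinterpreted using the very same KeySelector, which spares a second shuffle.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReinterpretAfterKeyBy {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder key selector; Event and getId() stand in for the real record type.
        KeySelector<Event, Integer> bySourceId = new KeySelector<Event, Integer>() {
            @Override
            public Integer getKey(Event value) {
                return value.getId();
            }
        };

        DataStream<Event> partitioned = env
                .addSource(new EventSource())   // placeholder parallel source
                .keyBy(bySourceId)              // Flink itself partitions the stream by key
                .map(value -> value)            // a 1:1 keyed operator; the partitioning is preserved
                .returns(Event.class);

        // Safe: every record already sits in the key group implied by bySourceId,
        // so reinterpreting avoids another shuffle instead of throwing.
        KeyedStream<Event, Integer> keyed = DataStreamUtils.reinterpretAsKeyedStream(
                partitioned, bySourceId, TypeInformation.of(Integer.class));

        keyed.print();
        env.execute("reinterpret-after-keyby");
    }
}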
Thanks a lot for the replies. Below I paste my code:

DataStreamSource<Tuple> source = env.addSource(new MySource());
KeyedStream<Tuple, Integer> keyedStream = DataStreamUtils.reinterpretAsKeyedStream(
        source, new DummyKeySelector(), TypeInformation.of(Integer.class));
keyedStream.timeWindow(Time.seconds(1))
        .apply(new WindowFunction<Tuple, Object, Integer, TimeWindow>() {
            @Override
            public void apply(Integer integer, TimeWindow timeWindow,
                              Iterable<Tuple> iterable, Collector<Object> collector) throws Exception {
                collector.collect(1);
            }
        });
env.execute("Test");

static class DummyKeySelector implements KeySelector<Tuple, Integer> {
    @Override
    public Integer getKey(Tuple value) throws Exception {
        return value.getSourceID();
    }
}

static class MySource extends RichParallelSourceFunction<Tuple> {
    private int sourceID;

    @Override
    public void open(Configuration parameters) throws Exception {
        sourceID = sourceID + getRuntimeContext().getIndexOfThisSubtask();
    }

    @Override
    public void run(SourceContext<Tuple> ctx) throws Exception {
        while (true) {
            Tuple tuple = new Tuple(sourceID);
            ctx.collect(tuple);
        }
    }

    @Override
    public void cancel() {
    }
}

Whatever I do, I get:

Caused by: java.lang.IllegalArgumentException
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)

When I check the details in the source code, it seems that some keys are not within the allowed key range, which is why Flink throws the exception. In this case, as Konstantin said, it is not possible to reinterpret the source as keyed. Please correct me if I am wrong. Thanks, Adrienne
On Wed, Apr 3, 2019 at 8:08 PM Konstantin Knauf <[hidden email]> wrote:
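A small standalone sketch of why the check fails (the parallelism and maxParallelism values are assumptions; 128 is Flink's default maxParallelism for low-parallelism jobs): Flink derives the subtask that owns a key from the key's hash via KeyGroupRangeAssignment, so a key equal to a source subtask's own index generally belongs to a different subtask, which is exactly the mismatch the precondition detects.

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyGroupCheck {
    public static void main(String[] args) {
        int parallelism = 10;      // assumed: matches the source parallelism from the question
        int maxParallelism = 128;  // assumed: Flink's default for low-parallelism jobs

        for (int subtask = 0; subtask < parallelism; subtask++) {
            int key = subtask;     // the sourceID this source subtask emits
            // The subtask Flink expects to own this key, derived from the key's hash:
            int owner = KeyGroupRangeAssignment.assignKeyToParallelOperator(
                    key, maxParallelism, parallelism);
            System.out.printf("key %d emitted by subtask %d, but owned by subtask %d%n",
                    key, subtask, owner);
        }
    }
}

Whenever "emitted by" and "owned by" differ, the assumption behind reinterpretAsKeyedStream is violated and keyed state and timers would land on the wrong subtask, hence the IllegalArgumentException.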
Hi, Konstantin is right. reinterpretAsKeyedStream only works if you call it on a DataStream that was keyBy'ed before (with the same parallelism). Flink cannot reuse the partitioning of another system like Kafka. Best, Fabian Adrienne Kole <[hidden email]> wrote on Thu, Apr 4, 2019, 14:33:
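For reference, a sketch of the straightforward alternative, reusing the MySource, DummyKeySelector and Tuple classes from the code posted above: key the parallel source explicitly with keyBy and accept the resulting shuffle, rather than reinterpreting it.

// Let Flink key the parallel source explicitly instead of reinterpreting it.
DataStreamSource<Tuple> source = env.addSource(new MySource());

source.keyBy(new DummyKeySelector())   // introduces a shuffle, but every key lands on its owning subtask
      .timeWindow(Time.seconds(1))
      .apply(new WindowFunction<Tuple, Object, Integer, TimeWindow>() {
          @Override
          public void apply(Integer key, TimeWindow window,
                            Iterable<Tuple> input, Collector<Object> out) {
              out.collect(1);
          }
      });

env.execute("Test");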