Source reinterpretAsKeyedStream

Source reinterpretAsKeyedStream

Adrienne Kole
Dear community,

I have a use-case where sources are keyed.
For example, there is a source function with parallelism 10, and each instance has its own key.
I used reinterpretAsKeyedStream to convert the source DataStream to a KeyedStream; however, I get an IllegalArgumentException.
Can reinterpretAsKeyedStream be used with source operators as well, or must the stream already be partitioned (by keyBy(...))?

Thanks,
Adrienne

Re: Source reinterpretAsKeyedStream

Rong Rong
Hi Adrienne,

I think you should be able to use reinterpretAsKeyedStream by passing in a DataStreamSource, based on the ITCase example [1].
Can you share the full code and the error log/stack trace of the IllegalArgumentException?

--
Rong

Re: Source reinterpretAsKeyedStream

Konstantin Knauf
Hi Adrienne,

you can only use DataStream#reinterpretAsKeyedStream on a stream that has previously been keyed/partitioned by Flink with exactly the same KeySelector as the one given to reinterpretAsKeyedStream. It does not work with a key-partitioned stream that has been partitioned by any other process.
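
For illustration, the intended pattern looks roughly like this (untested sketch; Event, EventSource, EventKeySelector and EnrichFunction are placeholders, not classes from your job):

        // Sketch: reuse a partitioning that Flink itself established via keyBy.
        DataStream<Event> events = env.addSource(new EventSource());

        // 1) Flink partitions the stream with the key selector.
        DataStream<Event> enriched = events
                .keyBy(new EventKeySelector())
                .map(new EnrichFunction());   // forward-connected, same parallelism,
                                              // so the key partitioning is preserved

        // 2) Now it is safe to reinterpret the stream as keyed, using exactly
        //    the same key selector (and the same parallelism downstream).
        KeyedStream<Event, Integer> reinterpreted = DataStreamUtils.reinterpretAsKeyedStream(
                enriched, new EventKeySelector(), TypeInformation.of(Integer.class));

This way you avoid the second shuffle that a plain keyBy after the map would introduce.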

Best,

Konstantin

Re: Source reinterpretAsKeyedStream

Adrienne Kole
Thanks a lot for the replies.

Below I paste my code:


        DataStreamSource<Tuple> source = env.addSource(new MySource());
        // Tuple is my own POJO with a sourceID field, not Flink's Tuple.
        KeyedStream<Tuple, Integer> keyedStream = DataStreamUtils.reinterpretAsKeyedStream(
                source, new DummyKeySelector(), TypeInformation.of(Integer.class));
        keyedStream.timeWindow(Time.seconds(1)).apply(new WindowFunction<Tuple, Object, Integer, TimeWindow>() {
            @Override
            public void apply(Integer key, TimeWindow timeWindow, Iterable<Tuple> iterable, Collector<Object> collector) throws Exception {
                collector.collect(1);
            }
        });
        env.execute("Test");

    static class DummyKeySelector implements KeySelector<Tuple, Integer> {

        @Override
        public Integer getKey(Tuple value) throws Exception {
            return value.getSourceID();
        }
    }

    static class MySource extends RichParallelSourceFunction<Tuple> {

        private volatile boolean running = true;
        private int sourceID;

        @Override
        public void open(Configuration parameters) throws Exception {
            // Each parallel instance keys its records by its own subtask index.
            sourceID = getRuntimeContext().getIndexOfThisSubtask();
        }

        @Override
        public void run(SourceContext<Tuple> ctx) throws Exception {
            while (running) {
                ctx.collect(new Tuple(sourceID));
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }


Whatever I do, I get
Caused by: java.lang.IllegalArgumentException
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)

When I checked the details in the Flink source code, it seems that some keys are not within the key-group range allowed for the subtask, which is why Flink throws the exception.
In this case, as Konstantin said, it is not possible to reinterpret the source as keyed.
Please correct me if I am wrong.
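
For reference, my understanding of why the check fails (again, please correct me if this is off): key groups are assigned by hashing the key, so the subtask that produces a key is in general not the subtask that owns its key group. A rough sketch, assuming parallelism 10, the default max parallelism of 128, and Flink's KeyGroupRangeAssignment utility:

        int parallelism = 10;
        int maxParallelism = 128;   // default lower bound for max parallelism

        for (int subtask = 0; subtask < parallelism; subtask++) {
            Integer key = subtask;  // MySource emits its own subtask index as the key
            int owningSubtask = KeyGroupRangeAssignment.assignKeyToParallelOperator(
                    key, maxParallelism, parallelism);
            // The owner is derived from a hash of the key, so in general
            // owningSubtask != subtask; the reinterpreted operator then sees keys
            // outside its key-group range and checkArgument(...) fails.
            System.out.println("key " + key + " is owned by subtask " + owningSubtask
                    + " but produced by subtask " + subtask);
        }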


Thanks,
Adrienne

Re: Source reinterpretAsKeyedStream

Fabian Hueske
Hi,

Konstantin is right.
reinterpretAsKeyedStream only works if you call it on a DataStream that was keyBy'ed before (with the same parallelism).
Flink cannot reuse the partitioning of another system like Kafka.
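
If you just need a keyed window on top of such a source, the safe option is an explicit keyBy and accepting the extra shuffle. A sketch, reusing the classes from your mail (MyWindowFunction stands in for the anonymous window function you posted):

        // Explicit keyBy instead of reinterpretAsKeyedStream: this adds a network
        // shuffle, but guarantees that the partitioning matches what the keyed
        // window operator expects.
        env.addSource(new MySource())
           .keyBy(new DummyKeySelector())
           .timeWindow(Time.seconds(1))
           .apply(new MyWindowFunction());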

Best, Fabian 

