KeyedStream question


KeyedStream question

Michael Latta
I am new to Flink and trying to understand keyBy and KeyedStream. From the short description in the docs, I expected it to partition the data such that the following flatMap would only see elements with the same key, i.e., that events with different keys would be presented to different instances of the FlatMapFunction. But I am seeing all events in the stream presented to the same FlatMapFunction.

Michael

Re: KeyedStream question

Amit Jain
Hi,

The keyBy operation partitions the data on the given key and makes sure that the
same slot gets all future data belonging to the same key. In the default
implementation, it can also map a subset of the keys in your DataStream to the
same slot.

Assuming the number of keys equals the number of running slots, you could
specify a custom keyBy operation to achieve what you want.

Could you describe your use case?
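Amit's suggestion can be sketched in plain Java. Flink's `DataStream#partitionCustom` accepts a `Partitioner` whose `partition(key, numPartitions)` method chooses the target parallel instance; the classes below only imitate that shape and are not Flink code. The names `OneKeyPerInstancePartitioner` and `CustomPartitionDemo` are hypothetical, and the sketch assumes the full set of keys is known in advance and is no larger than the parallelism. Note that, as far as I know, `partitionCustom` yields a regular `DataStream`, not a `KeyedStream`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical partitioner: each known key gets its own parallel instance.
// Imitates the shape of a custom partitioner; this is not Flink code.
class OneKeyPerInstancePartitioner {
    private final Map<String, Integer> instanceOfKey = new HashMap<>();

    OneKeyPerInstancePartitioner(List<String> knownKeys, int numPartitions) {
        // Assumption from the thread: no more keys than running instances.
        if (knownKeys.size() > numPartitions) {
            throw new IllegalArgumentException("more keys than parallel instances");
        }
        for (int i = 0; i < knownKeys.size(); i++) {
            instanceOfKey.put(knownKeys.get(i), i);
        }
    }

    // Returns the index of the parallel instance that should receive this key.
    int partition(String key, int numPartitions) {
        Integer index = instanceOfKey.get(key);
        if (index == null) {
            throw new IllegalArgumentException("unknown key: " + key);
        }
        return index % numPartitions;
    }
}

class CustomPartitionDemo {
    // Routes each record (reduced to its key) and collects what every instance sees.
    static List<List<String>> route(List<String> records, List<String> knownKeys, int parallelism) {
        OneKeyPerInstancePartitioner partitioner =
                new OneKeyPerInstancePartitioner(knownKeys, parallelism);
        List<List<String>> seen = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            seen.add(new ArrayList<>());
        }
        for (String key : records) {
            seen.get(partitioner.partition(key, parallelism)).add(key);
        }
        return seen;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("A", "B", "C");
        // prints [[A, A], [B, B], [C]]: one key per instance
        System.out.println(route(List.of("A", "B", "A", "C", "B"), keys, 3));
    }
}
```

The scheme breaks down as soon as an unknown key arrives or the number of keys exceeds the parallelism.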

--
Thanks
Amit  



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: KeyedStream question

Fabian Hueske-2
Amit is correct. keyBy() ensures that all records with the same key are processed by the same parallel instance of a function.
This is different from "a parallel instance only sees records of one key".

I had a look at the docs [1].
I agree that "Logically partitions a stream into disjoint partitions, each partition containing elements of the same key." can be easily interpreted as you did.
I've pushed a commit to clarify the description. The docs should be updated soon.
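To make the distinction concrete, here is a simplified, plain-Java model of hash-based key routing. Flink hashes each key into one of a fixed number of key groups and assigns contiguous ranges of key groups to the parallel instances; this sketch substitutes `String.hashCode()` for Flink's internal hash function and assumes 128 key groups, so the exact assignments will differ from a real job. `KeyByRoutingSketch` is a hypothetical name. What carries over is the property under discussion: a given key always lands on the same instance, while one instance can receive many keys.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Simplified model of hash-based keyBy routing (not Flink's actual code):
// key -> key group -> parallel instance. String.hashCode() stands in for
// Flink's internal hash, and 128 key groups is an assumed value.
class KeyByRoutingSketch {
    static final int NUM_KEY_GROUPS = 128;

    static int keyGroupFor(String key) {
        return Math.floorMod(key.hashCode(), NUM_KEY_GROUPS);
    }

    // Contiguous ranges of key groups map to the same instance.
    static int instanceFor(String key, int parallelism) {
        return keyGroupFor(key) * parallelism / NUM_KEY_GROUPS;
    }

    // For every parallel instance, the distinct keys it receives.
    static List<Set<String>> keysPerInstance(List<String> records, int parallelism) {
        List<Set<String>> seen = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            seen.add(new LinkedHashSet<>());
        }
        for (String key : records) {
            seen.get(instanceFor(key, parallelism)).add(key);
        }
        return seen;
    }

    public static void main(String[] args) {
        List<String> records = List.of("sensor-1", "sensor-2", "sensor-3", "sensor-1", "sensor-4");
        // Four distinct keys on two instances: at least one instance sees several keys.
        System.out.println(keysPerInstance(records, 2));
    }
}
```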


2018-04-05 6:21 GMT+02:00 Amit Jain <[hidden email]>:

Re: KeyedStream question

Michael Latta
Thanks for the clarification. I was just trying to understand the intended behavior. It would have been nice if Flink tracked state for downstream operators by key, but I can do that with a map in the downstream functions. 
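The manual approach described here, one map per function instance keyed by the record's key, looks roughly like this. `ManualPerKeyCounter` is a hypothetical stand-in for a single parallel flatMap instance, with records reduced to their key and the collector replaced by a list. Because the map is an ordinary field, Flink does not manage it as state, so it is not covered by Flink's checkpointing the way managed keyed state is.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for one parallel flatMap instance that receives records
// for several keys and tracks a running count per key in its own map.
// A plain field like this is not Flink-managed (checkpointed) state.
class ManualPerKeyCounter {
    private final Map<String, Long> countsByKey = new HashMap<>();

    // Emits "key=count" for each incoming record, like flatMap(value, out).
    void flatMap(String key, List<String> out) {
        long updated = countsByKey.merge(key, 1L, Long::sum);
        out.add(key + "=" + updated);
    }

    public static void main(String[] args) {
        ManualPerKeyCounter instance = new ManualPerKeyCounter();
        List<String> out = new ArrayList<>();
        for (String key : List.of("A", "B", "A", "A", "B")) {
            instance.flatMap(key, out);
        }
        System.out.println(out); // [A=1, B=1, A=2, A=3, B=2]
    }
}
```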

Michael

Sent from my iPad

On Apr 5, 2018, at 2:30 AM, Fabian Hueske <[hidden email]> wrote:


Re: KeyedStream question

Fabian Hueske-2
Hi,

I think Flink is exactly doing what you are looking for.
If you use keyed state [1], Flink always scopes the state to the key of the currently processed record.
So if you have a MapFunction with keyed state and the map() method is called with a record that has key A, the state will be the state for key A. If the next record has key B, the state will be the state for key B.
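The key scoping described above can be simulated without Flink. In a real job the state handle would come from the runtime context (for example a `ValueState` obtained in a rich function), and Flink sets the current key before invoking the function. Here `SimulatedValueState`, `RunningSumFunction`, and `KeyedStateDemo.processRecord` are hypothetical stand-ins that reproduce only that scoping; the `value()`/`update()` names mirror Flink's `ValueState` interface.

```java
import java.util.HashMap;
import java.util.Map;

// Simulation (not Flink code): a single state handle whose reads and writes
// are routed to the key of the record currently being processed.
class SimulatedValueState<K, V> {
    private final Map<K, V> valuesByKey = new HashMap<>();
    private K currentKey;

    // Called by the simulated runtime before each record.
    void setCurrentKey(K key) { this.currentKey = key; }

    V value() { return valuesByKey.get(currentKey); }

    void update(V newValue) { valuesByKey.put(currentKey, newValue); }
}

// "User code": sums amounts per key without ever passing the key to the state.
class RunningSumFunction {
    final SimulatedValueState<String, Long> sum = new SimulatedValueState<>();

    String map(String key, long amount) {
        Long previous = sum.value(); // null the first time this key is seen
        long updated = (previous == null ? 0L : previous) + amount;
        sum.update(updated);
        return key + ":" + updated;
    }
}

class KeyedStateDemo {
    // Stand-in for the runtime: scope state to the record's key, then call map().
    static String processRecord(RunningSumFunction f, String key, long amount) {
        f.sum.setCurrentKey(key);
        return f.map(key, amount);
    }

    public static void main(String[] args) {
        RunningSumFunction f = new RunningSumFunction();
        System.out.println(processRecord(f, "A", 5)); // A:5
        System.out.println(processRecord(f, "B", 2)); // B:2
        System.out.println(processRecord(f, "A", 1)); // A:6
    }
}
```

The same function instance handles both keys, yet the sum for A is unaffected by the record for B.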

Best,
Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#keyed-state

2018-04-05 14:08 GMT+02:00 Michael Latta <[hidden email]>:

Re: KeyedStream question

Michael Latta
Yes. It took a bit of digging on the website to find RichFlatMapFunction for managed state.

Michael

On Apr 6, 2018, at 3:29 AM, Fabian Hueske <[hidden email]> wrote:


Re: KeyedStream question

Shailesh Jain
I have a question related to KeyedStream, asking it here instead of starting a new thread.

If I assign timestamps to a keyed stream, the resulting stream is no longer keyed, so I essentially need to apply the keyBy operator again after the timestamp assigner.
Why should assigning timestamps to events change the stream from Keyed to Non-Keyed?

Thanks,
Shailesh

On Fri, Apr 6, 2018 at 4:31 PM, Michael Latta <[hidden email]> wrote: