Kafka KeyedStream source

classic Classic list List threaded Threaded
7 messages Options
Reply | Threaded
Open this post in threaded view
|

Kafka KeyedStream source

Niels Basjes
Hi,

In my scenario I have click stream data that I persist in Kafka.
I use the sessionId as the key to instruct Kafka to put everything with the same sessionId into the same Kafka partition. That way I already have all events of a visitor in a single kafka partition in a fixed order.

When I read this data into Flink I get a generic data stream ontop of which I have to do a keyBy before my processing can continue. Such a keyBy will redistribute the data again to later tasks that can do the actual work.

Is it possible to create an adapted version of the Kafka source that immediately produces a keyed data stream?
 

--
Best regards / Met vriendelijke groeten,

Niels Basjes
Reply | Threaded
Open this post in threaded view
|

Re: Kafka KeyedStream source

Tzu-Li (Gordon) Tai
Hi Niels,

Thank you for bringing this up. I recall there was some previous discussion related to this before: [1].

I don’t think this is possible at the moment, mainly because of how the API is designed.

On the other hand, a KeyedStream in Flink is basically just a DataStream with a hash partitioner that is used when deciding which instance of the following downstream operator an emitted record of the stream is sent to.
So, even if we have a Kafka source that directly produces a KeyedStream on “addSource”, redistribution of data can still happen. I.e., if the parallelism of the compute operators right after is different than the number of Kafka partitions, redistribution will happen to let the key space and state be evenly distributed in Flink.

This leads to the argument that we probably need to think about whether retaining the original partitioning of records in Kafka when consumed by Flink is actually only a special case.
Flink, as a parallel compute engine, can freely adjust the parallelism of its operators regardless of the parallelism of Kafka topics (rescaling isn’t actually in yet, but is on the near-future roadmap).

So, under the general case, the parallelism of a Flink operator may be different than the number of Kafka partitions, and therefore redistributing must occur.
For redistribution to not need to take place right after an already partitioned Kafka topic, you’d need identical numbers of 1) Kafka partitions, 2) Flink source instances consuming the partitions, and 3) the parallelism of the keyed computation afterwards. This seems like a very specific situation, considering that you’ll be able to rescale Flink operators as the data’s key space / volume grows.

The main observation, I think, is that Flink itself maintains how the key space is partitioned within the system, which plays a crucial part in rescaling. That’s why by default it doesn’t respect existing partitioning of the key space in Kafka (or other external sources). Even if it initially does at the beginning of a job, partitioning will most likely change as you rescale your job / operators (which is a good thing, to be able to adapt).

Cheers,
Gordon


On January 6, 2017 at 1:38:05 AM, Niels Basjes ([hidden email]) wrote:

Hi,

In my scenario I have click stream data that I persist in Kafka.
I use the sessionId as the key to instruct Kafka to put everything with the same sessionId into the same Kafka partition. That way I already have all events of a visitor in a single kafka partition in a fixed order.

When I read this data into Flink I get a generic data stream ontop of which I have to do a keyBy before my processing can continue. Such a keyBy will redistribute the data again to later tasks that can do the actual work.

Is it possible to create an adapted version of the Kafka source that immediately produces a keyed data stream?
 

--
Best regards / Met vriendelijke groeten,

Niels Basjes
Reply | Threaded
Open this post in threaded view
|

Re: Kafka KeyedStream source

Niels Basjes
Hi,

Ok. I think I get it.

WHAT IF:
Assume we create a addKeyedSource(...) which will allow us to add a source that makes some guarantees about the data.
And assume this source returns simply the Kafka partition id as the result of this 'hash' function.
Then if I have 10 kafka partitions I would read these records in and I could filter the data more efficiently because the data would not need to go over the network before this filter.
Afterwards I can scale it up to 'many' tasks for the heavier processing that follows.

As a concept: Could that be made to work?

Niels 

On Mon, Jan 9, 2017 at 9:14 AM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
Hi Niels,

Thank you for bringing this up. I recall there was some previous discussion related to this before: [1].

I don’t think this is possible at the moment, mainly because of how the API is designed.

On the other hand, a KeyedStream in Flink is basically just a DataStream with a hash partitioner that is used when deciding which instance of the following downstream operator an emitted record of the stream is sent to.
So, even if we have a Kafka source that directly produces a KeyedStream on “addSource”, redistribution of data can still happen. I.e., if the parallelism of the compute operators right after is different than the number of Kafka partitions, redistribution will happen to let the key space and state be evenly distributed in Flink.

This leads to the argument that we probably need to think about whether retaining the original partitioning of records in Kafka when consumed by Flink is actually only a special case.
Flink, as a parallel compute engine, can freely adjust the parallelism of its operators regardless of the parallelism of Kafka topics (rescaling isn’t actually in yet, but is on the near-future roadmap).

So, under the general case, the parallelism of a Flink operator may be different than the number of Kafka partitions, and therefore redistributing must occur.
For redistribution to not need to take place right after an already partitioned Kafka topic, you’d need identical numbers of 1) Kafka partitions, 2) Flink source instances consuming the partitions, and 3) the parallelism of the keyed computation afterwards. This seems like a very specific situation, considering that you’ll be able to rescale Flink operators as the data’s key space / volume grows.

The main observation, I think, is that Flink itself maintains how the key space is partitioned within the system, which plays a crucial part in rescaling. That’s why by default it doesn’t respect existing partitioning of the key space in Kafka (or other external sources). Even if it initially does at the beginning of a job, partitioning will most likely change as you rescale your job / operators (which is a good thing, to be able to adapt).

Cheers,
Gordon


On January 6, 2017 at 1:38:05 AM, Niels Basjes ([hidden email]) wrote:

Hi,

In my scenario I have click stream data that I persist in Kafka.
I use the sessionId as the key to instruct Kafka to put everything with the same sessionId into the same Kafka partition. That way I already have all events of a visitor in a single kafka partition in a fixed order.

When I read this data into Flink I get a generic data stream ontop of which I have to do a keyBy before my processing can continue. Such a keyBy will redistribute the data again to later tasks that can do the actual work.

Is it possible to create an adapted version of the Kafka source that immediately produces a keyed data stream?
 

--
Best regards / Met vriendelijke groeten,

Niels Basjes



--
Best regards / Met vriendelijke groeten,

Niels Basjes
Reply | Threaded
Open this post in threaded view
|

Re: Kafka KeyedStream source

Tzu-Li (Gordon) Tai
Hi Niels,

If it’s only for simple data filtering that does not depend on the key, a simple “flatMap” or “filter" directly after the source can be chained to the source instances.
What that does is that the filter processing will be done within the same thread as the one fetching data from a Kafka partition, hence no excessive network transfers for this simple filtering.

So, what that sums up to is that you have a FlinkKafkaConsumer as source, do a filter transformation right after, and then a keyBy followed with your heavy-processing, key-wise computations.
Does that makes sense for what you have in mind?

Cheers,
Gordon

On January 11, 2017 at 4:11:26 PM, Niels Basjes ([hidden email]) wrote:

Hi,

Ok. I think I get it.

WHAT IF:
Assume we create a addKeyedSource(...) which will allow us to add a source that makes some guarantees about the data.
And assume this source returns simply the Kafka partition id as the result of this 'hash' function.
Then if I have 10 kafka partitions I would read these records in and I could filter the data more efficiently because the data would not need to go over the network before this filter.
Afterwards I can scale it up to 'many' tasks for the heavier processing that follows.

As a concept: Could that be made to work?

Niels 

On Mon, Jan 9, 2017 at 9:14 AM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
Hi Niels,

Thank you for bringing this up. I recall there was some previous discussion related to this before: [1].

I don’t think this is possible at the moment, mainly because of how the API is designed.

On the other hand, a KeyedStream in Flink is basically just a DataStream with a hash partitioner that is used when deciding which instance of the following downstream operator an emitted record of the stream is sent to.
So, even if we have a Kafka source that directly produces a KeyedStream on “addSource”, redistribution of data can still happen. I.e., if the parallelism of the compute operators right after is different than the number of Kafka partitions, redistribution will happen to let the key space and state be evenly distributed in Flink.

This leads to the argument that we probably need to think about whether retaining the original partitioning of records in Kafka when consumed by Flink is actually only a special case.
Flink, as a parallel compute engine, can freely adjust the parallelism of its operators regardless of the parallelism of Kafka topics (rescaling isn’t actually in yet, but is on the near-future roadmap).

So, under the general case, the parallelism of a Flink operator may be different than the number of Kafka partitions, and therefore redistributing must occur.
For redistribution to not need to take place right after an already partitioned Kafka topic, you’d need identical numbers of 1) Kafka partitions, 2) Flink source instances consuming the partitions, and 3) the parallelism of the keyed computation afterwards. This seems like a very specific situation, considering that you’ll be able to rescale Flink operators as the data’s key space / volume grows.

The main observation, I think, is that Flink itself maintains how the key space is partitioned within the system, which plays a crucial part in rescaling. That’s why by default it doesn’t respect existing partitioning of the key space in Kafka (or other external sources). Even if it initially does at the beginning of a job, partitioning will most likely change as you rescale your job / operators (which is a good thing, to be able to adapt).

Cheers,
Gordon


On January 6, 2017 at 1:38:05 AM, Niels Basjes ([hidden email]) wrote:

Hi,

In my scenario I have click stream data that I persist in Kafka.
I use the sessionId as the key to instruct Kafka to put everything with the same sessionId into the same Kafka partition. That way I already have all events of a visitor in a single kafka partition in a fixed order.

When I read this data into Flink I get a generic data stream ontop of which I have to do a keyBy before my processing can continue. Such a keyBy will redistribute the data again to later tasks that can do the actual work.

Is it possible to create an adapted version of the Kafka source that immediately produces a keyed data stream?
 

--
Best regards / Met vriendelijke groeten,

Niels Basjes



--
Best regards / Met vriendelijke groeten,

Niels Basjes
Reply | Threaded
Open this post in threaded view
|

Re: Kafka KeyedStream source

Fabian Hueske-2
Hi Niels,

I think the biggest problem for keyed sources is that Flink must be able to co-locate key-partitioned state with the pre-partitioned data.

This might work, if the key is the partition ID, i.e, not the original key attribue that was hashed to assign events to partitions.
Flink could need to distribute topic partitions to source functions based on its own hash function.

However, if you would like to keyBy the original key attribute, Flink would need to have access to the hash function that was used to assign events to partitions.

Best,
Fabian

2017-01-15 21:48 GMT+01:00 Tzu-Li (Gordon) Tai <[hidden email]>:
Hi Niels,

If it’s only for simple data filtering that does not depend on the key, a simple “flatMap” or “filter" directly after the source can be chained to the source instances.
What that does is that the filter processing will be done within the same thread as the one fetching data from a Kafka partition, hence no excessive network transfers for this simple filtering.

So, what that sums up to is that you have a FlinkKafkaConsumer as source, do a filter transformation right after, and then a keyBy followed with your heavy-processing, key-wise computations.
Does that makes sense for what you have in mind?

Cheers,
Gordon

On January 11, 2017 at 4:11:26 PM, Niels Basjes ([hidden email]) wrote:

Hi,

Ok. I think I get it.

WHAT IF:
Assume we create a addKeyedSource(...) which will allow us to add a source that makes some guarantees about the data.
And assume this source returns simply the Kafka partition id as the result of this 'hash' function.
Then if I have 10 kafka partitions I would read these records in and I could filter the data more efficiently because the data would not need to go over the network before this filter.
Afterwards I can scale it up to 'many' tasks for the heavier processing that follows.

As a concept: Could that be made to work?

Niels 

On Mon, Jan 9, 2017 at 9:14 AM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
Hi Niels,

Thank you for bringing this up. I recall there was some previous discussion related to this before: [1].

I don’t think this is possible at the moment, mainly because of how the API is designed.

On the other hand, a KeyedStream in Flink is basically just a DataStream with a hash partitioner that is used when deciding which instance of the following downstream operator an emitted record of the stream is sent to.
So, even if we have a Kafka source that directly produces a KeyedStream on “addSource”, redistribution of data can still happen. I.e., if the parallelism of the compute operators right after is different than the number of Kafka partitions, redistribution will happen to let the key space and state be evenly distributed in Flink.

This leads to the argument that we probably need to think about whether retaining the original partitioning of records in Kafka when consumed by Flink is actually only a special case.
Flink, as a parallel compute engine, can freely adjust the parallelism of its operators regardless of the parallelism of Kafka topics (rescaling isn’t actually in yet, but is on the near-future roadmap).

So, under the general case, the parallelism of a Flink operator may be different than the number of Kafka partitions, and therefore redistributing must occur.
For redistribution to not need to take place right after an already partitioned Kafka topic, you’d need identical numbers of 1) Kafka partitions, 2) Flink source instances consuming the partitions, and 3) the parallelism of the keyed computation afterwards. This seems like a very specific situation, considering that you’ll be able to rescale Flink operators as the data’s key space / volume grows.

The main observation, I think, is that Flink itself maintains how the key space is partitioned within the system, which plays a crucial part in rescaling. That’s why by default it doesn’t respect existing partitioning of the key space in Kafka (or other external sources). Even if it initially does at the beginning of a job, partitioning will most likely change as you rescale your job / operators (which is a good thing, to be able to adapt).

Cheers,
Gordon


On January 6, 2017 at 1:38:05 AM, Niels Basjes ([hidden email]) wrote:

Hi,

In my scenario I have click stream data that I persist in Kafka.
I use the sessionId as the key to instruct Kafka to put everything with the same sessionId into the same Kafka partition. That way I already have all events of a visitor in a single kafka partition in a fixed order.

When I read this data into Flink I get a generic data stream ontop of which I have to do a keyBy before my processing can continue. Such a keyBy will redistribute the data again to later tasks that can do the actual work.

Is it possible to create an adapted version of the Kafka source that immediately produces a keyed data stream?
 

--
Best regards / Met vriendelijke groeten,

Niels Basjes



--
Best regards / Met vriendelijke groeten,

Niels Basjes

Reply | Threaded
Open this post in threaded view
|

Re: Kafka KeyedStream source

Niels Basjes
Hi,

> However, if you would like to keyBy the original key attribute, Flink would need to have access to the hash function that was used to assign events to partitions.

So if my producing application and my consuming application use the same source attributes AND the same hashing function to determine the partition it should be able to make to work.
Right now I simply provide the 'sessionId' to Kafka and thenit is probably a hashing function IN kafka that does the magic.
I'm not sure if we can control that enough with Kafka right now.


Niels 

On Mon, Jan 16, 2017 at 10:15 AM, Fabian Hueske <[hidden email]> wrote:
Hi Niels,

I think the biggest problem for keyed sources is that Flink must be able to co-locate key-partitioned state with the pre-partitioned data.

This might work, if the key is the partition ID, i.e, not the original key attribue that was hashed to assign events to partitions.
Flink could need to distribute topic partitions to source functions based on its own hash function.

However, if you would like to keyBy the original key attribute, Flink would need to have access to the hash function that was used to assign events to partitions.

Best,
Fabian

2017-01-15 21:48 GMT+01:00 Tzu-Li (Gordon) Tai <[hidden email]>:
Hi Niels,

If it’s only for simple data filtering that does not depend on the key, a simple “flatMap” or “filter" directly after the source can be chained to the source instances.
What that does is that the filter processing will be done within the same thread as the one fetching data from a Kafka partition, hence no excessive network transfers for this simple filtering.

So, what that sums up to is that you have a FlinkKafkaConsumer as source, do a filter transformation right after, and then a keyBy followed with your heavy-processing, key-wise computations.
Does that makes sense for what you have in mind?

Cheers,
Gordon

On January 11, 2017 at 4:11:26 PM, Niels Basjes ([hidden email]) wrote:

Hi,

Ok. I think I get it.

WHAT IF:
Assume we create a addKeyedSource(...) which will allow us to add a source that makes some guarantees about the data.
And assume this source returns simply the Kafka partition id as the result of this 'hash' function.
Then if I have 10 kafka partitions I would read these records in and I could filter the data more efficiently because the data would not need to go over the network before this filter.
Afterwards I can scale it up to 'many' tasks for the heavier processing that follows.

As a concept: Could that be made to work?

Niels 

On Mon, Jan 9, 2017 at 9:14 AM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
Hi Niels,

Thank you for bringing this up. I recall there was some previous discussion related to this before: [1].

I don’t think this is possible at the moment, mainly because of how the API is designed.

On the other hand, a KeyedStream in Flink is basically just a DataStream with a hash partitioner that is used when deciding which instance of the following downstream operator an emitted record of the stream is sent to.
So, even if we have a Kafka source that directly produces a KeyedStream on “addSource”, redistribution of data can still happen. I.e., if the parallelism of the compute operators right after is different than the number of Kafka partitions, redistribution will happen to let the key space and state be evenly distributed in Flink.

This leads to the argument that we probably need to think about whether retaining the original partitioning of records in Kafka when consumed by Flink is actually only a special case.
Flink, as a parallel compute engine, can freely adjust the parallelism of its operators regardless of the parallelism of Kafka topics (rescaling isn’t actually in yet, but is on the near-future roadmap).

So, under the general case, the parallelism of a Flink operator may be different than the number of Kafka partitions, and therefore redistributing must occur.
For redistribution to not need to take place right after an already partitioned Kafka topic, you’d need identical numbers of 1) Kafka partitions, 2) Flink source instances consuming the partitions, and 3) the parallelism of the keyed computation afterwards. This seems like a very specific situation, considering that you’ll be able to rescale Flink operators as the data’s key space / volume grows.

The main observation, I think, is that Flink itself maintains how the key space is partitioned within the system, which plays a crucial part in rescaling. That’s why by default it doesn’t respect existing partitioning of the key space in Kafka (or other external sources). Even if it initially does at the beginning of a job, partitioning will most likely change as you rescale your job / operators (which is a good thing, to be able to adapt).

Cheers,
Gordon


On January 6, 2017 at 1:38:05 AM, Niels Basjes ([hidden email]) wrote:

Hi,

In my scenario I have click stream data that I persist in Kafka.
I use the sessionId as the key to instruct Kafka to put everything with the same sessionId into the same Kafka partition. That way I already have all events of a visitor in a single kafka partition in a fixed order.

When I read this data into Flink I get a generic data stream ontop of which I have to do a keyBy before my processing can continue. Such a keyBy will redistribute the data again to later tasks that can do the actual work.

Is it possible to create an adapted version of the Kafka source that immediately produces a keyed data stream?
 

--
Best regards / Met vriendelijke groeten,

Niels Basjes



--
Best regards / Met vriendelijke groeten,

Niels Basjes




--
Best regards / Met vriendelijke groeten,

Niels Basjes
Reply | Threaded
Open this post in threaded view
|

Re: Kafka KeyedStream source

Fabian Hueske-2
Hi Niels,

I was more talking from a theoretical point of view.
Flink does not have a hook to inject a custom hash function (yet). I'm not familiar with the details of the implementation to make an assessment whether this would be possible or how much work it would be. However, several users have asked about this feature. So there is definitely interest for this.

Best, Fabian



2017-01-18 16:42 GMT+01:00 Niels Basjes <[hidden email]>:
Hi,

> However, if you would like to keyBy the original key attribute, Flink would need to have access to the hash function that was used to assign events to partitions.

So if my producing application and my consuming application use the same source attributes AND the same hashing function to determine the partition it should be able to make to work.
Right now I simply provide the 'sessionId' to Kafka and thenit is probably a hashing function IN kafka that does the magic.
I'm not sure if we can control that enough with Kafka right now.


Niels 

On Mon, Jan 16, 2017 at 10:15 AM, Fabian Hueske <[hidden email]> wrote:
Hi Niels,

I think the biggest problem for keyed sources is that Flink must be able to co-locate key-partitioned state with the pre-partitioned data.

This might work, if the key is the partition ID, i.e, not the original key attribue that was hashed to assign events to partitions.
Flink could need to distribute topic partitions to source functions based on its own hash function.

However, if you would like to keyBy the original key attribute, Flink would need to have access to the hash function that was used to assign events to partitions.

Best,
Fabian

2017-01-15 21:48 GMT+01:00 Tzu-Li (Gordon) Tai <[hidden email]>:
Hi Niels,

If it’s only for simple data filtering that does not depend on the key, a simple “flatMap” or “filter" directly after the source can be chained to the source instances.
What that does is that the filter processing will be done within the same thread as the one fetching data from a Kafka partition, hence no excessive network transfers for this simple filtering.

So, what that sums up to is that you have a FlinkKafkaConsumer as source, do a filter transformation right after, and then a keyBy followed with your heavy-processing, key-wise computations.
Does that makes sense for what you have in mind?

Cheers,
Gordon

On January 11, 2017 at 4:11:26 PM, Niels Basjes ([hidden email]) wrote:

Hi,

Ok. I think I get it.

WHAT IF:
Assume we create a addKeyedSource(...) which will allow us to add a source that makes some guarantees about the data.
And assume this source returns simply the Kafka partition id as the result of this 'hash' function.
Then if I have 10 kafka partitions I would read these records in and I could filter the data more efficiently because the data would not need to go over the network before this filter.
Afterwards I can scale it up to 'many' tasks for the heavier processing that follows.

As a concept: Could that be made to work?

Niels 

On Mon, Jan 9, 2017 at 9:14 AM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
Hi Niels,

Thank you for bringing this up. I recall there was some previous discussion related to this before: [1].

I don’t think this is possible at the moment, mainly because of how the API is designed.

On the other hand, a KeyedStream in Flink is basically just a DataStream with a hash partitioner that is used when deciding which instance of the following downstream operator an emitted record of the stream is sent to.
So, even if we have a Kafka source that directly produces a KeyedStream on “addSource”, redistribution of data can still happen. I.e., if the parallelism of the compute operators right after is different than the number of Kafka partitions, redistribution will happen to let the key space and state be evenly distributed in Flink.

This leads to the argument that we probably need to think about whether retaining the original partitioning of records in Kafka when consumed by Flink is actually only a special case.
Flink, as a parallel compute engine, can freely adjust the parallelism of its operators regardless of the parallelism of Kafka topics (rescaling isn’t actually in yet, but is on the near-future roadmap).

So, under the general case, the parallelism of a Flink operator may be different than the number of Kafka partitions, and therefore redistributing must occur.
For redistribution to not need to take place right after an already partitioned Kafka topic, you’d need identical numbers of 1) Kafka partitions, 2) Flink source instances consuming the partitions, and 3) the parallelism of the keyed computation afterwards. This seems like a very specific situation, considering that you’ll be able to rescale Flink operators as the data’s key space / volume grows.

The main observation, I think, is that Flink itself maintains how the key space is partitioned within the system, which plays a crucial part in rescaling. That’s why by default it doesn’t respect existing partitioning of the key space in Kafka (or other external sources). Even if it initially does at the beginning of a job, partitioning will most likely change as you rescale your job / operators (which is a good thing, to be able to adapt).

Cheers,
Gordon


On January 6, 2017 at 1:38:05 AM, Niels Basjes ([hidden email]) wrote:

Hi,

In my scenario I have click stream data that I persist in Kafka.
I use the sessionId as the key to instruct Kafka to put everything with the same sessionId into the same Kafka partition. That way I already have all events of a visitor in a single kafka partition in a fixed order.

When I read this data into Flink I get a generic data stream ontop of which I have to do a keyBy before my processing can continue. Such a keyBy will redistribute the data again to later tasks that can do the actual work.

Is it possible to create an adapted version of the Kafka source that immediately produces a keyed data stream?
 

--
Best regards / Met vriendelijke groeten,

Niels Basjes



--
Best regards / Met vriendelijke groeten,

Niels Basjes




--
Best regards / Met vriendelijke groeten,

Niels Basjes