Dynamic partitioning for stream output

Juho Autio
Could you suggest how to dynamically partition data with Flink streaming?

We've looked at RollingSink, which takes care of writing batches to S3, but it doesn't allow defining the partition dynamically based on tuple fields.

Our data comes from Kafka and essentially has the Kafka topic and a date, among other fields.

We'd like to consume all topics (and automatically subscribe to new ones) and write to S3 partitioned by topic and date, for example:

s3://bucket/path/topic=topic2/date=20160522/
s3://bucket/path/topic=topic2/date=20160523/
s3://bucket/path/topic=topic1/date=20160522/
s3://bucket/path/topic=topic1/date=20160523/

There are two problems with RollingSink as it is now:
- It only allows partitioning by date.
- It flushes the batch every time the path changes. Our stream can contain a random mix of topics, so RollingSink couldn't respect the max flush size; it would end up flushing files on practically every tuple.

We've considered implementing a sink that internally creates and manages multiple RollingSink instances, one per partition. But it would be great to first hear any suggestions you might have.

If we have to extend RollingSink, it would be nice to make it take a partitioning function as a parameter. The function would be called for each tuple to create the output path.
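
To illustrate, a minimal sketch of such a function (the PathPartitioner interface and the Event type are made up for illustration; nothing here is existing Flink API):

    import java.io.Serializable;

    // Hypothetical interface for the proposed partitioning function.
    public interface PathPartitioner<T> extends Serializable {
        // Called for each tuple; returns the bucket path relative to the base path.
        String partitionPath(T element);
    }

    // Assumed event type carrying the Kafka topic and an already formatted date.
    class Event {
        private final String topic;
        private final String date; // e.g. "20160522"
        Event(String topic, String date) { this.topic = topic; this.date = date; }
        String getTopic() { return topic; }
        String getDate() { return date; }
    }

    // Example: partition by topic and date, producing paths like
    // "topic=topic1/date=20160522".
    class TopicDatePartitioner implements PathPartitioner<Event> {
        @Override
        public String partitionPath(Event event) {
            return "topic=" + event.getTopic() + "/date=" + event.getDate();
        }
    }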

Re: Dynamic partitioning for stream output

Kostas Kloudas
Hi Juho,

If I understand correctly, you want a custom RollingSink that caches some
buckets, one for each topic/date key, and flushes a bucket to disk whenever
the volume of its buffered data exceeds a limit, right?
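
As a rough sketch of that idea (all names and the 64 MB threshold are made up
for illustration; checkpointing and the actual S3/file I/O are left out):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    // Buffer records per bucket path and flush a bucket once its
    // buffered volume exceeds a limit.
    class BucketBuffers {
        private static final long MAX_BYTES_PER_BUCKET = 64L * 1024 * 1024;
        private final Map<String, ByteArrayOutputStream> buckets = new HashMap<>();

        void write(String bucketPath, byte[] record) throws IOException {
            ByteArrayOutputStream buf =
                buckets.computeIfAbsent(bucketPath, k -> new ByteArrayOutputStream());
            buf.write(record);
            if (buf.size() >= MAX_BYTES_PER_BUCKET) {
                flush(bucketPath, buf.toByteArray()); // hand the batch to a file writer
                buckets.remove(bucketPath);
            }
        }

        private void flush(String bucketPath, byte[] batch) {
            // A real implementation would write the batch to the bucket's path.
        }
    }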

If this is the case, then you are right that this is not currently supported
out-of-the-box, but it would be interesting to update the RollingSink
to support such scenarios.

One clarification: when you say that you want to partition by date,
you mean the event date, right? Not the processing time.

Kostas

Re: Dynamic partitioning for stream output

Juho Autio
Thanks, indeed the desired behavior is to flush when a bucket's size exceeds a limit, but also when the bucket has been open long enough. Unlike the current RollingSink, we don't want to flush every time the bucket changes; instead, we want to keep multiple buckets "open" as needed.

In our case the date to use for partitioning comes from an event field, but it needs to be formatted, too. The partitioning feature should be generic, allowing the user to pass a function that formats the bucket path for each tuple.
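
For example, if the event carries an epoch-millisecond timestamp (an
assumption; our actual field may differ), the formatting part could be as
simple as:

    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.TimeZone;

    class DatePartitionFormatter {
        // SimpleDateFormat is not thread-safe, so a real sink would keep
        // one instance per parallel task rather than sharing it.
        static String datePartition(long eventTimeMillis) {
            SimpleDateFormat fmt = new SimpleDateFormat("yyyyMMdd");
            fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
            return "date=" + fmt.format(new Date(eventTimeMillis));
        }
    }

e.g. datePartition(1463875200000L) returns "date=20160522".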

Does it seem like a valid plan to create a sink that internally caches multiple rolling sinks?

Re: Dynamic partitioning for stream output

Juho Autio
Related issue: https://issues.apache.org/jira/browse/FLINK-2672

Re: Dynamic partitioning for stream output

Kostas Kloudas
Hi Juho,

To be more aligned with the semantics in Flink, I would suggest a solution with a single modified RollingSink that caches 
multiple buckets (from the Bucketer) and flushes (some of) them to disk whenever certain time or space criteria are met. 
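
Something along these lines, where both the names and the thresholds are
illustrative only:

    // Flush a bucket when it is either too large (space criterion)
    // or has been open for too long (time criterion).
    class BucketState {
        long bytesBuffered;
        long openSinceMillis;
    }

    class FlushPolicy {
        private static final long MAX_BUCKET_BYTES  = 64L * 1024 * 1024; // space limit
        private static final long MAX_BUCKET_AGE_MS = 10L * 60 * 1000;   // time limit

        boolean shouldFlush(BucketState bucket, long nowMillis) {
            return bucket.bytesBuffered >= MAX_BUCKET_BYTES
                || nowMillis - bucket.openSinceMillis >= MAX_BUCKET_AGE_MS;
        }
    }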

I would say that it is worth modifying the rolling sink so that it can support such use cases (different flushing policies).
Aljoscha, as the writer of the original Rolling Sink, what do you think?

Kostas

Re: Dynamic partitioning for stream output

Aljoscha Krettek
Hi,
while I think it would be possible to do it by creating a "meta sink" that contains several RollingSinks, I think the approach of integrating it into the current RollingSink is better.

I think it's mostly a question of style and architectural purity, but also of resource consumption and maintainability. If there are several RollingSinks nested inside another sink instead of just one RollingSink, then we duplicate all of RollingSink's internal structures. For maintainability, we would have to be very careful when interacting with the nested sinks to ensure that they really behave as proper sinks (watermarks, checkpoints, closing/disposing come to mind now, but this might grow in the future).

Cheers,
Aljoscha

Re: Dynamic partitioning for stream output

Josh
Hi guys,

I've been working on this feature, as I needed something similar. Have a look at my issue here: https://issues.apache.org/jira/browse/FLINK-4190 and my changes here: https://github.com/joshfg/flink/tree/flink-4190
The changes follow Kostas's suggestion in this thread.
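
For a sense of usage: a sketch against an element-aware Bucketer in the
spirit of these changes. The names follow what this line of work later
became (Flink's BucketingSink), but treat the exact signatures as
approximate, and Event/stream as assumed types:

    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.connectors.fs.Clock;
    import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer;
    import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

    BucketingSink<Event> sink = new BucketingSink<>("s3://bucket/path");
    sink.setBucketer(new Bucketer<Event>() {
        @Override
        public Path getBucketPath(Clock clock, Path basePath, Event element) {
            // Bucket by topic and (already formatted) event date,
            // matching the layout from the start of this thread.
            return new Path(basePath,
                    "topic=" + element.getTopic() + "/date=" + element.getDate());
        }
    });
    sink.setBatchSize(64L * 1024 * 1024); // roll a part file once it reaches 64 MB
    stream.addSink(sink);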

Thanks,
Josh

