Custom File Sink using EventTime and defined custom file name for parquet file

Custom File Sink using EventTime and defined custom file name for parquet file

David Magalhães
Hi, I'm working for the first time with Flink and I'm trying to create a solution that will store events from Kafka into Parquet files in S3. It should also support re-injection of events from the Parquet files into a Kafka topic.

Here is the code with a simple usage of StreamingFileSink with a bulk encoder (forBulkFormat) that gets the events and stores them in Parquet files. The files are partitioned by account_id and by year and month (yyyyMM). The issue with this approach is that, when running a backfill from a certain point in time, it is hard not to generate duplicated events, since we cannot overwrite the same files: the filenames are generated as "part-<subtask_id>-<sequential_number>".
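
[Editor's note: the linked code is not reproduced in the archive. A minimal sketch of this kind of setup is shown below; the bucket layout, the field names account_id and event_time, the S3 path and the schema handling are assumptions, not the original code.]

import java.time.format.DateTimeFormatter
import java.time.{Instant, ZoneOffset}

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.core.fs.Path
import org.apache.flink.core.io.SimpleVersionedSerializer
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.functions.sink.filesystem.{BucketAssigner, StreamingFileSink}
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer

// Buckets records as <account_id>/<yyyyMM>, both derived from the record itself.
class AccountMonthBucketAssigner extends BucketAssigner[GenericRecord, String] {
  private val monthFormat = DateTimeFormatter.ofPattern("yyyyMM").withZone(ZoneOffset.UTC)

  override def getBucketId(record: GenericRecord, context: BucketAssigner.Context): String = {
    val accountId = record.get("account_id").toString           // assumed field name
    val eventTime = record.get("event_time").asInstanceOf[Long]  // assumed epoch-millis field
    s"$accountId/${monthFormat.format(Instant.ofEpochMilli(eventTime))}"
  }

  override def getSerializer: SimpleVersionedSerializer[String] =
    SimpleVersionedStringSerializer.INSTANCE
}

val schema: Schema = ??? // Avro schema of the events

// Bulk-encoded Parquet sink; part files are named "part-<subtask>-<counter>".
val parquetSink: StreamingFileSink[GenericRecord] = StreamingFileSink
  .forBulkFormat(new Path("s3://some-bucket/events"), ParquetAvroWriters.forGenericRecord(schema))
  .withBucketAssigner(new AccountMonthBucketAssigner)
  .build()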

To add predictability, I've used a tumbling window to aggregate multiple GenericRecord instances, in order to write each Parquet file from a list of them. For that I've created a custom file sink, but I'm not sure which properties I lose compared to the StreamingFileSink. Here is the code. Still, something is missing in this solution: closing a window with a given timeout, so that the last events are written to the sink even if no more events arrive.
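
[Editor's note: again the linked code is not in the archive; the window stage could be sketched roughly as below. The key field, the 10-minute window size and the commented-out sink call are illustrative assumptions.]

import org.apache.avro.generic.GenericRecord
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Stream of GenericRecord with event-time timestamps/watermarks already assigned.
val events: DataStream[GenericRecord] = ???

// Collect everything that falls into the same key + tumbling event-time window into
// one List, which the custom file sink then writes as a single Parquet file.
val batched: DataStream[List[GenericRecord]] = events
  .keyBy(record => record.get("account_id").toString)      // assumed key field
  .window(TumblingEventTimeWindows.of(Time.minutes(10)))   // assumed window size
  .apply { (key: String, window: TimeWindow, records: Iterable[GenericRecord],
            out: Collector[List[GenericRecord]]) =>
    out.collect(records.toList)
  }

// batched.addSink(...) // the custom Parquet file sink from the linked code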

Another workaround would be to create a StreamingFileSink with a row encoder that receives a List of GenericRecord, and to write a custom Encoder that uses AvroParquetWriter to write the file. This way I have access to a custom rolling policy, but it looks truly inefficient. Here is the code.

Am I overthinking this solution? I know there are some issues (recently closed) about supporting more custom rolling policies for bulk formats in the StreamingFileSink, like https://issues.apache.org/jira/browse/FLINK-13027, but I only noticed that now.

Re: Custom File Sink using EventTime and defined custom file name for parquet file

Leonard Xu
Hi, David

Regarding your first description, I'm a little confused about the duplicated records when backfilling. Could you describe your usage scenario/code in more detail?

I remember a backfill solution from Pinterest which is very similar to yours and also uses Flink [1]; I hope it can help you.

Best,
Leonard

[1] https://www.youtube.com/watch?v=3-X6FJ5JS4E&list=PLDX4T_cnKjD207Aa8b5CsZjc7Z_KRezGz&index=64


Re: Custom File Sink using EventTime and defined custom file name for parquet file

Till Rohrmann
Hi David,

I'm pulling in Kostas who worked on the StreamingFileSink and might be able to answer some of your questions.

Cheers,
Till


Re: Custom File Sink using EventTime and defined custom file name for parquet file

David Magalhães
Sorry, I only saw the replies today.

Regarding my previous email, 

Still, something is missing in this solution: closing a window with a given timeout, so that the last events are written to the sink even if no more events arrive.

I've fixed this using a custom trigger:

val flag = ctx.getPartitionedState(valueStateDescriptor).value()

// Flag only used to register one trigger per window. Flag is cleaned when FIRE action is executed.
if (!flag) {
  val delay = window.getEnd - window.getStart
  ctx.getPartitionedState(valueStateDescriptor).update(true)
  ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime + delay)
  ctx.registerEventTimeTimer(window.maxTimestamp())
}

TriggerResult.CONTINUE
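
[Editor's note: for context, the snippet above would sit inside a custom Trigger roughly like the sketch below. Only the body of onElement comes from the thread; the class name, the state descriptor definition, the null handling and the FIRE_AND_PURGE behaviour on both timers are assumptions about the surrounding code.]

import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

class WindowWithTimeoutTrigger extends Trigger[GenericRecord, TimeWindow] {

  // Boolean flag per key/window: have the timers for this window been registered yet?
  private val valueStateDescriptor =
    new ValueStateDescriptor[java.lang.Boolean]("timersRegistered", Types.BOOLEAN)

  override def onElement(element: GenericRecord, timestamp: Long,
                         window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
    val flag = ctx.getPartitionedState(valueStateDescriptor).value()

    // Flag only used to register the timers once per window; it is cleared when FIRE runs.
    if (flag == null || !flag) {
      val delay = window.getEnd - window.getStart
      ctx.getPartitionedState(valueStateDescriptor).update(true)
      // Processing-time timer: timeout in case no further events arrive for this window.
      ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime + delay)
      // Event-time timer: the normal end of the window.
      ctx.registerEventTimeTimer(window.maxTimestamp())
    }
    TriggerResult.CONTINUE
  }

  override def onProcessingTime(time: Long, window: TimeWindow,
                                ctx: Trigger.TriggerContext): TriggerResult = {
    ctx.getPartitionedState(valueStateDescriptor).clear()
    TriggerResult.FIRE_AND_PURGE
  }

  override def onEventTime(time: Long, window: TimeWindow,
                           ctx: Trigger.TriggerContext): TriggerResult = {
    ctx.getPartitionedState(valueStateDescriptor).clear()
    TriggerResult.FIRE_AND_PURGE
  }

  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
    // The processing-time timer's timestamp is not tracked here, so only the
    // event-time timer is deleted explicitly.
    ctx.getPartitionedState(valueStateDescriptor).clear()
    ctx.deleteEventTimeTimer(window.maxTimestamp())
  }
}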

Leonard, by "duplicated events" I mean storing the same event in different Parquet files, since the filenames have the format "part-X-Y". So, if I start processing the same stream again (from a point in the past), I can't overwrite the files with exactly the same names.

I think I've read a blog post about them (Pinterest); I will check the video.

Kostas replied only to me, so I'm adding his response here.

Hi David,

I skimmed through the solution with the window before the sink. If this solution fits your needs, I think you could:

1) just specify a BucketAssigner instead of writing a custom sink; this will allow you to not lose any functionality from the StreamingFileSink.

2) for the timeout requirement, you could use a (keyed) process function with map state to hold your event-time windows. The key will be the window start (or interval) and you can register timers to fire at the end of the window or after a certain period of inactivity. I think that [1] can be a good starting point.

I hope this helps,
Kostas

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html

I think I can only define the partition name in the BucketAssigner, because I don't want too many partitions (currently I have accountId and yyyyMM (year and month)). I've checked that in Flink 1.10 [1] we will be able to configure a prefix and a suffix for the filename. I could add the day and hour to the prefix; then, when I need to store the same events again, I could start from a specific time (probably matching a Kafka offset) and remove the files whose prefix date is newer than that time.
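
[Editor's note: against the Flink 1.10 API this idea would look roughly like the sketch below. The prefix value, the path, the schema handling and the reuse of the AccountMonthBucketAssigner from the earlier sketch are assumptions, not code from the thread.]

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.functions.sink.filesystem.{OutputFileConfig, StreamingFileSink}

val schema: Schema = ???                   // Avro schema of the events
val backfillPrefix = "part-2020011015"     // hypothetical day+hour marker for this run

// Part files become "<prefix>-<subtask>-<counter><suffix>", so a backfill started at a
// known day/hour produces predictable names that can be deleted and rewritten.
val fileConfig = OutputFileConfig.builder()
  .withPartPrefix(backfillPrefix)
  .withPartSuffix(".parquet")
  .build()

val sink: StreamingFileSink[GenericRecord] = StreamingFileSink
  .forBulkFormat(new Path("s3://some-bucket/events"), ParquetAvroWriters.forGenericRecord(schema))
  .withBucketAssigner(new AccountMonthBucketAssigner)   // assigner from the earlier sketch
  .withOutputFileConfig(fileConfig)
  .build()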

The only scenario for this is when, for some reason, Flink is writing bad files (events with wrong information) that need to be stored (processed) again.

For 2), my implementation with the trigger solved this.

[1] https://github.com/apache/flink/blob/master/docs/dev/connectors/streamfile_sink.md
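
[Editor's note: for completeness, Kostas's point 2) (a keyed process function with map state and timers, instead of a window plus custom trigger) might be sketched roughly as below. The window size, the inactivity timeout, the state layout and the per-element timer registration are all assumptions for illustration; it assumes event-time timestamps have been assigned upstream. It would be applied as stream.keyBy(...).process(new WindowingWithTimeout(...)), with the emitted batches going to the file sink as in the window-based version.]

import java.util.{ArrayList => JArrayList, List => JList}

import scala.collection.JavaConverters._

import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.streaming.api.TimeDomain
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// Buffers records per event-time window (keyed by window start) and flushes a window
// either when the watermark passes its end or after a processing-time inactivity timeout.
class WindowingWithTimeout(windowSizeMs: Long, timeoutMs: Long)
    extends KeyedProcessFunction[String, GenericRecord, List[GenericRecord]] {

  private lazy val windows: MapState[java.lang.Long, JList[GenericRecord]] =
    getRuntimeContext.getMapState(
      new MapStateDescriptor[java.lang.Long, JList[GenericRecord]](
        "windows", classOf[java.lang.Long], classOf[JList[GenericRecord]]))

  override def processElement(
      record: GenericRecord,
      ctx: KeyedProcessFunction[String, GenericRecord, List[GenericRecord]]#Context,
      out: Collector[List[GenericRecord]]): Unit = {
    val eventTime: Long = ctx.timestamp()
    val windowStart = eventTime - (eventTime % windowSizeMs)

    val buffered = Option(windows.get(windowStart)).getOrElse(new JArrayList[GenericRecord]())
    buffered.add(record)
    windows.put(windowStart, buffered)

    // Fire at the end of the window (event time) or after inactivity (processing time).
    // Note: this registers one processing-time timer per element; a real implementation
    // would track and de-duplicate those timers.
    ctx.timerService().registerEventTimeTimer(windowStart + windowSizeMs - 1)
    ctx.timerService().registerProcessingTimeTimer(
      ctx.timerService().currentProcessingTime() + timeoutMs)
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[String, GenericRecord, List[GenericRecord]]#OnTimerContext,
      out: Collector[List[GenericRecord]]): Unit = {
    // On the inactivity timeout flush everything; on an event-time timer flush only the
    // windows that have ended.
    val flushAll = ctx.timeDomain() == TimeDomain.PROCESSING_TIME
    val it = windows.iterator()
    while (it.hasNext) {
      val entry = it.next()
      if (flushAll || entry.getKey + windowSizeMs - 1 <= timestamp) {
        out.collect(entry.getValue.asScala.toList)
        it.remove()
      }
    }
  }
}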


Re: Custom File Sink using EventTime and defined custom file name for parquet file

Kostas Kloudas
Oops, sorry for not sending the reply to everyone, and thanks David for reposting it here. Great to hear that you solved your issue!

Kostas


