Processing post to sink?

Theo
Hi there,

In my pipeline, I write data into a partitioned parquet directory via StreamingFileSink and a partitioner like:
@Override
public String getBucketId(V element, Context context) {
    return "partitionkey=" + element.getPartitionkey();
}
That works well so far.

Now I need to know when all sink instances are fully "done" with a partition in order to trigger some export jobs (for machine learning/model training) and also notify Impala about the new (final) partition.

In my case, "fully done" is well defined. The partitionkey is derived directly from event time and my watermarks guarantee no late arrivals. So once a watermark passes a certain event time, I know that the prior partition is complete and I can trigger my stuff. Well, not directly: once the watermark passes, I need to wait for the next checkpoint to complete, because only then are the parquet files committed and the partition fully written.
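
The condition described above (watermark past the end of the partition, then a completed checkpoint) can be sketched in plain Java without any Flink dependency. The class and method names are hypothetical, and the fixed partition length in milliseconds is an assumption; the thread only says the partition key is derived from event time. In a real job the three callbacks would be driven by the operator's watermark, snapshot and checkpoint-complete hooks.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: a partition is final once a completed checkpoint covers its last timestamp. */
final class PartitionCompletionTracker {
    private final long partitionSizeMs;               // assumed fixed partition length
    private long currentWatermark = Long.MIN_VALUE;   // latest watermark seen
    private long committedWatermark = Long.MIN_VALUE; // watermark covered by a completed checkpoint
    private final Map<Long, Long> watermarkAtSnapshot = new HashMap<>();

    PartitionCompletionTracker(long partitionSizeMs) {
        this.partitionSizeMs = partitionSizeMs;
    }

    void onWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
    }

    /** Remember which watermark was in effect when the checkpoint snapshot was taken. */
    void onSnapshot(long checkpointId) {
        watermarkAtSnapshot.put(checkpointId, currentWatermark);
    }

    /** Only now is the watermark recorded at snapshot time treated as committed. */
    void onCheckpointComplete(long checkpointId) {
        Long watermark = watermarkAtSnapshot.remove(checkpointId);
        if (watermark != null) {
            committedWatermark = Math.max(committedWatermark, watermark);
        }
        // Older snapshots are subsumed by this checkpoint and can be dropped.
        watermarkAtSnapshot.keySet().removeIf(id -> id < checkpointId);
    }

    /** Partition p covers the timestamps [p * size, (p + 1) * size - 1]. */
    boolean isFinal(long partitionIndex) {
        long lastTimestampInPartition = (partitionIndex + 1) * partitionSizeMs - 1;
        return committedWatermark >= lastTimestampInPartition;
    }
}
```

Note that a watermark arriving after the snapshot but before the completion notification is deliberately not counted; it only becomes committed with the next checkpoint.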

The question is: how do I implement my "partition-completed" condition check in Flink? It pretty much comes down to wanting to do some processing _after_ a sink, based on the sink's progress (watermark + checkpoints).

The only idea I have come up with so far is: make the sink a process-function which also emits elements. Only on a completed checkpoint, emit an element with the current watermark downstream. In the next step, assign event timestamps based on these events and merge the parallel subtasks into one, thus keeping track of the global watermark. In the task with parallelism 1, I could then issue my Impala queries and export jobs (which should not be called by multiple parallel instances simultaneously).
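
The merge step of this idea, where a single downstream task combines what each parallel sink subtask has committed, can be sketched as a minimum over per-subtask values. This is plain Java with hypothetical names, not Flink API; it assumes every subtask reports its committed watermark after each completed checkpoint, including subtasks that see very few events.

```java
import java.util.Arrays;

/** Hypothetical helper for the parallelism-1 step: global progress is the minimum over all subtasks. */
final class GlobalCommitTracker {
    private final long[] committedPerSubtask;
    private long globalCommitted = Long.MIN_VALUE;

    GlobalCommitTracker(int parallelism) {
        committedPerSubtask = new long[parallelism];
        // Until a subtask has reported, it holds the global value back.
        Arrays.fill(committedPerSubtask, Long.MIN_VALUE);
    }

    /**
     * Records the committed watermark reported by one sink subtask.
     * Returns true if the global committed watermark advanced, which is
     * the moment to check for newly finished partitions.
     */
    boolean update(int subtaskIndex, long committedWatermark) {
        committedPerSubtask[subtaskIndex] =
                Math.max(committedPerSubtask[subtaskIndex], committedWatermark);
        long min = Arrays.stream(committedPerSubtask).min().getAsLong();
        if (min > globalCommitted) {
            globalCommitted = min;
            return true;
        }
        return false;
    }

    long globalCommitted() {
        return globalCommitted;
    }
}
```

Because the slowest subtask determines the result, a trigger fires only once all instances have committed up to that point, independent of how many events each one processed.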

Do you have any better ideas for implementation or is this the best way to go? I thought about just building a custom sink inheriting from StreamingFileSink, but then I don't know how to trigger my jobs only once. I _could_ check for my sink's parallel subtask index to be something like 0 and only in that case trigger the jobs, but I have heavy skew in my parallel instances: some process millions of elements, whereas others process just 10 events a day. If my "notification-sink-subtask" ended up on a partition with those few events, I would get new triggers far too rarely. And I still wouldn't know whether the other instances had already committed their parquet files.

What kind of problems do I need to expect when making a sink a process-function?

Best regards
Theo

Re: Processing post to sink?

Timothy Victor
Why not implement your own SinkFunction, or maybe inherit from the one you are using now?

Tim

On Sat, Dec 14, 2019, 8:52 AM Theo Diefenthal <[hidden email]> wrote:
> [...]

Re: Processing post to sink?

Theo
Hi Tim,

As I said:

> Do you have any better ideas for implementation or is this the best way to go? I thought about just building a custom sink inheriting from StreamingFileSink, but then I don't know how to trigger my jobs only once. I _could_ check for my sink's parallel subtask index to be something like 0 and only in that case trigger the jobs, but I have heavy skew in my parallel instances:
> some process millions of elements, whereas others process just 10 events a day. If my "notification-sink-subtask" ended up on a partition with those few events, I would get new triggers far too rarely.
> And I still wouldn't know whether the other instances had already committed their parquet files.

I don't know how to do that within a sink function. I kind of need a "synchronisation barrier" after all "notify-checkpoint-complete" calls to all sink instances. Can you tell me how to do that in my own sink function?

Best regards
Theo


From: "Timothy Victor" <[hidden email]>
To: "Theo Diefenthal" <[hidden email]>
CC: "user" <[hidden email]>
Sent: Saturday, December 14, 2019 16:27:31
Subject: Re: Processing post to sink?

> [...]


--
SCOOP Software GmbH - Gut Maarhausen - Eiler Straße 3 P - D-51107 Köln
Theo Diefenthal

T +49 221 801916-196 - F +49 221 801916-17 - M +49 160 90506575
[hidden email] - www.scoop-software.de
Registered office: Köln, Commercial register: Köln,
Registration number: HRB 36625
Managing directors: Dr. Oleg Balovnev, Frank Heinen,
Martin Müller-Rohde, Dr. Wolfgang Reddig, Roland Scheel

Re: Processing post to sink?

rmetzger0
Hey Theo,

Your solution of turning the sink into a process function should work. I'm just not sure how easy it is to re-use the StreamingFileSink inside it.
Have you considered sending all the records to a parallelism=1 process function sitting "next" to the StreamingFileSink? You could track the watermarks and partitions in there, and listen to the "notifyCheckpointComplete()" calls. Since that ProcessFunction receives data at the same rate as the sink, it should align with the sinks. I'm not 100% sure this solution really works, but I wanted to bring it up to see whether you've considered it.
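
The bookkeeping such a single-instance function would need, tracking the partitions it has seen and reporting each one exactly once, might look like the following plain-Java sketch. All names are hypothetical and nothing here is Flink API. It assumes fixed-length partitions and at most one checkpoint in flight, and it does not address whether the sink's own commit has already finished when this function is notified, which is the open point in the suggestion above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical single-instance bookkeeping: report each finished partition exactly once. */
final class PartitionNotifier {
    private final long partitionSizeMs;                  // assumed fixed partition length
    private final TreeSet<Long> openPartitions = new TreeSet<>();
    private long watermark = Long.MIN_VALUE;
    private long watermarkAtLastSnapshot = Long.MIN_VALUE;

    PartitionNotifier(long partitionSizeMs) {
        this.partitionSizeMs = partitionSizeMs;
    }

    /** Remember the partition each record falls into. */
    void onElement(long eventTimeMs) {
        openPartitions.add(Math.floorDiv(eventTimeMs, partitionSizeMs));
    }

    void onWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
    }

    /** Assumes at most one checkpoint is in flight at a time. */
    void onSnapshot() {
        watermarkAtLastSnapshot = watermark;
    }

    /** Returns the partitions that became final with this checkpoint and forgets them. */
    List<Long> onCheckpointComplete() {
        List<Long> finished = new ArrayList<>();
        while (!openPartitions.isEmpty()) {
            long partition = openPartitions.first();
            long lastTimestamp = (partition + 1) * partitionSizeMs - 1;
            if (lastTimestamp > watermarkAtLastSnapshot) {
                break; // this and all later partitions may still receive data
            }
            finished.add(openPartitions.pollFirst());
        }
        return finished;
    }
}
```

The returned list is where the Impala notification and export jobs would be triggered; since finished partitions are removed from the set, a later checkpoint does not report them again.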

Best,
Robert



On Sat, Dec 14, 2019 at 7:08 PM Theo Diefenthal <[hidden email]> wrote:
> [...]