[DISCUSS] FLIP-143: Unified Sink API

[DISCUSS] FLIP-143: Unified Sink API

Guowei Ma
Hi, devs & users

As discussed in FLIP-131[1], Flink will deprecate the DataSet API in favor of the DataStream API and Table API. Users should be able to use the DataStream API to write jobs that support both bounded and unbounded execution modes. However, Flink does not provide a sink API that guarantees exactly-once semantics in both bounded and unbounded scenarios, which blocks this unification.

So we want to introduce a new unified sink API that lets users develop a sink once and run it everywhere. You can find more details in FLIP-143[2].

The FLIP contains some open questions on which I'd really appreciate input from the community. Some of the open questions include:
  1. We provide two alternative Sink APIs in the FLIP. The only difference between the two versions is how the state is exposed to the user. Which one do you prefer?
  2. How should the sink API support writing to Hive?
  3. Is the sink an operator or a topology?
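To make the open questions more concrete, here is a minimal sketch of the overall shape such an API could take, together with a toy in-memory sink. The names and type parameters (Sink, SinkWriter, Committer, prepareCommit, and so on) are assumptions for illustration, not the actual FLIP-143 interfaces:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of a unified sink: a parallel writer that produces
// committables at checkpoint time, and an optional committer that commits them.
interface SinkWriter<InputT, CommT, StateT> {
    void write(InputT element);       // called once per record
    List<CommT> prepareCommit();      // committables handed over at a checkpoint
    List<StateT> snapshotState();     // writer state to persist for recovery
}

interface Committer<CommT> {
    void commit(List<CommT> committables);  // second phase of the two-phase commit
}

interface Sink<InputT, CommT, StateT> {
    SinkWriter<InputT, CommT, StateT> createWriter(List<StateT> restoredState);
    Optional<Committer<CommT>> createCommitter();  // empty => no commit phase
}

// Toy in-memory sink used only to illustrate the call sequence.
final class CollectSink implements Sink<String, String, Void> {
    public SinkWriter<String, String, Void> createWriter(List<Void> restoredState) {
        return new SinkWriter<String, String, Void>() {
            private final List<String> buffer = new ArrayList<>();
            public void write(String element) { buffer.add(element); }
            public List<String> prepareCommit() {
                List<String> out = List.copyOf(buffer);  // hand over buffered records
                buffer.clear();
                return out;
            }
            public List<Void> snapshotState() { return List.of(); }  // stateless
        };
    }
    public Optional<Committer<String>> createCommitter() {
        return Optional.of(committables -> { });  // no-op commit
    }
}
```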

Best,
Guowei

Re: [DISCUSS] FLIP-143: Unified Sink API

Steven Wu
Guowei,

Thanks a lot for the proposal and starting the discussion thread. Very excited. 

For the big question of "Is the sink an operator or a topology?", I have a few related sub-questions.
* Where should we run the committers? 
* Does the committer run in parallel or with a single parallelism?
* Can a single choice satisfy all sinks? 

Let me try to envision how some sinks could be implemented with this new unified sink interface.

1. Kafka sink

Kafka supports non-transactional and transactional writes
* Non-transactional writes don't need a commit action, so we can have parallel writers and no/no-op committers. This is probably true for other non-transactional message queues as well.
* Transactional writes can be implemented as parallel writers and parallel committers. In this case, I don't know whether it makes sense to separate writers and committers into two separate operators, because they probably need to share the same KafkaProducer object.

Either way, both writers and committers probably should run inside task managers.
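The transactional case could be sketched roughly as below. MockTxnProducer is a hypothetical stand-in for KafkaProducer (none of these names are real Kafka or Flink APIs); it only illustrates why the writer and committer would need to share a single producer object:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a transactional KafkaProducer: records are pending
// until the transaction they belong to is explicitly committed.
final class MockTxnProducer {
    final List<String> pending = new ArrayList<>();
    final List<List<String>> committed = new ArrayList<>();
    void send(String record) { pending.add(record); }
    List<String> flushTransaction() {            // close the open transaction
        List<String> txn = List.copyOf(pending);
        pending.clear();
        return txn;
    }
    void commitTransaction(List<String> txn) { committed.add(txn); }
}

// Writer side: buffers records into the open transaction and, at checkpoint
// time, closes it and emits it as a committable.
final class TxnKafkaWriter {
    final MockTxnProducer producer;
    TxnKafkaWriter(MockTxnProducer p) { producer = p; }
    void write(String record) { producer.send(record); }
    List<String> prepareCommit() { return producer.flushTransaction(); }
}

// Committer side: commits the transaction. It must use the SAME producer
// instance as the writer, which is the coupling mentioned above.
final class TxnKafkaCommitter {
    final MockTxnProducer producer;
    TxnKafkaCommitter(MockTxnProducer p) { producer = p; }
    void commit(List<String> txn) { producer.commitTransaction(txn); }
}
```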

2. ES sink 

The ES sink typically buffers data up to a certain size or time threshold and then uploads/commits a batch to ES. Writers buffer data and flush when needed, and the committer does the HTTP bulk upload to commit. To avoid serialization/deserialization cost, we should run parallel writers and parallel committers, and they should be chained or bundled together while running inside task managers.

It can also be implemented as parallel writers and no/no-op committers, where all logic (batching and upload) is put inside the writers.
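The second variant (all logic inside the writer) might look like the following sketch. BatchingEsWriter is a hypothetical name, and the bulk upload is just a callback standing in for an HTTP bulk request:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical writer for an Elasticsearch-style sink with a no-op committer:
// the writer itself batches documents and triggers the bulk upload when a
// size threshold is reached (a time threshold would work the same way).
final class BatchingEsWriter {
    private final int maxBatchSize;
    private final Consumer<List<String>> bulkUpload;  // stand-in for the HTTP bulk call
    private final List<String> buffer = new ArrayList<>();

    BatchingEsWriter(int maxBatchSize, Consumer<List<String>> bulkUpload) {
        this.maxBatchSize = maxBatchSize;
        this.bulkUpload = bulkUpload;
    }

    void write(String doc) {
        buffer.add(doc);
        if (buffer.size() >= maxBatchSize) {
            flush();  // size threshold reached: upload the batch
        }
    }

    // Also called at checkpoint time so no buffered data is lost.
    void flush() {
        if (!buffer.isEmpty()) {
            bulkUpload.accept(List.copyOf(buffer));
            buffer.clear();
        }
    }
}
```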

3. Iceberg [1] sink 

It is currently implemented as two-stage operators with parallel writers and single-parallelism committers.
* Parallel writers that write records into data files. Upon checkpoint, writers flush and upload the files and send the metadata/locations of the data files to the downstream committer. Writers need to do the flush inside the "prepareSnapshotPreBarrier" method (NOT the "snapshotState" method) before forwarding the checkpoint barrier to the committer.
* A single-parallelism committer operator. It collects data files from the upstream writers. During "snapshotState", it saves the collected data files (or an uber metadata file) into state. When the checkpoint completes, inside "notifyCheckpointComplete" it commits those data files to Iceberg tables. The committer has to be single-parallelism, because we don't want hundreds or thousands of parallel committers competing for commit operations with opportunistic concurrency control; that would be very inefficient and probably infeasible if the parallelism is high. Too many tiny commits/transactions can also slow down both the table write and read paths due to too many manifest files.

Right now, both the Iceberg writer and committer operators run inside task managers. This has one major drawback: with the Iceberg sink, embarrassingly parallel jobs aren't embarrassingly parallel anymore, which loses the benefit of region recovery for embarrassingly parallel DAGs. Conceptually, the Writer-Committer sink pattern mirrors the FLIP-27 Enumerator-Reader source pattern. It would be better if the committer could run inside the job manager, like the SplitEnumerator for the FLIP-27 source.
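The committer side of this two-stage pattern could be simulated as below. IcebergLikeCommitter and the file locations are hypothetical, and plain lists stand in for the Iceberg table and for the checkpointed operator state:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of the single-parallelism committer described above:
// it collects data-file locations from parallel writers, persists them during
// snapshotState, and commits them only after the checkpoint completes.
final class IcebergLikeCommitter {
    private final List<String> collected = new ArrayList<>();    // files since last checkpoint
    private final List<List<String>> state = new ArrayList<>();  // stand-in for operator state
    final List<String> tableFiles = new ArrayList<>();           // stand-in for the Iceberg table

    // Called once per data file sent downstream by a writer subtask.
    void collect(String dataFileLocation) {
        collected.add(dataFileLocation);
    }

    // snapshotState: persist the files collected for this checkpoint.
    void snapshotState() {
        state.add(List.copyOf(collected));
        collected.clear();
    }

    // notifyCheckpointComplete: one commit makes all persisted files visible.
    void notifyCheckpointComplete() {
        for (List<String> files : state) {
            tableFiles.addAll(files);
        }
        state.clear();
    }
}
```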

-----------------------
Additional questions regarding the doc/API:
* Is there any example of the writer shared state (Writer#snapshotSharedState)?
* We allow the case where the writer has no state, right? Meaning WriterS can be Void.

[1] https://iceberg.apache.org/

Thanks,
Steven

On Thu, Sep 10, 2020 at 6:44 AM Guowei Ma <[hidden email]> wrote: