Questions about implementing a flink source

classic Classic list List threaded Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|

Questions about implementing a flink source

Evan Palmer

Hello, I’m new to Flink. I’m trying to write a source for Pub/Sub Lite which is a partition based Pub/Sub product, and I have a few questions.


1.


I saw that there are two sets of interfaces used in existing sources: The RichSourceFunction, and the set of interfaces from FLIP-27. It seems like the Source interfaces are preferred for new sources, but I wanted to be sure.


2.


I’m having a little bit of trouble working out how when the currentParallelism returned by the SplitEnumeratorContext [1] can change, and how a source should react to that.


For context, I’m currently thinking about single partitions as “splits”, so a source would have an approximately constant number of splits which each has an potentially unbounded amount of work (at least in continuous mode). Each split will be assigned to some SourceReader by the split enumerator. If the value of currentParallelism changes, it seems like I’ll need to find a way to redistribute my partitions over SourceReaders, or else I'll end up with an unbalanced distribution of partitions to SourceReaders.


I looked at the docs on elastic scaling [2], and it seems like when the parallelism of the source changes, the source will be checkpointed and restored. I think this would mean all the SourceReaders get restarted, and their splits are returned to the SplitEnumerator for reassignment. Is this approximately correct?


[1] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.html#currentParallelism--

[2] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/elastic_scaling/


Reply | Threaded
Open this post in threaded view
|

Re: Questions about implementing a flink source

Arvid Heise-4
Hi Evan,

1) You are absolutely correct that we would urge users to add new sources as FLIP-27 and new sinks as FLIP-143. I can provide guidance in both cases.
For FLIP-27 sources, I'd recommend using KafkaSource [1] and FileSource [2] as a starting point. Especially basing the reader implementation on SingleThreadMultiplexSourceReaderBase will give you some performance boost over naive implementations.
It is probably initially overwhelming but there is lots of thought behind the Source interface. We plan on having better documentation and more examples in the next months to ease the ramp up but it's also kind of a hen-egg problem.

I can also provide guidance outside of the ML if it's easier.

2) You are right, the currentParallelism is static in respect to the creation of the SourceReaders. Any change to the parallelism would also cause a recreation of the readers.
Splits are usually checkpointed alongside the readers. On recovery, the readers are restored with their old splits. Only when splits cannot be recovered in the context of a reader (for example downscaling), the splits would be re-added to the enumerator.

Rebalancing can happen in SplitEnumerator#addReader or #handleSplitRequest. The Kafka and File source use even different approaches with eager and lazy initialization respectively. Further, you can send arbitrary events between the enumerator and readers to work out the rebalancing. In theory, you can also dynamically rebalance splits, however, you lose ordering guarantees of the messages at the moment (if you have records r1, r2 in this order in split s and you reassign s, then you may end up with r2, r1 in the sink).


On Mon, May 3, 2021 at 1:40 AM Evan Palmer <[hidden email]> wrote:

Hello, I’m new to Flink. I’m trying to write a source for Pub/Sub Lite which is a partition based Pub/Sub product, and I have a few questions.


1.


I saw that there are two sets of interfaces used in existing sources: The RichSourceFunction, and the set of interfaces from FLIP-27. It seems like the Source interfaces are preferred for new sources, but I wanted to be sure.


2.


I’m having a little bit of trouble working out how when the currentParallelism returned by the SplitEnumeratorContext [1] can change, and how a source should react to that.


For context, I’m currently thinking about single partitions as “splits”, so a source would have an approximately constant number of splits which each has an potentially unbounded amount of work (at least in continuous mode). Each split will be assigned to some SourceReader by the split enumerator. If the value of currentParallelism changes, it seems like I’ll need to find a way to redistribute my partitions over SourceReaders, or else I'll end up with an unbalanced distribution of partitions to SourceReaders.


I looked at the docs on elastic scaling [2], and it seems like when the parallelism of the source changes, the source will be checkpointed and restored. I think this would mean all the SourceReaders get restarted, and their splits are returned to the SplitEnumerator for reassignment. Is this approximately correct?


[1] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.html#currentParallelism--

[2] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/elastic_scaling/


Reply | Threaded
Open this post in threaded view
|

Re: Questions about implementing a flink source

Evan Palmer
Hi Arvid, thank you so much for the detailed reply!

A few replies / questions inline. Somewhat relatedly, I'm also wondering where this connector should live. I saw that there's already a pubsub connector in https://github.com/apache/flink/tree/bf40fa5b8cd5b3a8876f37ab4aa7bf06e8b9f666/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub, so if flink is willing to host it, perhaps it could live near there?  Alternatively, it could live alongside our client library in https://github.com/googleapis/java-pubsublite

On Mon, May 3, 2021 at 1:54 PM Arvid Heise <[hidden email]> wrote:
Hi Evan,

1) You are absolutely correct that we would urge users to add new sources as FLIP-27 and new sinks as FLIP-143. I can provide guidance in both cases.
For FLIP-27 sources, I'd recommend using KafkaSource [1] and FileSource [2] as a starting point. Especially basing the reader implementation on SingleThreadMultiplexSourceReaderBase will give you some performance boost over naive implementations.
It is probably initially overwhelming but there is lots of thought behind the Source interface. We plan on having better documentation and more examples in the next months to ease the ramp up but it's also kind of a hen-egg problem.

Okay, great, the Source interface seems much easier to work with. I haven't gotten around to thinking about our Sink yet, but I'm sure I'll have some questions when I do :)

I read through SourceReaderBase and SingleThreadMultiplexSourceReaderBase. It seems like these base implementations are mostly designed to help in cases where the client library uses a synchronous pull based approach. Our client library is async - we use a bidirectional stream to pull messages from our brokers and we have some flow control settings to limit the number of bytes and messages outstanding to the client. I'm wondering if because of this, we should just implement the SourceReader interface. In particular, we have a per partition subscriber class which buffers messages up to the flow control limit and exposes an API almost identical to SourceReader's pollNext and IsAvailable. What do you think?

I can also provide guidance outside of the ML if it's easier.

2) You are right, the currentParallelism is static in respect to the creation of the SourceReaders. Any change to the parallelism would also cause a recreation of the readers.
Splits are usually checkpointed alongside the readers. On recovery, the readers are restored with their old splits. Only when splits cannot be recovered in the context of a reader (for example downscaling), the splits would be re-added to the enumerator.

Rebalancing can happen in SplitEnumerator#addReader or #handleSplitRequest. The Kafka and File source use even different approaches with eager and lazy initialization respectively. Further, you can send arbitrary events between the enumerator and readers to work out the rebalancing. In theory, you can also dynamically rebalance splits, however, you lose ordering guarantees of the messages at the moment (if you have records r1, r2 in this order in split s and you reassign s, then you may end up with r2, r1 in the sink).

Ah, okay, this helped a lot. I'd missed that rebalancing dynamically would break ordering guarantees, so when I read through the Kafka source, I was really confused by the lack of rebalancing.

On Mon, May 3, 2021 at 1:40 AM Evan Palmer <[hidden email]> wrote:

Hello, I’m new to Flink. I’m trying to write a source for Pub/Sub Lite which is a partition based Pub/Sub product, and I have a few questions.


1.


I saw that there are two sets of interfaces used in existing sources: The RichSourceFunction, and the set of interfaces from FLIP-27. It seems like the Source interfaces are preferred for new sources, but I wanted to be sure.


2.


I’m having a little bit of trouble working out how when the currentParallelism returned by the SplitEnumeratorContext [1] can change, and how a source should react to that.


For context, I’m currently thinking about single partitions as “splits”, so a source would have an approximately constant number of splits which each has an potentially unbounded amount of work (at least in continuous mode). Each split will be assigned to some SourceReader by the split enumerator. If the value of currentParallelism changes, it seems like I’ll need to find a way to redistribute my partitions over SourceReaders, or else I'll end up with an unbalanced distribution of partitions to SourceReaders.


I looked at the docs on elastic scaling [2], and it seems like when the parallelism of the source changes, the source will be checkpointed and restored. I think this would mean all the SourceReaders get restarted, and their splits are returned to the SplitEnumerator for reassignment. Is this approximately correct?


[1] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.html#currentParallelism--

[2] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/elastic_scaling/


Reply | Threaded
Open this post in threaded view
|

Re: Questions about implementing a flink source

Arvid Heise-4
Hi Evan,

A few replies / questions inline. Somewhat relatedly, I'm also wondering where this connector should live. I saw that there's already a pubsub connector in https://github.com/apache/flink/tree/bf40fa5b8cd5b3a8876f37ab4aa7bf06e8b9f666/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub, so if flink is willing to host it, perhaps it could live near there?  Alternatively, it could live alongside our client library in https://github.com/googleapis/java-pubsublite.

For a long time, the community has been thinking of moving (most) connectors out of the repository. Especially now with the new source/sink interface, the need to decouple Flink release cycle and connector release cycle is bigger than ever as we do not backport features in our bugfix branches. Thus, Pubsub Lite would only be available in Flink 1.14 and many users would need to wait up to a year to effectively use the source (adaption of new Flink versions is usually slow).
Therefore, I'd definitely encourage you to have the connector along your client library - where the release cycles probably also much better align. I will soon present an idea on how to list all available connectors on Flink's connector page such that from a user's perspective, it wouldn't matter if it's internal and external. If it turns out that the community rather wants to have all connectors still in the main repo, we can look at contributing it at a later point in time.

I read through SourceReaderBase and SingleThreadMultiplexSourceReaderBase. It seems like these base implementations are mostly designed to help in cases where the client library uses a synchronous pull based approach. Our client library is async - we use a bidirectional stream to pull messages from our brokers and we have some flow control settings to limit the number of bytes and messages outstanding to the client. I'm wondering if because of this, we should just implement the SourceReader interface. In particular, we have a per partition subscriber class which buffers messages up to the flow control limit and exposes an API almost identical to SourceReader's pollNext and IsAvailable. What do you think?

Good catch. Yes, the implementation is more or less simulating the async fetching that your library apparently offers already. So feel free to skip it. Of course, if it turns out that you still need certain building blocks, such as record handover, we can also discuss pulling up a common base class to the async sources and the SingleThreadMultiplexSourceReaderBase.
Ah, okay, this helped a lot. I'd missed that rebalancing dynamically would break ordering guarantees, so when I read through the Kafka source, I was really confused by the lack of rebalancing.

We have some ideas on how to make it more dynamic but they are very far down the road and we can hopefully implement them in a transparent way to the sources.

On Fri, May 7, 2021 at 11:23 PM Evan Palmer <[hidden email]> wrote:
Hi Arvid, thank you so much for the detailed reply!

A few replies / questions inline. Somewhat relatedly, I'm also wondering where this connector should live. I saw that there's already a pubsub connector in https://github.com/apache/flink/tree/bf40fa5b8cd5b3a8876f37ab4aa7bf06e8b9f666/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub, so if flink is willing to host it, perhaps it could live near there?  Alternatively, it could live alongside our client library in https://github.com/googleapis/java-pubsublite

On Mon, May 3, 2021 at 1:54 PM Arvid Heise <[hidden email]> wrote:
Hi Evan,

1) You are absolutely correct that we would urge users to add new sources as FLIP-27 and new sinks as FLIP-143. I can provide guidance in both cases.
For FLIP-27 sources, I'd recommend using KafkaSource [1] and FileSource [2] as a starting point. Especially basing the reader implementation on SingleThreadMultiplexSourceReaderBase will give you some performance boost over naive implementations.
It is probably initially overwhelming but there is lots of thought behind the Source interface. We plan on having better documentation and more examples in the next months to ease the ramp up but it's also kind of a hen-egg problem.

Okay, great, the Source interface seems much easier to work with. I haven't gotten around to thinking about our Sink yet, but I'm sure I'll have some questions when I do :)

I read through SourceReaderBase and SingleThreadMultiplexSourceReaderBase. It seems like these base implementations are mostly designed to help in cases where the client library uses a synchronous pull based approach. Our client library is async - we use a bidirectional stream to pull messages from our brokers and we have some flow control settings to limit the number of bytes and messages outstanding to the client. I'm wondering if because of this, we should just implement the SourceReader interface. In particular, we have a per partition subscriber class which buffers messages up to the flow control limit and exposes an API almost identical to SourceReader's pollNext and IsAvailable. What do you think?

I can also provide guidance outside of the ML if it's easier.

2) You are right, the currentParallelism is static in respect to the creation of the SourceReaders. Any change to the parallelism would also cause a recreation of the readers.
Splits are usually checkpointed alongside the readers. On recovery, the readers are restored with their old splits. Only when splits cannot be recovered in the context of a reader (for example downscaling), the splits would be re-added to the enumerator.

Rebalancing can happen in SplitEnumerator#addReader or #handleSplitRequest. The Kafka and File source use even different approaches with eager and lazy initialization respectively. Further, you can send arbitrary events between the enumerator and readers to work out the rebalancing. In theory, you can also dynamically rebalance splits, however, you lose ordering guarantees of the messages at the moment (if you have records r1, r2 in this order in split s and you reassign s, then you may end up with r2, r1 in the sink).

Ah, okay, this helped a lot. I'd missed that rebalancing dynamically would break ordering guarantees, so when I read through the Kafka source, I was really confused by the lack of rebalancing.

On Mon, May 3, 2021 at 1:40 AM Evan Palmer <[hidden email]> wrote:

Hello, I’m new to Flink. I’m trying to write a source for Pub/Sub Lite which is a partition based Pub/Sub product, and I have a few questions.


1.


I saw that there are two sets of interfaces used in existing sources: The RichSourceFunction, and the set of interfaces from FLIP-27. It seems like the Source interfaces are preferred for new sources, but I wanted to be sure.


2.


I’m having a little bit of trouble working out how when the currentParallelism returned by the SplitEnumeratorContext [1] can change, and how a source should react to that.


For context, I’m currently thinking about single partitions as “splits”, so a source would have an approximately constant number of splits which each has an potentially unbounded amount of work (at least in continuous mode). Each split will be assigned to some SourceReader by the split enumerator. If the value of currentParallelism changes, it seems like I’ll need to find a way to redistribute my partitions over SourceReaders, or else I'll end up with an unbalanced distribution of partitions to SourceReaders.


I looked at the docs on elastic scaling [2], and it seems like when the parallelism of the source changes, the source will be checkpointed and restored. I think this would mean all the SourceReaders get restarted, and their splits are returned to the SplitEnumerator for reassignment. Is this approximately correct?


[1] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.html#currentParallelism--

[2] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/elastic_scaling/


Reply | Threaded
Open this post in threaded view
|

Re: Questions about implementing a flink source

Evan Palmer
Hello again,

Thank you for all of your help so far, I have a few more questions if you have the time :)

1. Deserialization Schema

There's been some debate within my team about whether we should offer a DeserializationSchema and SerializationSchema in our source and sink.

If we include don't include the schemas, our source and sink would be implement Source<...pubsublite.Message> and Sink<...pubsublite.Message>, which is the type our client library currently returns (this type is serializable), and users could transform the messages in a map function after the source. This would make implementing the source somewhat easier, and it doesn't seem like it would be much more difficult for users. On the other hand, I looked around and didn't find any flink sources implemented without a deserialization/serialization schema, so I'm worried that this choice might make our source/sink confusing for users, or that we're missing something. What are your thoughts on this?

2. Order aware rebalancing. 

I want to make sure I understand the problem with rebalancing partitions to different SourceReaders. Does any reassignment of a pub/sub partition between SourceReaders have the potential to cause disorder, or can order be guaranteed by some variant of ensuring that the partition is assigned to only one source reader at a time?

I read through https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/learn-flink/overview/#parallel-dataflows, which made me think that if the user wanted a pipeline like

env.fromSource(source).keyBy("Message Partition", ...).sinkTo(sink)

Then if two different source tasks had messages from a single pub/sub partition, there could be disorder. We're not planning to implement any rebalancing of partitions in our source, but I wanted to make sure I can document this correctly :)

3. Reporting permanent failures in the Sink

Is it sufficient to throw an exception from Committer.commit() in the case where our sink has permanently failed in some way (e.g. the configured topic has been deleted, or the user doesn't have permissions to publish), or is there something else we should be doing?

Evan


On Mon, May 10, 2021 at 9:57 AM Arvid Heise <[hidden email]> wrote:
Hi Evan,

A few replies / questions inline. Somewhat relatedly, I'm also wondering where this connector should live. I saw that there's already a pubsub connector in https://github.com/apache/flink/tree/bf40fa5b8cd5b3a8876f37ab4aa7bf06e8b9f666/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub, so if flink is willing to host it, perhaps it could live near there?  Alternatively, it could live alongside our client library in https://github.com/googleapis/java-pubsublite.

For a long time, the community has been thinking of moving (most) connectors out of the repository. Especially now with the new source/sink interface, the need to decouple Flink release cycle and connector release cycle is bigger than ever as we do not backport features in our bugfix branches. Thus, Pubsub Lite would only be available in Flink 1.14 and many users would need to wait up to a year to effectively use the source (adaption of new Flink versions is usually slow).
Therefore, I'd definitely encourage you to have the connector along your client library - where the release cycles probably also much better align. I will soon present an idea on how to list all available connectors on Flink's connector page such that from a user's perspective, it wouldn't matter if it's internal and external. If it turns out that the community rather wants to have all connectors still in the main repo, we can look at contributing it at a later point in time.

Okay, thanks for the context! We will host the connector in our repository.

I read through SourceReaderBase and SingleThreadMultiplexSourceReaderBase. It seems like these base implementations are mostly designed to help in cases where the client library uses a synchronous pull based approach. Our client library is async - we use a bidirectional stream to pull messages from our brokers and we have some flow control settings to limit the number of bytes and messages outstanding to the client. I'm wondering if because of this, we should just implement the SourceReader interface. In particular, we have a per partition subscriber class which buffers messages up to the flow control limit and exposes an API almost identical to SourceReader's pollNext and IsAvailable. What do you think?

Good catch. Yes, the implementation is more or less simulating the async fetching that your library apparently offers already. So feel free to skip it. Of course, if it turns out that you still need certain building blocks, such as record handover, we can also discuss pulling up a common base class to the async sources and the 
SingleThreadMultiplexSourceReaderBase.
Ah, okay, this helped a lot. I'd missed that rebalancing dynamically would break ordering guarantees, so when I read through the Kafka source, I was really confused by the lack of rebalancing.

We have some ideas on how to make it more dynamic but they are very far down the road and we can hopefully implement them in a transparent way to the sources.

On Fri, May 7, 2021 at 11:23 PM Evan Palmer <[hidden email]> wrote:
Hi Arvid, thank you so much for the detailed reply!

A few replies / questions inline. Somewhat relatedly, I'm also wondering where this connector should live. I saw that there's already a pubsub connector in https://github.com/apache/flink/tree/bf40fa5b8cd5b3a8876f37ab4aa7bf06e8b9f666/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub, so if flink is willing to host it, perhaps it could live near there?  Alternatively, it could live alongside our client library in https://github.com/googleapis/java-pubsublite

On Mon, May 3, 2021 at 1:54 PM Arvid Heise <[hidden email]> wrote:
Hi Evan,

1) You are absolutely correct that we would urge users to add new sources as FLIP-27 and new sinks as FLIP-143. I can provide guidance in both cases.
For FLIP-27 sources, I'd recommend using KafkaSource [1] and FileSource [2] as a starting point. Especially basing the reader implementation on SingleThreadMultiplexSourceReaderBase will give you some performance boost over naive implementations.
It is probably initially overwhelming but there is lots of thought behind the Source interface. We plan on having better documentation and more examples in the next months to ease the ramp up but it's also kind of a hen-egg problem.

Okay, great, the Source interface seems much easier to work with. I haven't gotten around to thinking about our Sink yet, but I'm sure I'll have some questions when I do :)

I read through SourceReaderBase and SingleThreadMultiplexSourceReaderBase. It seems like these base implementations are mostly designed to help in cases where the client library uses a synchronous pull based approach. Our client library is async - we use a bidirectional stream to pull messages from our brokers and we have some flow control settings to limit the number of bytes and messages outstanding to the client. I'm wondering if because of this, we should just implement the SourceReader interface. In particular, we have a per partition subscriber class which buffers messages up to the flow control limit and exposes an API almost identical to SourceReader's pollNext and IsAvailable. What do you think?

I can also provide guidance outside of the ML if it's easier.

2) You are right, the currentParallelism is static in respect to the creation of the SourceReaders. Any change to the parallelism would also cause a recreation of the readers.
Splits are usually checkpointed alongside the readers. On recovery, the readers are restored with their old splits. Only when splits cannot be recovered in the context of a reader (for example downscaling), the splits would be re-added to the enumerator.

Rebalancing can happen in SplitEnumerator#addReader or #handleSplitRequest. The Kafka and File source use even different approaches with eager and lazy initialization respectively. Further, you can send arbitrary events between the enumerator and readers to work out the rebalancing. In theory, you can also dynamically rebalance splits, however, you lose ordering guarantees of the messages at the moment (if you have records r1, r2 in this order in split s and you reassign s, then you may end up with r2, r1 in the sink).

Ah, okay, this helped a lot. I'd missed that rebalancing dynamically would break ordering guarantees, so when I read through the Kafka source, I was really confused by the lack of rebalancing.

On Mon, May 3, 2021 at 1:40 AM Evan Palmer <[hidden email]> wrote:

Hello, I’m new to Flink. I’m trying to write a source for Pub/Sub Lite which is a partition based Pub/Sub product, and I have a few questions.


1.


I saw that there are two sets of interfaces used in existing sources: The RichSourceFunction, and the set of interfaces from FLIP-27. It seems like the Source interfaces are preferred for new sources, but I wanted to be sure.


2.


I’m having a little bit of trouble working out how when the currentParallelism returned by the SplitEnumeratorContext [1] can change, and how a source should react to that.


For context, I’m currently thinking about single partitions as “splits”, so a source would have an approximately constant number of splits which each has an potentially unbounded amount of work (at least in continuous mode). Each split will be assigned to some SourceReader by the split enumerator. If the value of currentParallelism changes, it seems like I’ll need to find a way to redistribute my partitions over SourceReaders, or else I'll end up with an unbalanced distribution of partitions to SourceReaders.


I looked at the docs on elastic scaling [2], and it seems like when the parallelism of the source changes, the source will be checkpointed and restored. I think this would mean all the SourceReaders get restarted, and their splits are returned to the SplitEnumerator for reassignment. Is this approximately correct?


[1] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.html#currentParallelism--

[2] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/elastic_scaling/


Reply | Threaded
Open this post in threaded view
|

Re: Questions about implementing a flink source

Arvid Heise-4
Hi Evan,

1. I'd recommend supporting DeserializationSchema in any case similar to KafkaRecordDeserializationSchema.
First, it aligns with other sources and user expectations.
Second, it's a tad faster and the plan looks easier if you omit a chained task.
Third, you can avoid quite a bit of boilerplate code on user side by having adapters such that a user can use any existing Flink DeserializationSchema to deserialize the payload; so without writing any UDF in 80% of the use cases, the user gets the value that he wants (see KafkaValueOnlyDeserializationSchemaWrapper).
Lastly, we also plan to have first class support for invalid record handling at some point and it might be connected to DeserializationSchema.

2. It's any reassignment while there is still data flowing in the execution graph. It's always a matter if there are parallel roads from source to sink. As long as there is an old record on the road, sending new records on a different road has always the potential of new record overtaking old record.
If you could drain all data (currently not possible) without restarting, then dynamic reassignment would be safe.

Note that without backpressure, it would certainly be enough to wait a couple of seconds after unassigning a partition before reassigning it to avoid any reordering issue. Maybe you could offer a configuration option and the user has to take some responsibility.

I could also see that we could piggyback on aligned checkpoint barriers to not emit any data until the checkpoint has been completed and do the reassignment then. But that's certainly something that the framework should support and that you don't want to implement on your own.

3. Yes if you throw an IOException (or any other exception), the checkpoint would not complete and the task gets restarted (could be in an inconsistent state).

On Tue, Jun 8, 2021 at 10:51 PM Evan Palmer <[hidden email]> wrote:
Hello again,

Thank you for all of your help so far, I have a few more questions if you have the time :)

1. Deserialization Schema

There's been some debate within my team about whether we should offer a DeserializationSchema and SerializationSchema in our source and sink.

If we include don't include the schemas, our source and sink would be implement Source<...pubsublite.Message> and Sink<...pubsublite.Message>, which is the type our client library currently returns (this type is serializable), and users could transform the messages in a map function after the source. This would make implementing the source somewhat easier, and it doesn't seem like it would be much more difficult for users. On the other hand, I looked around and didn't find any flink sources implemented without a deserialization/serialization schema, so I'm worried that this choice might make our source/sink confusing for users, or that we're missing something. What are your thoughts on this?

2. Order aware rebalancing. 

I want to make sure I understand the problem with rebalancing partitions to different SourceReaders. Does any reassignment of a pub/sub partition between SourceReaders have the potential to cause disorder, or can order be guaranteed by some variant of ensuring that the partition is assigned to only one source reader at a time?

I read through https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/learn-flink/overview/#parallel-dataflows, which made me think that if the user wanted a pipeline like

env.fromSource(source).keyBy("Message Partition", ...).sinkTo(sink)

Then if two different source tasks had messages from a single pub/sub partition, there could be disorder. We're not planning to implement any rebalancing of partitions in our source, but I wanted to make sure I can document this correctly :)

3. Reporting permanent failures in the Sink

Is it sufficient to throw an exception from Committer.commit() in the case where our sink has permanently failed in some way (e.g. the configured topic has been deleted, or the user doesn't have permissions to publish), or is there something else we should be doing?

Evan


On Mon, May 10, 2021 at 9:57 AM Arvid Heise <[hidden email]> wrote:
Hi Evan,

A few replies / questions inline. Somewhat relatedly, I'm also wondering where this connector should live. I saw that there's already a pubsub connector in https://github.com/apache/flink/tree/bf40fa5b8cd5b3a8876f37ab4aa7bf06e8b9f666/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub, so if flink is willing to host it, perhaps it could live near there?  Alternatively, it could live alongside our client library in https://github.com/googleapis/java-pubsublite.

For a long time, the community has been thinking of moving (most) connectors out of the repository. Especially now with the new source/sink interface, the need to decouple Flink release cycle and connector release cycle is bigger than ever as we do not backport features in our bugfix branches. Thus, Pubsub Lite would only be available in Flink 1.14 and many users would need to wait up to a year to effectively use the source (adaption of new Flink versions is usually slow).
Therefore, I'd definitely encourage you to have the connector along your client library - where the release cycles probably also much better align. I will soon present an idea on how to list all available connectors on Flink's connector page such that from a user's perspective, it wouldn't matter if it's internal and external. If it turns out that the community rather wants to have all connectors still in the main repo, we can look at contributing it at a later point in time.

Okay, thanks for the context! We will host the connector in our repository.

I read through SourceReaderBase and SingleThreadMultiplexSourceReaderBase. It seems like these base implementations are mostly designed to help in cases where the client library uses a synchronous pull based approach. Our client library is async - we use a bidirectional stream to pull messages from our brokers and we have some flow control settings to limit the number of bytes and messages outstanding to the client. I'm wondering if because of this, we should just implement the SourceReader interface. In particular, we have a per partition subscriber class which buffers messages up to the flow control limit and exposes an API almost identical to SourceReader's pollNext and IsAvailable. What do you think?

Good catch. Yes, the implementation is more or less simulating the async fetching that your library apparently offers already. So feel free to skip it. Of course, if it turns out that you still need certain building blocks, such as record handover, we can also discuss pulling up a common base class to the async sources and the 
SingleThreadMultiplexSourceReaderBase.
Ah, okay, this helped a lot. I'd missed that rebalancing dynamically would break ordering guarantees, so when I read through the Kafka source, I was really confused by the lack of rebalancing.

We have some ideas on how to make it more dynamic but they are very far down the road and we can hopefully implement them in a transparent way to the sources.

On Fri, May 7, 2021 at 11:23 PM Evan Palmer <[hidden email]> wrote:
Hi Arvid, thank you so much for the detailed reply!

A few replies / questions inline. Somewhat relatedly, I'm also wondering where this connector should live. I saw that there's already a pubsub connector in https://github.com/apache/flink/tree/bf40fa5b8cd5b3a8876f37ab4aa7bf06e8b9f666/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub, so if flink is willing to host it, perhaps it could live near there?  Alternatively, it could live alongside our client library in https://github.com/googleapis/java-pubsublite

On Mon, May 3, 2021 at 1:54 PM Arvid Heise <[hidden email]> wrote:
Hi Evan,

1) You are absolutely correct that we would urge users to add new sources as FLIP-27 and new sinks as FLIP-143. I can provide guidance in both cases.
For FLIP-27 sources, I'd recommend using KafkaSource [1] and FileSource [2] as a starting point. Especially basing the reader implementation on SingleThreadMultiplexSourceReaderBase will give you some performance boost over naive implementations.
It is probably initially overwhelming but there is lots of thought behind the Source interface. We plan on having better documentation and more examples in the next months to ease the ramp up but it's also kind of a hen-egg problem.

Okay, great, the Source interface seems much easier to work with. I haven't gotten around to thinking about our Sink yet, but I'm sure I'll have some questions when I do :)

I read through SourceReaderBase and SingleThreadMultiplexSourceReaderBase. It seems like these base implementations are mostly designed to help in cases where the client library uses a synchronous pull based approach. Our client library is async - we use a bidirectional stream to pull messages from our brokers and we have some flow control settings to limit the number of bytes and messages outstanding to the client. I'm wondering if because of this, we should just implement the SourceReader interface. In particular, we have a per partition subscriber class which buffers messages up to the flow control limit and exposes an API almost identical to SourceReader's pollNext and IsAvailable. What do you think?

I can also provide guidance outside of the ML if it's easier.

2) You are right, the currentParallelism is static in respect to the creation of the SourceReaders. Any change to the parallelism would also cause a recreation of the readers.
Splits are usually checkpointed alongside the readers. On recovery, the readers are restored with their old splits. Only when splits cannot be recovered in the context of a reader (for example downscaling), the splits would be re-added to the enumerator.

Rebalancing can happen in SplitEnumerator#addReader or #handleSplitRequest. The Kafka and File source use even different approaches with eager and lazy initialization respectively. Further, you can send arbitrary events between the enumerator and readers to work out the rebalancing. In theory, you can also dynamically rebalance splits, however, you lose ordering guarantees of the messages at the moment (if you have records r1, r2 in this order in split s and you reassign s, then you may end up with r2, r1 in the sink).

Ah, okay, this helped a lot. I'd missed that rebalancing dynamically would break ordering guarantees, so when I read through the Kafka source, I was really confused by the lack of rebalancing.

On Mon, May 3, 2021 at 1:40 AM Evan Palmer <[hidden email]> wrote:

Hello, I’m new to Flink. I’m trying to write a source for Pub/Sub Lite which is a partition based Pub/Sub product, and I have a few questions.


1.


I saw that there are two sets of interfaces used in existing sources: The RichSourceFunction, and the set of interfaces from FLIP-27. It seems like the Source interfaces are preferred for new sources, but I wanted to be sure.


2.


I’m having a little bit of trouble working out how when the currentParallelism returned by the SplitEnumeratorContext [1] can change, and how a source should react to that.


For context, I’m currently thinking about single partitions as “splits”, so a source would have an approximately constant number of splits which each has an potentially unbounded amount of work (at least in continuous mode). Each split will be assigned to some SourceReader by the split enumerator. If the value of currentParallelism changes, it seems like I’ll need to find a way to redistribute my partitions over SourceReaders, or else I'll end up with an unbalanced distribution of partitions to SourceReaders.


I looked at the docs on elastic scaling [2], and it seems like when the parallelism of the source changes, the source will be checkpointed and restored. I think this would mean all the SourceReaders get restarted, and their splits are returned to the SplitEnumerator for reassignment. Is this approximately correct?


[1] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.html#currentParallelism--

[2] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/elastic_scaling/