Translating StreamTableSource[T] to DynamicTableSource

Yuval Itzchakov
Hi,

I have a StreamTableSource[Row] that I would like to migrate to the new DynamicTableSource API. It is a complex source with two stages:

[Attached diagram: Untitled Diagram.png]

The first stage is a SourceFunction with a parallelism of 1 (a plain, non-parallel SourceFunction) that unloads data from a database into multiple files in an S3 bucket. The second stage is an AbstractStreamOperator that runs with a varying parallelism, takes the paths of those S3 files, reads them in parallel and emits Rows into the stream.
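To make the two-stage shape concrete, here is a minimal, dependency-free Java sketch. It is only a conceptual model under stated assumptions: no Flink classes are involved, an in-memory map stands in for the S3 bucket, and the class name `TwoStageSketch` and the `part-N` path scheme are hypothetical. One thread plays the parallelism-1 unloader and emits a path per finished file; a fixed pool of reader threads plays the variable-parallelism stage that turns paths back into rows.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Conceptual model of the two-stage source: plain Java, no Flink classes.
// The in-memory "bucket" and the path naming scheme are hypothetical stand-ins for S3.
class TwoStageSketch {
    private static final String END = "<end-of-paths>"; // tells a reader to stop

    static List<String> run(List<String> table, int fileSize, int readers) {
        Map<String, List<String>> bucket = new ConcurrentHashMap<>();
        BlockingQueue<String> paths = new LinkedBlockingQueue<>();
        List<String> out = Collections.synchronizedList(new ArrayList<>());

        // Stage 1 (parallelism 1): unload the table into files, emitting each path
        // once its file is written, then one END marker per reader.
        Thread unloader = new Thread(() -> {
            int part = 0;
            for (int i = 0; i < table.size(); i += fileSize, part++) {
                String path = "s3://hypothetical-bucket/part-" + part;
                bucket.put(path, new ArrayList<>(table.subList(i, Math.min(i + fileSize, table.size()))));
                paths.add(path);
            }
            for (int r = 0; r < readers; r++) {
                paths.add(END);
            }
        });

        // Stage 2 (parallelism = readers): each worker takes paths and emits their rows.
        ExecutorService pool = Executors.newFixedThreadPool(readers);
        for (int r = 0; r < readers; r++) {
            pool.execute(() -> {
                try {
                    for (String p = paths.take(); !p.equals(END); p = paths.take()) {
                        out.addAll(bucket.get(p));
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        unloader.start();
        pool.shutdown(); // no new tasks; the submitted readers keep running
        try {
            unloader.join();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the stages", e);
        }

        List<String> rows = new ArrayList<>(out);
        Collections.sort(rows); // reader interleaving is nondeterministic
        return rows;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("r1", "r2", "r3", "r4", "r5"), 2, 3)); // [r1, r2, r3, r4, r5]
    }
}
```

Changing `readers` scales only the second stage, while the unloader stays single-threaded, which is the property the original StreamTableSource made easy to express.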

With StreamTableSource[T], I had the expressive power to do this, because the API has a method that returns a DataStream[T]:

[Attached image: image.png]

However, in the new DynamicTableSource, I am expected to return a ScanRuntimeProvider, which only has an implementation for SourceFunction:

[Attached image: image.png]

This means that I can no longer logically split my source into two different parts and control the parallelism of each stage, which is necessary for the way the source works.

Is there any way to get around this limitation at the moment? 

--
Best Regards,
Yuval Itzchakov.

Re: Translating StreamTableSource[T] to DynamicTableSource

Jingsong Li
Hi Yuval,

After a cursory look at your source, it seems to fit the FLIP-27 source model. [1]
You can implement the logic of your previous SourceFunction in the split enumerator.
This is the approach I recommend.
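The FLIP-27 split model can be illustrated the same way. The sketch below is plain Java and deliberately does not use Flink's SplitEnumerator or SourceReader interfaces; `EnumeratorModel`, `discoverSplits`, `requestSplit` and `Flip27Sketch` are hypothetical names. A single enumerator takes over the former SourceFunction's job of producing file paths (splits), and each reader subtask pulls the next split when it needs work. The round-robin request order is an artifact of the simulation; real readers would ask whenever they become idle.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Optional;

// Conceptual model only; these are NOT Flink's SplitEnumerator/SourceReader APIs.
class EnumeratorModel {
    private final Deque<String> pending = new ArrayDeque<>();

    // Hypothetical stand-in for the unload step that used to live in the
    // parallelism-1 SourceFunction: produce one split (file path) per file.
    void discoverSplits(int fileCount) {
        for (int i = 0; i < fileCount; i++) {
            pending.add("part-" + i);
        }
    }

    // Called when reader `subtaskId` needs work; empty means "no more splits".
    // The id is unused here, but a real enumerator could use it for locality.
    Optional<String> requestSplit(int subtaskId) {
        return Optional.ofNullable(pending.poll());
    }
}

class Flip27Sketch {
    // Lets `parallelism` readers request splits in turn until every reader has
    // been told there is no more work; returns the splits assigned to each reader.
    static List<List<String>> simulate(int fileCount, int parallelism) {
        EnumeratorModel enumerator = new EnumeratorModel();
        enumerator.discoverSplits(fileCount);

        List<List<String>> assigned = new ArrayList<>();
        for (int r = 0; r < parallelism; r++) {
            assigned.add(new ArrayList<>());
        }
        boolean[] finished = new boolean[parallelism];
        int active = parallelism;

        for (int r = 0; active > 0; r = (r + 1) % parallelism) {
            if (finished[r]) {
                continue;
            }
            Optional<String> split = enumerator.requestSplit(r);
            if (split.isPresent()) {
                assigned.get(r).add(split.get()); // the reader would now open and read this file
            } else {
                finished[r] = true;
                active--;
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        System.out.println(simulate(5, 2)); // [[part-0, part-2, part-4], [part-1, part-3]]
    }
}
```

In this model the enumerator runs once, like the old parallelism-1 stage, and the reader count is independent of it, which mirrors why the FLIP-27 shape fits a source that first discovers files and then reads them in parallel.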

If you don't want to rewrite it for now, you can also use the DataStream API: DataStreamScanProvider is already supported for the new table sources in master (1.12). [2]

Note, however, that both the FLIP-27 SourceProvider and DataStreamScanProvider are only supported for the new table sources in master (1.12), not in Flink 1.11.


On Mon, Nov 2, 2020 at 2:17 PM Yuval Itzchakov <[hidden email]> wrote:
--
Best, Jingsong Lee

Re: Translating StreamTableSource[T] to DynamicTableSource

Yuval Itzchakov
Hi Jingsong,
Thanks for the prompt reply.

I have already modified the source to work with the new Source API, but that isn't supported in 1.11 either (as per my previous email :))

Is there an ETA on 1.12?

On Mon, Nov 2, 2020 at 8:29 AM Jingsong Li <[hidden email]> wrote:
--
Best Regards,
Yuval Itzchakov.

Re: Translating StreamTableSource[T] to DynamicTableSource

Jingsong Li
Hi Yuval,

The 1.12 feature freeze is on November 8. Testing and bug fixing are estimated to take about a month, so the release may come around December 8.
CC: [hidden email] 

Best,
Jingsong

On Mon, Nov 2, 2020 at 2:37 PM Yuval Itzchakov <[hidden email]> wrote:
--
Best, Jingsong Lee