Hello,
I am using Flink v1.10 in a distributed environment to run SQL queries on batch and streaming data.

In my setup, data is sharded and distributed across the cluster. Each shard receives streaming updates from an external source. I wish to minimize data movement during query evaluation for performance reasons. For that, I need some construct to advise the Flink planner to bind each split (shard) to the host where it is located.

I have come across InputSplitAssigner, which gives me levers to influence compute colocation for batch queries. Is there a way to do the same for streaming queries as well?

Regards,
Satyam
Hi Satyam,
I think you can also use the InputSplitAssigner for streaming pipelines through an InputFormat. You can use StreamExecutionEnvironment#createInput, or for SQL you can write your source according to the documentation here:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html#dynamic-table-source

If you do not want to use an InputFormat, I think there is no easy way to do it now.

Best,
Dawid

On 29/07/2020 13:53, Satyam Shekhar wrote:
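To make the locality idea above concrete, here is a minimal plain-Java sketch of a host-aware assigner. It only mirrors the shape of Flink's InputSplitAssigner#getNextInputSplit(String host, int taskId); the classes ShardSplit and HostAwareAssigner are hypothetical names used for illustration, not Flink classes, and the fallback policy (hand out any remaining split when no local one is left) is an assumption. In a real job, Flink's own LocatableInputSplit and LocatableInputSplitAssigner are probably the first things to look at.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical split type: one split per shard, tagged with the host that stores it.
final class ShardSplit {
    final int splitNumber;
    final String host;

    ShardSplit(int splitNumber, String host) {
        this.splitNumber = splitNumber;
        this.host = host;
    }
}

// Hypothetical assigner that prefers splits located on the requesting host.
final class HostAwareAssigner {
    private final List<ShardSplit> unassigned;

    HostAwareAssigner(List<ShardSplit> splits) {
        this.unassigned = new ArrayList<>(splits);
    }

    // Same parameter shape as InputSplitAssigner#getNextInputSplit(String host, int taskId).
    // Returns null once every split has been handed out.
    synchronized ShardSplit getNextInputSplit(String host, int taskId) {
        // First pass: look for a split stored on the requesting host.
        for (Iterator<ShardSplit> it = unassigned.iterator(); it.hasNext();) {
            ShardSplit split = it.next();
            if (split.host.equals(host)) {
                it.remove();
                return split;
            }
        }
        // Assumed fallback: no local split left, so hand out a remote one.
        return unassigned.isEmpty() ? null : unassigned.remove(0);
    }
}
```

A task running on "host-b" would first receive the splits whose host is "host-b" and only afterwards splits from other hosts. Whether a remote fallback is acceptable depends on how strict the colocation requirement is.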
Hi Dawid,

I am currently on Flink v1.10. Do streaming pipelines support unbounded InputFormat in v1.10? My current setup uses SourceFunction for streaming pipelines and InputFormat for batch queries.

I see that the documentation for Flink v1.11 describes the concepts of Split, SourceReader, and SplitEnumerator to enable streaming queries on unbounded splits. Is that the direction you were pointing to?

Regards,
Satyam

On Thu, Jul 30, 2020 at 6:03 AM Dawid Wysakowicz <[hidden email]> wrote:
Hi Satyam,

It should be fine to have an unbounded InputFormat. The important thing is not to produce more splits than there are parallel instances of your source. In createInputSplits(int minNumSplits), generate only minNumSplits splits, so that all splits can be assigned immediately. Unfortunately, you won't have access to state in an InputFormat. Now that I think about it, this will be problematic with checkpoints, as you cannot store the offset up to which you have read the split.

In the SourceFunction stack, as far as I know, there is no built-in support for that. As an alternative, you could maybe build the split assignment into the SourceFunction. Unfortunately, as it would not happen in a single location, you would have to ensure that the logic can assign all the splits independently in each of the parallel instances of the source.

The Split, SourceReader, and SplitEnumerator are new components introduced in FLIP-27 [1]. I am not very familiar with those yet. Unfortunately, those are not yet supported in the Table ecosystem. I also don't know if it is possible to assign the splits based on the host machine with them. I am cc'ing Stephan and Becket, who worked on those, to check if it is already possible with the interfaces.

Best,
Dawid
[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface

On 31/07/2020 02:58, Satyam Shekhar wrote:
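The two suggestions in the reply above (produce no more splits than there are parallel source instances, and let each parallel instance work out its own share without a central assigner) can be sketched in plain Java as follows. This is only an illustration under stated assumptions: ShardPlanning and its methods are hypothetical names, shards are identified by strings, and the round-robin rule is one possible deterministic scheme. Inside a Flink RichParallelSourceFunction the subtask index and parallelism would presumably come from getRuntimeContext().getIndexOfThisSubtask() and getRuntimeContext().getNumberOfParallelSubtasks(). Note that this scheme by itself does not give host locality, and it does not address the checkpointing concern.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of Flink.
final class ShardPlanning {

    // Groups shards into at most minNumSplits groups (round-robin), so that the
    // number of splits never exceeds the number of parallel source instances.
    static List<List<String>> groupIntoSplits(List<String> shards, int minNumSplits) {
        if (minNumSplits <= 0) {
            throw new IllegalArgumentException("minNumSplits must be positive");
        }
        int groups = Math.min(minNumSplits, shards.size());
        List<List<String>> result = new ArrayList<>();
        for (int g = 0; g < groups; g++) {
            result.add(new ArrayList<>());
        }
        for (int i = 0; i < shards.size(); i++) {
            result.get(i % groups).add(shards.get(i));
        }
        return result;
    }

    // Computes the shards owned by one parallel instance. Every instance sorts the
    // same shard list and applies the same modulo rule, so all instances agree on
    // the assignment without communicating.
    static List<String> shardsForSubtask(List<String> shards, int subtaskIndex, int parallelism) {
        if (parallelism <= 0 || subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException("invalid subtask index or parallelism");
        }
        List<String> sorted = new ArrayList<>(shards);
        Collections.sort(sorted); // identical order in every instance
        List<String> mine = new ArrayList<>();
        for (int i = 0; i < sorted.size(); i++) {
            if (i % parallelism == subtaskIndex) {
                mine.add(sorted.get(i));
            }
        }
        return mine;
    }
}
```

Because the rule depends only on the sorted shard list, the subtask index, and the parallelism, every shard is read by exactly one instance as long as all instances see the same shard list.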