Customize file assignments logic in flink application


Lu Niu
Hi,

I have a data set backed by a directory of files in which file names are meaningful.

folder1
   +-----file01
   +-----file02
   +-----file03
   +-----file04

I want to control the file assignments in my Flink application. For example, when parallelism is 2, worker 1 gets file01 and file02 to read and worker 2 gets file03 and file04. Also, each worker must get its 2 files all at once, because reading requires jumping back and forth between those two files.

What's the best way to do this? It seems like FileInputFormat is not extensible in this case. 

Best
Lu


Re: Customize file assignments logic in flink application

Zhu Zhu
Hi Lu,

Implementing your own InputFormat, together with the InputSplitAssigner it creates (which has the interface "InputSplit getNextInputSplit(String host, int taskId)"), should work if you want to assign InputSplits to tasks according to the task index and file name patterns.
To assign 2 InputSplits in one request, you can implement a new InputSplit which wraps multiple FileInputSplits. You may also need to define in your InputFormat how to process the new InputSplit.
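
As a rough illustration of that idea, here is a minimal plain-Java sketch. It deliberately does not depend on Flink: FileBundle and FileBundleAssigner are hypothetical names standing in for a custom InputSplit that wraps several files and for a custom InputSplitAssigner, and getNextBundle only mirrors the shape of "getNextInputSplit(String host, int taskId)". It assumes that the file names sort into the intended order and that task indices start at 0 (so "worker 1" above is task 0).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/**
 * Plain-Java sketch of the assignment logic only (no Flink classes).
 * All names here are hypothetical and chosen for illustration.
 */
class FileBundleAssigner {

    /** Stand-in for a custom InputSplit that wraps several files. */
    static final class FileBundle {
        final int bundleIndex;
        final List<String> files;

        FileBundle(int bundleIndex, List<String> files) {
            this.bundleIndex = bundleIndex;
            this.files = Collections.unmodifiableList(files);
        }
    }

    private final List<FileBundle> bundles = new ArrayList<>();
    private final boolean[] handedOut;
    private final int parallelism;

    FileBundleAssigner(List<String> fileNames, int filesPerBundle, int parallelism) {
        if (filesPerBundle <= 0 || parallelism <= 0) {
            throw new IllegalArgumentException("filesPerBundle and parallelism must be positive");
        }
        this.parallelism = parallelism;

        // Sort by name so that consecutive files end up in the same bundle.
        List<String> sorted = new ArrayList<>(fileNames);
        Collections.sort(sorted);

        // Cut the sorted list into bundles of filesPerBundle files each.
        for (int start = 0; start < sorted.size(); start += filesPerBundle) {
            int end = Math.min(start + filesPerBundle, sorted.size());
            List<String> group = new ArrayList<>(sorted.subList(start, end));
            bundles.add(new FileBundle(bundles.size(), group));
        }
        this.handedOut = new boolean[bundles.size()];
    }

    /**
     * Returns the next bundle reserved for the given task, or null if none is left.
     * Bundle i is reserved for task (i % parallelism); the host argument is ignored.
     */
    synchronized FileBundle getNextBundle(String host, int taskId) {
        for (int i = 0; i < bundles.size(); i++) {
            if (!handedOut[i] && i % parallelism == taskId) {
                handedOut[i] = true;
                return bundles.get(i);
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<String> files = Arrays.asList("file03", "file01", "file04", "file02");
        FileBundleAssigner assigner = new FileBundleAssigner(files, 2, 2);
        for (int taskId = 0; taskId < 2; taskId++) {
            FileBundle bundle = assigner.getNextBundle("localhost", taskId);
            System.out.println("task " + taskId + " -> " + bundle.files);
        }
        // prints:
        // task 0 -> [file01, file02]
        // task 1 -> [file03, file04]
    }
}
```

In a real job the same logic would live inside the InputSplitAssigner returned by your InputFormat, and the InputFormat would open both files of a bundle when it receives the wrapping split.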

Thanks,
Zhu Zhu

In reply to Lu Niu <[hidden email]>, Thu, Aug 15, 2019 at 12:26 AM


Re: Customize file assignments logic in flink application

Lu Niu
Hi, Zhu

Thanks for the reply! I found that using SplittableIterator is also doable to some extent. How should I choose between these two?

Best
Lu

In reply to Zhu Zhu <[hidden email]>, Wed, Aug 14, 2019 at 8:02 PM


Re: Customize file assignments logic in flink application

Zhu Zhu
Hi Lu,

I think either way is fine as long as it works.
That said, I have no idea how you would extend SplittableIterator in your case. The underlying input format is ParallelIteratorInputFormat, and its processing is not tied to a particular subtask index.

Thanks,
Zhu Zhu

In reply to Lu Niu <[hidden email]>, Fri, Aug 16, 2019 at 12:48 AM


Re: Customize file assignments logic in flink application

Lu Niu
Yes, you are right. SplittableIterator would cause each worker to list all the files. Thanks!

best
Lu

In reply to Zhu Zhu <[hidden email]>, Fri, Aug 16, 2019 at 12:33 AM