Hello Flinkers!
I ran into some strange behavior when reading from a folder of input files. When the number of input files in the folder exceeds the number of task slots, I noticed that the size of my datasets varies between runs, as if the transformations don't wait for all input files to be read. When there are at least as many task slots as files, there are no problems. I'm using a custom input format. Could there be a problem with my custom input format, and if so, what could I be forgetting? Kind regards and thank you for your time! Pieter
I assume this concerns the streaming API? Can you share your program and/or the custom input format code? On Mon, Oct 5, 2015 at 12:33 PM, Pieter Hameete <[hidden email]> wrote:
Hi Stephan, it concerns the DataSet API. The program I'm running can be found at https://github.com/PHameete/dawn-flink/blob/development/src/main/scala/wis/dawnflink/performance/xmark/XMarkQuery11.scala and the custom input format at https://github.com/PHameete/dawn-flink/blob/development/src/main/scala/wis/dawnflink/parsing/xml/XML2DawnInputFormat.java Cheers! 2015-10-05 12:38 GMT+02:00 Stephan Ewen <[hidden email]>:
If you have more files than task slots, then some tasks will get multiple files. That means that open() and close() are called multiple times on the same input format instance. Make sure that your input format tolerates that and does not get confused by lingering state (maybe create a new SimpleInputProjection as well). On Mon, Oct 5, 2015 at 12:41 PM, Pieter Hameete <[hidden email]> wrote:
Hi Stephan, it was not the SimpleInputProjection, because that is a stateless object. However, the boolean endReached was not reset upon opening a new file, so for each consecutive file no records were parsed. Thanks a lot for your help! - Pieter 2015-10-05 12:50 GMT+02:00 Stephan Ewen <[hidden email]>:
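The bug described above can be illustrated with a minimal, Flink-independent sketch. The class and method names below are simplified stand-ins for the real input format interface (ToyInputFormat, its List-based open(), etc. are hypothetical, not the actual Flink API); the point is only the pattern: because one instance handles multiple files when there are more files than task slots, every piece of per-split state must be reset in open().

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-in for an input format instance that is reused
// across several input splits (files).
class ToyInputFormat {
    private List<String> records;
    private int position;
    private boolean endReached; // per-split state

    // Called once per split; must reset ALL per-split state.
    // Forgetting to reset endReached here means every split after
    // the first is treated as already finished and yields no records.
    void open(List<String> split) {
        this.records = split;
        this.position = 0;
        this.endReached = false;
    }

    boolean reachedEnd() {
        return endReached;
    }

    String nextRecord() {
        String r = records.get(position++);
        if (position == records.size()) {
            endReached = true;
        }
        return r;
    }

    void close() {
        // release per-split resources here; also called once per split
    }
}

public class Main {
    public static void main(String[] args) {
        ToyInputFormat format = new ToyInputFormat();
        List<String> out = new ArrayList<>();
        // Two splits processed by the SAME instance, as happens when
        // there are more files than task slots.
        for (List<String> split : List.of(List.of("a", "b"), List.of("c"))) {
            format.open(split);
            while (!format.reachedEnd()) {
                out.add(format.nextRecord());
            }
            format.close();
        }
        System.out.println(out); // prints [a, b, c]
    }
}
```

If the endReached reset in open() is removed, the loop over the second split terminates immediately and only [a, b] is collected, which matches the varying dataset sizes reported at the start of the thread.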
Okay, nice to hear it works out! On Mon, Oct 5, 2015 at 1:50 PM, Pieter Hameete <[hidden email]> wrote: