I have a very simple program that just reads all the files in a path. However, Flink is not working as expected. Every time I execute this job, I see Flink read only 2 files, even though there are more in that directory. On closer inspection, it appears this might be related to the following log line:

[flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.taskmanager.TaskManager - TaskManager has 2 task slot(s).

My question is: isn't Flink supposed to keep iterating over the directory once those 2 slots become free again? I am assuming the problem is caused by there being only 2 slots.
    PDFFileInputFormat format = new PDFFileInputFormat();
    // logger.info(Paths.get(".").toAbsolutePath().normalize().toString());
    env.createInput(format, TypeInformation.of(StringValue.class)).print();
And here is the InputFormat code:

    public class PDFFileInputFormat extends FileInputFormat<String> {

        /**
         *
         */
        private static final long serialVersionUID = -4137283038479003711L;
        private static final Logger logger = LoggerFactory
                .getLogger(PDFInputFormat.class.getName());
        private boolean reached = false;

        @Override
        public boolean reachedEnd() throws IOException {
            logger.info("called reached " + reached);
            // TODO Auto-generated method stub
            return reached;
        }

        @Override
        public String nextRecord(String reuse) throws IOException {
            logger.info("This is where you parse PDF");
            String content = new String(
                    Files.readAllBytes(Paths.get(this.currentSplit.getPath().getPath())));
            logger.info("Content " + content);
            reached = true;
            return content;
        }
    }

On Mon, Jul 31, 2017 at 5:09 PM, Mohit Anchlia <[hidden email]> wrote:
Do you set reached to false in open()?

On 01.08.2017 at 2:44 AM, "Mohit Anchlia" <[hidden email]> wrote:
I didn't override open(). I am using the open() that is inherited from FileInputFormat. Am I supposed to override open() explicitly?

On Mon, Jul 31, 2017 at 9:49 PM, Fabian Hueske <[hidden email]> wrote:
An InputFormat processes multiple InputSplits, and open() is called for each InputSplit. If you don't reset reached to false in open(), you will only read a single (i.e., the first) InputSplit and skip all others. I'd override open() as follows:

    @Override
    public void open(FileInputSplit fileSplit) throws IOException {
        // Pass the split on to FileInputFormat so it can set up currentSplit and the stream.
        super.open(fileSplit);
        // Start every split with a fresh end-of-input flag.
        reached = false;
    }

2017-08-01 8:08 GMT+02:00 Mohit Anchlia <[hidden email]>:
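To make the effect of the missing reset concrete, here is a minimal, self-contained sketch of the open() / reachedEnd() / nextRecord() lifecycle described above. It does not use Flink: WholeFileFormat, readAll and the file names are hypothetical stand-ins, and the loop in readAll is only an assumption about how one parallel reader instance walks through the splits assigned to it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SplitLifecycleDemo {

    /** Hypothetical stand-in for an input format that emits one record per split. */
    static class WholeFileFormat {
        private final boolean resetInOpen;
        private boolean reached = false;
        private String currentSplit;

        WholeFileFormat(boolean resetInOpen) {
            this.resetInOpen = resetInOpen;
        }

        /** Called once per split; optionally resets the end-of-input flag. */
        void open(String split) {
            currentSplit = split;
            if (resetInOpen) {
                reached = false;
            }
        }

        boolean reachedEnd() {
            return reached;
        }

        /** Emits the whole split as a single record and marks the split as done. */
        String nextRecord() {
            reached = true;
            return "content of " + currentSplit;
        }
    }

    /** Assumed driver loop: the same format instance is reused for every split. */
    static List<String> readAll(WholeFileFormat format, List<String> splits) {
        List<String> records = new ArrayList<>();
        for (String split : splits) {
            format.open(split);
            while (!format.reachedEnd()) {
                records.add(format.nextRecord());
            }
        }
        return records;
    }

    public static void main(String[] args) {
        List<String> splits = Arrays.asList("a.pdf", "b.pdf", "c.pdf");
        // Flag never reset: only the first split yields a record.
        System.out.println(readAll(new WholeFileFormat(false), splits).size()); // prints 1
        // Flag reset in open(): every split yields a record.
        System.out.println(readAll(new WholeFileFormat(true), splits).size()); // prints 3
    }
}
```

Under this model, each reader instance stops after its first split when the flag is not reset. With 2 task slots that would be consistent with seeing exactly 2 files read, although the thread itself does not confirm that this is the mechanism.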
Thanks, that worked. However, what I don't understand is: wouldn't the open() call that I am inheriting already have this logic built in? I am inheriting from FileInputFormat.

On Tue, Aug 1, 2017 at 1:42 AM, Fabian Hueske <[hidden email]> wrote:
FileInputFormat cannot know about the reached variable that you added in your class, so there is no way it could reset it to false. An alternative implementation without overriding open() could be to change the reachedEnd() method to check if the stream is still at offset 0.

2017-08-01 20:22 GMT+02:00 Mohit Anchlia <[hidden email]>:
Thanks. I thought the purpose of the method below was to supply that information?

    @Override
    public boolean reachedEnd() throws IOException {
        logger.info("Reached " + reached);
        return reached;
    }

On Wed, Aug 2, 2017 at 1:43 AM, Fabian Hueske <[hidden email]> wrote: