Odd flink behaviour

Mohit Anchlia

I have a very simple program that just reads all the files in a path. However, Flink is not working as expected.

Every time I execute this job, I see Flink reading only 2 files, even though there are more in that directory. On closer inspection, it appears that this might be related to:

[flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.taskmanager.TaskManager - TaskManager has 2 task slot(s).

My question is: isn't Flink supposed to continue iterating over the directory once those 2 slots become free again? I am assuming the problem is caused by there being only 2 slots.


Code ---

  PDFFileInputFormat format = new PDFFileInputFormat();
  format.setFilePath(args[0]);
  format.setNestedFileEnumeration(true);
  logger.info("Number of splits " + format.getNumSplits());

  // logger.info(Paths.get(".").toAbsolutePath().normalize().toString());

  // The produced type must match the format's record type (String).
  env.createInput(format, TypeInformation.of(String.class)).print();

Re: Odd flink behaviour

Mohit Anchlia
And here is the InputFormat code:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

import org.apache.flink.api.common.io.FileInputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PDFFileInputFormat extends FileInputFormat<String> {
  private static final long serialVersionUID = -4137283038479003711L;
  private static final Logger logger = LoggerFactory
      .getLogger(PDFFileInputFormat.class.getName());
  private boolean reached = false;

  @Override
  public boolean reachedEnd() throws IOException {
    logger.info("called reached " + reached);
    return reached;
  }

  @Override
  public String nextRecord(String reuse) throws IOException {
    logger.info("This is where you parse PDF");
    // Read the whole file behind the current split as a single record.
    String content = new String(
        Files.readAllBytes(Paths.get(this.currentSplit.getPath().getPath())));
    logger.info("Content " + content);
    reached = true;
    return content;
  }
}

Re: Odd flink behaviour

Fabian Hueske-2
Do you set reached to false in open()? 

Re: Odd flink behaviour

Mohit Anchlia
I didn't override open(). I am using the open() that is inherited from FileInputFormat. Am I supposed to override open() explicitly?

Re: Odd flink behaviour

Fabian Hueske-2
An InputFormat processes multiple InputSplits. open() is called for each InputSplit.
If you don't reset reached to false in open() you will only read a single (i.e., the first) InputSplit and skip all others.

I'd override open as follows:

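@Override
public void open(FileInputSplit fileSplit) throws IOException {
  // Let FileInputFormat open the stream for this split first.
  super.open(fileSplit);
  // Then clear the flag so the new split is actually read.
  reached = false;
}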

Cheers, Fabian
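
To make the effect of the missing reset concrete, here is a minimal sketch in plain Java. It does not use Flink: OneRecordFormat and readAll are hypothetical stand-ins for the input format and for the per-split loop that a single source task runs (open the split, then call nextRecord() until reachedEnd() returns true). Assuming each parallel task works with its own copy of the format, a stale flag would let every task read exactly one split, which would be consistent with seeing two files on two slots.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SplitLifecycleDemo {

    /** Hypothetical stand-in for an input format that emits one record per split. */
    static class OneRecordFormat {
        private final boolean resetInOpen;
        private boolean reached = false;
        private String current;

        OneRecordFormat(boolean resetInOpen) {
            this.resetInOpen = resetInOpen;
        }

        void open(String split) {
            current = split;
            if (resetInOpen) {
                reached = false; // the fix: clear the flag for every new split
            }
        }

        boolean reachedEnd() {
            return reached;
        }

        String nextRecord() {
            reached = true;
            return "content of " + current;
        }
    }

    /** Models one source task: open each split, then read until reachedEnd(). */
    static List<String> readAll(OneRecordFormat format, List<String> splits) {
        List<String> records = new ArrayList<>();
        for (String split : splits) {
            format.open(split);
            while (!format.reachedEnd()) {
                records.add(format.nextRecord());
            }
        }
        return records;
    }

    public static void main(String[] args) {
        List<String> splits = Arrays.asList("a.pdf", "b.pdf", "c.pdf");
        // Without the reset, only the first split produces a record.
        System.out.println(readAll(new OneRecordFormat(false), splits).size()); // prints 1
        // With the reset, every split is read.
        System.out.println(readAll(new OneRecordFormat(true), splits).size()); // prints 3
    }
}
```

The only difference between the two runs is whether open() clears the flag; the loop that drives the format is identical in both cases.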

Re: Odd flink behaviour

Mohit Anchlia
Thanks, that worked. However, what I don't understand is: wouldn't the open() call that I am inheriting already have this logic built in? I am inheriting from FileInputFormat.

Re: Odd flink behaviour

Fabian Hueske-2
FileInputFormat cannot know about the reached variable that you added in your class. So there is no way it could reset it to false.
An alternative implementation without overriding open() could be to change the reachedEnd method to check if the stream is still at offset 0.
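
The offset-based alternative can be sketched in plain Java as follows. This is only a model, not Flink code: BaseFormat and WholeFileFormat are hypothetical names, and an int field stands in for the position of the stream that the base class sets up on every open(). The point it illustrates is that the subclass keeps no flag of its own, so there is nothing for it to forget to reset.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class OffsetBasedEndDemo {

    /** Hypothetical base class: it rewinds to the start on every open(). */
    static class BaseFormat {
        protected byte[] data;
        protected int position;

        void open(byte[] fileBytes) {
            data = fileBytes;
            position = 0; // a fresh split always starts at offset 0 in this model
        }
    }

    /** Emits the whole file as one record and keeps no end-of-input flag of its own. */
    static class WholeFileFormat extends BaseFormat {
        boolean reachedEnd() {
            // Still at offset 0 means nothing has been read from this split yet.
            return position != 0;
        }

        String nextRecord() {
            String content = new String(data, StandardCharsets.UTF_8);
            position = data.length; // reading the record moves the offset past the data
            return content;
        }
    }

    /** Models one source task: open each file, then read until reachedEnd(). */
    static List<String> readAll(List<byte[]> files) {
        WholeFileFormat format = new WholeFileFormat();
        List<String> records = new ArrayList<>();
        for (byte[] file : files) {
            format.open(file);
            while (!format.reachedEnd()) {
                records.add(format.nextRecord());
            }
        }
        return records;
    }

    public static void main(String[] args) {
        List<byte[]> files = Arrays.asList(
            "first".getBytes(StandardCharsets.UTF_8),
            "second".getBytes(StandardCharsets.UTF_8),
            "third".getBytes(StandardCharsets.UTF_8));
        System.out.println(readAll(files)); // prints [first, second, third]
    }
}
```

Two caveats apply to this sketch. An empty file never moves the offset away from 0, so it would need separate handling to avoid an endless loop. And in the real format the record would have to be read through the stream opened by the base class for its position to advance; a nextRecord() that reads the file by path, as in the code posted earlier in the thread, would presumably leave that position unchanged.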

Re: Odd flink behaviour

Mohit Anchlia
Thanks. I thought the purpose of the method below was to supply that information?

@Override
public boolean reachedEnd() throws IOException {
  logger.info("Reached " + reached);
  return reached;
}

