A custom FileInputFormat

A custom FileInputFormat

Niklas Semmler
Hello Flink community,

I am running into an issue with a custom FileInputFormat class and would
appreciate your help.

My goal is to read all files from a directory as paths:

val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

val source: DataSet[String] = env.readFile(new DirReader, "/tmp/mydir").setParallelism(1)

source.writeAsText("/tmp/results", WriteMode.OVERWRITE)

env.execute("Job")

It works when I execute the program from within my IDE or run it directly as a fat jar. When I run it through the Flink CLI, the file "/tmp/results" is created, but not filled with entries.

There seems to be something wrong with my custom DirReader (see below): the output of its println statements is not visible when the code is run through the Flink CLI.

No exception appears in the logs (see below), and I am at a loss as to what to try. Even worse, when I copy the fat jar to a remote system, the problem also appears when I execute the fat jar directly.

Local System
Flink: 1.0.2
Java: 1.8.0_102
Scala: 2.11.8

Remote System
Flink: 1.1.3
Java: 1.8.0_92
Scala: 2.11.6

Help or ideas to try out are welcome!

Best,
Niklas



----------------------------------------
import java.io.File

import org.apache.flink.api.common.io.FileInputFormat

class DirReader extends FileInputFormat[String] {
  var running: Boolean = false
  var fileList: Array[String] = null

  override def openInputFormat() = {
    println("Path: " + this.filePath.toString)
    val directory = new File(this.filePath.toString)
    if (directory != null && directory.isDirectory) {
      // Collect the files inside the first-level subdirectories.
      fileList = directory.listFiles
        .filter(_.isDirectory)
        .flatMap(_.listFiles)
        .map(_.toString)
      running = fileList.length > 1
    }
    println("fileList " + fileList.length + " running " + running)
  }

  override def nextRecord(reuse: String): String = {
    val head = fileList.head
    println("File: " + head)
    fileList = fileList.tail
    running = fileList.nonEmpty
    head
  }

  override def reachedEnd(): Boolean = !running
}
----------------------------------------

The output from the CLI:

10/28/2016 18:27:56     Job execution switched to status RUNNING.
10/28/2016 18:27:56     DataSource (at org.apache.flink.api.scala.ExecutionEnvironment.readFile(ExecutionEnvironment.scala:385) (de.tuberlin.inet.plag.DirReader))(1/1) switched to SCHEDULED
10/28/2016 18:27:56     DataSource (at org.apache.flink.api.scala.ExecutionEnvironment.readFile(ExecutionEnvironment.scala:385) (de.tuberlin.inet.plag.DirReader))(1/1) switched to DEPLOYING
10/28/2016 18:27:56     DataSource (at org.apache.flink.api.scala.ExecutionEnvironment.readFile(ExecutionEnvironment.scala:385) (de.tuberlin.inet.plag.DirReader))(1/1) switched to RUNNING
10/28/2016 18:27:56     DataSink (TextOutputFormat (/tmp/results) - UTF-8)(1/1) switched to SCHEDULED
10/28/2016 18:27:56     DataSink (TextOutputFormat (/tmp/results) - UTF-8)(1/1) switched to DEPLOYING
10/28/2016 18:27:56     DataSource (at org.apache.flink.api.scala.ExecutionEnvironment.readFile(ExecutionEnvironment.scala:385) (de.tuberlin.inet.plag.DirReader))(1/1) switched to FINISHED
10/28/2016 18:27:56     DataSink (TextOutputFormat (/tmp/results) - UTF-8)(1/1) switched to RUNNING
10/28/2016 18:27:56     DataSink (TextOutputFormat (/tmp/results) - UTF-8)(1/1) switched to FINISHED
10/28/2016 18:27:56     Job execution switched to status FINISHED.


--
Niklas Semmler
PhD Student / Research Assistant
TU Berlin, INET, Room MAR 4.027
Marchstr 23, 10587 Berlin
Tel.: +49 (0)30 314 75739
http://inet.tu-berlin.de/~nsemmler/

Re: A custom FileInputFormat

Fabian Hueske
Hi Niklas,

I don't know exactly what is going wrong there, but I have a few pointers for you:

1) In cluster setups, Flink redirects println() output to ./log/*.out files, i.e., you have to find the task manager that ran the DirReader and check its ./log/*.out file.
2) You are using Java's File class. That only works if you are accessing the local file system of the machine the DirReader runs on. If you want to read the files from HDFS, you have to use the corresponding HDFS client (see the first sketch below).
3) I would not extend FileInputFormat for your purpose. FileInputFormat is meant to *read* files, not just look up file names. I would rather implement an InputFormat from scratch (see the second sketch below). Since you are only running a single instance, you can return a single dummy InputSplit.
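
For 2), something along these lines (an untested sketch) should work for the listing, using Flink's FileSystem abstraction instead of java.io.File so that the same code handles local and HDFS paths:

----------------------------------------
import org.apache.flink.core.fs.{FileSystem, Path}

// "hdfs:///tmp/mydir" is a placeholder; a local directory would be
// "file:///tmp/mydir".
val path = new Path("hdfs:///tmp/mydir")
val fs: FileSystem = path.getFileSystem

// listStatus returns the directory entries; keep the plain files and
// turn them into path strings.
val files: Array[String] = fs.listStatus(path)
  .filter(!_.isDir)
  .map(_.getPath.toString)
----------------------------------------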
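
And for 3), a rough, untested sketch of a standalone InputFormat based on GenericInputFormat, which creates the dummy splits for you. It assumes the local file system and a parallelism of 1, and lists the files inside the first-level subdirectories, as your DirReader does:

----------------------------------------
import java.io.File

import org.apache.flink.api.common.io.GenericInputFormat
import org.apache.flink.core.io.GenericInputSplit

class FileNameFormat(dir: String) extends GenericInputFormat[String] {

  // Initialized in open(), so it is not part of the serialized format.
  @transient private var files: Iterator[String] = _

  override def open(split: GenericInputSplit): Unit = {
    super.open(split)
    files = new File(dir).listFiles
      .filter(_.isDirectory)
      .flatMap(_.listFiles)
      .map(_.toString)
      .iterator
  }

  override def reachedEnd(): Boolean = !files.hasNext

  override def nextRecord(reuse: String): String = files.next()
}

// Usage:
// env.createInput(new FileNameFormat("/tmp/mydir")).setParallelism(1)
----------------------------------------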

Let me know if you have further questions.
Best, Fabian

Re: A custom FileInputFormat

Niklas Semmler
Hello Fabian,

thanks for the response and my apologies for the late reply.

I tried extending the InputFormat, but it felt too hackish, so I simply loaded the files before running the job and fed them to Flink via env.fromCollection(...); a sketch is below.
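
For reference, a minimal sketch of this workaround, assuming the file listing fits in memory (it runs on the machine that submits the job):

----------------------------------------
import java.io.File

import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode

val env = ExecutionEnvironment.getExecutionEnvironment

// List the files inside the first-level subdirectories up front, on
// the client, and ship the paths to the job as a collection.
val paths: Seq[String] = new File("/tmp/mydir").listFiles
  .filter(_.isDirectory)
  .flatMap(_.listFiles)
  .map(_.toString)
  .toSeq

env.fromCollection(paths)
  .writeAsText("/tmp/results", WriteMode.OVERWRITE)

env.execute("Job")
----------------------------------------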

Cheers,
Niklas

--
Niklas Semmler
PhD Student / Research Assistant
TU Berlin, INET, Room MAR 4.027
Marchstr 23, 10587 Berlin
Tel.: +49 (0)30 314 75739
http://inet.tu-berlin.de/~nsemmler/