Customer inputformat

classic Classic list List threaded Threaded
7 messages Options
Reply | Threaded
Open this post in threaded view
|

Customer inputformat

Mohit Anchlia
Hi,

I created a custom input format. Idea behind this is to read all binary files from a directory and use each file as it's own split. Each split is read as one whole record. When I run it in flink I don't get any error but I am not seeing any output from .print. Am I missing something?

----

public class PDFFileInputFormat extends RichInputFormat<StringValue, InputSplit> {

private static final Logger logger = LoggerFactory.getLogger(PDFFileInputFormat.class.getName());

PDFFileInputSplit current = null;

public static void main(String... args) throws Exception {

PDFFileInputFormat pdfReader = new PDFFileInputFormat("c:\\proj\\test");

InputSplit[] splits = pdfReader.createInputSplits(1);

pdfReader.open(splits[0]);

pdfReader.nextRecord(null);

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

env.fromElements(1, 2, 3)

// returns the squared i

.print();

PDFFileInputFormat format = new PDFFileInputFormat("c:\\proj\\test");

InputFormatSourceFunction<StringValue> reader = new InputFormatSourceFunction<>(format,

TypeInformation.of(StringValue.class));

env.createInput(format,TypeInformation.of(StringValue.class)).print();

}

String path = null;

public PDFFileInputFormat(String path) {

this.path = path;

}

public void configure(Configuration parameters) {

// TODO Auto-generated method stub

}

public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {

// TODO Auto-generated method stub

return cachedStatistics;

}

public InputSplit[] createInputSplits(int minNumSplits) throws IOException {

final List<PDFFileInputSplit> splits = new ArrayList<PDFFileInputSplit>();

Files.list(Paths.get(path)).forEach(f -> {

PDFFileInputSplit split = new PDFFileInputSplit(splits.size(), f);

splits.add(split);

});

PDFFileInputSplit[] inputSplitArray = new PDFFileInputSplit[splits.size()];

return splits.toArray(inputSplitArray);

}

public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {

logger.info("Assigner");

// TODO Auto-generated method stub

return new DefaultInputSplitAssigner(inputSplits);

}

public void open(InputSplit split) throws IOException {

this.current = (PDFFileInputSplit) split;

}

public boolean reachedEnd() throws IOException {

// TODO Auto-generated method stub

return true;

}

public StringValue nextRecord(StringValue reuse) throws IOException {

String content = new String(Files.readAllBytes(this.current.getFile()));

logger.info("Content " + content);

return new StringValue(content);

}

public void close() throws IOException {

// TODO Auto-generated method stub

}

}

---


Thanks,

Mohit


Reply | Threaded
Open this post in threaded view
|

Re: Customer inputformat

Fabian Hueske-2
Hi,

Flink calls the reachedEnd() method before it calls nextRecord() and closes the IF when reachedEnd() returns true.
So, it should not return true until nextRecord() was called and the first and last record was emitted.

You might also want to built your PDFFileInputFormat on FileInputFormat and set unsplittable to true.
FileInputFormat comes with lots of built-in functionality such as InputSplit generation.

Cheers, Fabian

2017-07-30 3:41 GMT+02:00 Mohit Anchlia <[hidden email]>:
Hi,

I created a custom input format. Idea behind this is to read all binary files from a directory and use each file as it's own split. Each split is read as one whole record. When I run it in flink I don't get any error but I am not seeing any output from .print. Am I missing something?

----

public class PDFFileInputFormat extends RichInputFormat<StringValue, InputSplit> {

private static final Logger logger = LoggerFactory.getLogger(PDFFileInputFormat.class.getName());

PDFFileInputSplit current = null;

public static void main(String... args) throws Exception {

PDFFileInputFormat pdfReader = new PDFFileInputFormat("c:\\proj\\test");

InputSplit[] splits = pdfReader.createInputSplits(1);

pdfReader.open(splits[0]);

pdfReader.nextRecord(null);

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

env.fromElements(1, 2, 3)

// returns the squared i

.print();

PDFFileInputFormat format = new PDFFileInputFormat("c:\\proj\\test");

InputFormatSourceFunction<StringValue> reader = new InputFormatSourceFunction<>(format,

TypeInformation.of(StringValue.class));

env.createInput(format,TypeInformation.of(StringValue.class)).print();

}

String path = null;

public PDFFileInputFormat(String path) {

this.path = path;

}

public void configure(Configuration parameters) {

// TODO Auto-generated method stub

}

public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {

// TODO Auto-generated method stub

return cachedStatistics;

}

public InputSplit[] createInputSplits(int minNumSplits) throws IOException {

final List<PDFFileInputSplit> splits = new ArrayList<PDFFileInputSplit>();

Files.list(Paths.get(path)).forEach(f -> {

PDFFileInputSplit split = new PDFFileInputSplit(splits.size(), f);

splits.add(split);

});

PDFFileInputSplit[] inputSplitArray = new PDFFileInputSplit[splits.size()];

return splits.toArray(inputSplitArray);

}

public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {

logger.info("Assigner");

// TODO Auto-generated method stub

return new DefaultInputSplitAssigner(inputSplits);

}

public void open(InputSplit split) throws IOException {

this.current = (PDFFileInputSplit) split;

}

public boolean reachedEnd() throws IOException {

// TODO Auto-generated method stub

return true;

}

public StringValue nextRecord(StringValue reuse) throws IOException {

String content = new String(Files.readAllBytes(this.current.getFile()));

logger.info("Content " + content);

return new StringValue(content);

}

public void close() throws IOException {

// TODO Auto-generated method stub

}

}

---


Thanks,

Mohit



Reply | Threaded
Open this post in threaded view
|

Re: Customer inputformat

Mohit Anchlia
Thanks. Few more questions:

- Is there an example for FileInputFormat? 
- how to make it read all the files in a directory?
- how to make an inputformat a streaming input instead of batch? Eg: read as new files come to a dir.

Thanks again.

On Sun, Jul 30, 2017 at 12:53 AM, Fabian Hueske <[hidden email]> wrote:
Hi,

Flink calls the reachedEnd() method before it calls nextRecord() and closes the IF when reachedEnd() returns true.
So, it should not return true until nextRecord() was called and the first and last record was emitted.

You might also want to built your PDFFileInputFormat on FileInputFormat and set unsplittable to true.
FileInputFormat comes with lots of built-in functionality such as InputSplit generation.

Cheers, Fabian

2017-07-30 3:41 GMT+02:00 Mohit Anchlia <[hidden email]>:
Hi,

I created a custom input format. Idea behind this is to read all binary files from a directory and use each file as it's own split. Each split is read as one whole record. When I run it in flink I don't get any error but I am not seeing any output from .print. Am I missing something?

----

public class PDFFileInputFormat extends RichInputFormat<StringValue, InputSplit> {

private static final Logger logger = LoggerFactory.getLogger(PDFFileInputFormat.class.getName());

PDFFileInputSplit current = null;

public static void main(String... args) throws Exception {

PDFFileInputFormat pdfReader = new PDFFileInputFormat("c:\\proj\\test");

InputSplit[] splits = pdfReader.createInputSplits(1);

pdfReader.open(splits[0]);

pdfReader.nextRecord(null);

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

env.fromElements(1, 2, 3)

// returns the squared i

.print();

PDFFileInputFormat format = new PDFFileInputFormat("c:\\proj\\test");

InputFormatSourceFunction<StringValue> reader = new InputFormatSourceFunction<>(format,

TypeInformation.of(StringValue.class));

env.createInput(format,TypeInformation.of(StringValue.class)).print();

}

String path = null;

public PDFFileInputFormat(String path) {

this.path = path;

}

public void configure(Configuration parameters) {

// TODO Auto-generated method stub

}

public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {

// TODO Auto-generated method stub

return cachedStatistics;

}

public InputSplit[] createInputSplits(int minNumSplits) throws IOException {

final List<PDFFileInputSplit> splits = new ArrayList<PDFFileInputSplit>();

Files.list(Paths.get(path)).forEach(f -> {

PDFFileInputSplit split = new PDFFileInputSplit(splits.size(), f);

splits.add(split);

});

PDFFileInputSplit[] inputSplitArray = new PDFFileInputSplit[splits.size()];

return splits.toArray(inputSplitArray);

}

public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {

logger.info("Assigner");

// TODO Auto-generated method stub

return new DefaultInputSplitAssigner(inputSplits);

}

public void open(InputSplit split) throws IOException {

this.current = (PDFFileInputSplit) split;

}

public boolean reachedEnd() throws IOException {

// TODO Auto-generated method stub

return true;

}

public StringValue nextRecord(StringValue reuse) throws IOException {

String content = new String(Files.readAllBytes(this.current.getFile()));

logger.info("Content " + content);

return new StringValue(content);

}

public void close() throws IOException {

// TODO Auto-generated method stub

}

}

---


Thanks,

Mohit




Reply | Threaded
Open this post in threaded view
|

Re: Customer inputformat

Ted Yu
For #1, you can find quite a few classes which extend FileInputFormat.
e.g.

flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java:public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultTypeQuer
flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java:public abstract class BinaryInputFormat<T> extends FileInputFormat<T>
flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java:public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> implements Checkpoi

flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java:             extends FileInputFormat<String>

FYI

On Sun, Jul 30, 2017 at 12:26 PM, Mohit Anchlia <[hidden email]> wrote:
Thanks. Few more questions:

- Is there an example for FileInputFormat? 
- how to make it read all the files in a directory?
- how to make an inputformat a streaming input instead of batch? Eg: read as new files come to a dir.

Thanks again.

On Sun, Jul 30, 2017 at 12:53 AM, Fabian Hueske <[hidden email]> wrote:
Hi,

Flink calls the reachedEnd() method before it calls nextRecord() and closes the IF when reachedEnd() returns true.
So, it should not return true until nextRecord() was called and the first and last record was emitted.

You might also want to built your PDFFileInputFormat on FileInputFormat and set unsplittable to true.
FileInputFormat comes with lots of built-in functionality such as InputSplit generation.

Cheers, Fabian

2017-07-30 3:41 GMT+02:00 Mohit Anchlia <[hidden email]>:
Hi,

I created a custom input format. Idea behind this is to read all binary files from a directory and use each file as it's own split. Each split is read as one whole record. When I run it in flink I don't get any error but I am not seeing any output from .print. Am I missing something?

----

public class PDFFileInputFormat extends RichInputFormat<StringValue, InputSplit> {

private static final Logger logger = LoggerFactory.getLogger(PDFFileInputFormat.class.getName());

PDFFileInputSplit current = null;

public static void main(String... args) throws Exception {

PDFFileInputFormat pdfReader = new PDFFileInputFormat("c:\\proj\\test");

InputSplit[] splits = pdfReader.createInputSplits(1);

pdfReader.open(splits[0]);

pdfReader.nextRecord(null);

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

env.fromElements(1, 2, 3)

// returns the squared i

.print();

PDFFileInputFormat format = new PDFFileInputFormat("c:\\proj\\test");

InputFormatSourceFunction<StringValue> reader = new InputFormatSourceFunction<>(format,

TypeInformation.of(StringValue.class));

env.createInput(format,TypeInformation.of(StringValue.class)).print();

}

String path = null;

public PDFFileInputFormat(String path) {

this.path = path;

}

public void configure(Configuration parameters) {

// TODO Auto-generated method stub

}

public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {

// TODO Auto-generated method stub

return cachedStatistics;

}

public InputSplit[] createInputSplits(int minNumSplits) throws IOException {

final List<PDFFileInputSplit> splits = new ArrayList<PDFFileInputSplit>();

Files.list(Paths.get(path)).forEach(f -> {

PDFFileInputSplit split = new PDFFileInputSplit(splits.size(), f);

splits.add(split);

});

PDFFileInputSplit[] inputSplitArray = new PDFFileInputSplit[splits.size()];

return splits.toArray(inputSplitArray);

}

public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {

logger.info("Assigner");

// TODO Auto-generated method stub

return new DefaultInputSplitAssigner(inputSplits);

}

public void open(InputSplit split) throws IOException {

this.current = (PDFFileInputSplit) split;

}

public boolean reachedEnd() throws IOException {

// TODO Auto-generated method stub

return true;

}

public StringValue nextRecord(StringValue reuse) throws IOException {

String content = new String(Files.readAllBytes(this.current.getFile()));

logger.info("Content " + content);

return new StringValue(content);

}

public void close() throws IOException {

// TODO Auto-generated method stub

}

}

---


Thanks,

Mohit





Reply | Threaded
Open this post in threaded view
|

Re: Customer inputformat

Fabian Hueske-2
Hi Mohit,

as Ted said, there are plenty of InputFormats which are based on FileInputFormat.
FileInputFormat also supports reading all files in a directory. Simply specify the path of the directory.

Check StreamExecutionEnvironment.createFileInput() which takes a several parameters such as a FileInputFormat and a time interval in which the directory is periodically checked.

Best, Fabian

2017-07-30 21:31 GMT+02:00 Ted Yu <[hidden email]>:
For #1, you can find quite a few classes which extend FileInputFormat.
e.g.

flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java:public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultTypeQuer
flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java:public abstract class BinaryInputFormat<T> extends FileInputFormat<T>
flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java:public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> implements Checkpoi

flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java:             extends FileInputFormat<String>

FYI

On Sun, Jul 30, 2017 at 12:26 PM, Mohit Anchlia <[hidden email]> wrote:
Thanks. Few more questions:

- Is there an example for FileInputFormat? 
- how to make it read all the files in a directory?
- how to make an inputformat a streaming input instead of batch? Eg: read as new files come to a dir.

Thanks again.

On Sun, Jul 30, 2017 at 12:53 AM, Fabian Hueske <[hidden email]> wrote:
Hi,

Flink calls the reachedEnd() method before it calls nextRecord() and closes the IF when reachedEnd() returns true.
So, it should not return true until nextRecord() was called and the first and last record was emitted.

You might also want to built your PDFFileInputFormat on FileInputFormat and set unsplittable to true.
FileInputFormat comes with lots of built-in functionality such as InputSplit generation.

Cheers, Fabian

2017-07-30 3:41 GMT+02:00 Mohit Anchlia <[hidden email]>:
Hi,

I created a custom input format. Idea behind this is to read all binary files from a directory and use each file as it's own split. Each split is read as one whole record. When I run it in flink I don't get any error but I am not seeing any output from .print. Am I missing something?

----

public class PDFFileInputFormat extends RichInputFormat<StringValue, InputSplit> {

private static final Logger logger = LoggerFactory.getLogger(PDFFileInputFormat.class.getName());

PDFFileInputSplit current = null;

public static void main(String... args) throws Exception {

PDFFileInputFormat pdfReader = new PDFFileInputFormat("c:\\proj\\test");

InputSplit[] splits = pdfReader.createInputSplits(1);

pdfReader.open(splits[0]);

pdfReader.nextRecord(null);

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

env.fromElements(1, 2, 3)

// returns the squared i

.print();

PDFFileInputFormat format = new PDFFileInputFormat("c:\\proj\\test");

InputFormatSourceFunction<StringValue> reader = new InputFormatSourceFunction<>(format,

TypeInformation.of(StringValue.class));

env.createInput(format,TypeInformation.of(StringValue.class)).print();

}

String path = null;

public PDFFileInputFormat(String path) {

this.path = path;

}

public void configure(Configuration parameters) {

// TODO Auto-generated method stub

}

public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {

// TODO Auto-generated method stub

return cachedStatistics;

}

public InputSplit[] createInputSplits(int minNumSplits) throws IOException {

final List<PDFFileInputSplit> splits = new ArrayList<PDFFileInputSplit>();

Files.list(Paths.get(path)).forEach(f -> {

PDFFileInputSplit split = new PDFFileInputSplit(splits.size(), f);

splits.add(split);

});

PDFFileInputSplit[] inputSplitArray = new PDFFileInputSplit[splits.size()];

return splits.toArray(inputSplitArray);

}

public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {

logger.info("Assigner");

// TODO Auto-generated method stub

return new DefaultInputSplitAssigner(inputSplits);

}

public void open(InputSplit split) throws IOException {

this.current = (PDFFileInputSplit) split;

}

public boolean reachedEnd() throws IOException {

// TODO Auto-generated method stub

return true;

}

public StringValue nextRecord(StringValue reuse) throws IOException {

String content = new String(Files.readAllBytes(this.current.getFile()));

logger.info("Content " + content);

return new StringValue(content);

}

public void close() throws IOException {

// TODO Auto-generated method stub

}

}

---


Thanks,

Mohit






Reply | Threaded
Open this post in threaded view
|

Re: Customer inputformat

Mohit Anchlia
Thanks! When I give path to a directory flink is only reading 2 files. It seems to be picking these 2 files randomly.

On Mon, Jul 31, 2017 at 12:05 AM, Fabian Hueske <[hidden email]> wrote:
Hi Mohit,

as Ted said, there are plenty of InputFormats which are based on FileInputFormat.
FileInputFormat also supports reading all files in a directory. Simply specify the path of the directory.

Check StreamExecutionEnvironment.createFileInput() which takes a several parameters such as a FileInputFormat and a time interval in which the directory is periodically checked.

Best, Fabian

2017-07-30 21:31 GMT+02:00 Ted Yu <[hidden email]>:
For #1, you can find quite a few classes which extend FileInputFormat.
e.g.

flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java:public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultTypeQuer
flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java:public abstract class BinaryInputFormat<T> extends FileInputFormat<T>
flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java:public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> implements Checkpoi

flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java:             extends FileInputFormat<String>

FYI

On Sun, Jul 30, 2017 at 12:26 PM, Mohit Anchlia <[hidden email]> wrote:
Thanks. Few more questions:

- Is there an example for FileInputFormat? 
- how to make it read all the files in a directory?
- how to make an inputformat a streaming input instead of batch? Eg: read as new files come to a dir.

Thanks again.

On Sun, Jul 30, 2017 at 12:53 AM, Fabian Hueske <[hidden email]> wrote:
Hi,

Flink calls the reachedEnd() method before it calls nextRecord() and closes the IF when reachedEnd() returns true.
So, it should not return true until nextRecord() was called and the first and last record was emitted.

You might also want to built your PDFFileInputFormat on FileInputFormat and set unsplittable to true.
FileInputFormat comes with lots of built-in functionality such as InputSplit generation.

Cheers, Fabian

2017-07-30 3:41 GMT+02:00 Mohit Anchlia <[hidden email]>:
Hi,

I created a custom input format. Idea behind this is to read all binary files from a directory and use each file as it's own split. Each split is read as one whole record. When I run it in flink I don't get any error but I am not seeing any output from .print. Am I missing something?

----

public class PDFFileInputFormat extends RichInputFormat<StringValue, InputSplit> {

private static final Logger logger = LoggerFactory.getLogger(PDFFileInputFormat.class.getName());

PDFFileInputSplit current = null;

public static void main(String... args) throws Exception {

PDFFileInputFormat pdfReader = new PDFFileInputFormat("c:\\proj\\test");

InputSplit[] splits = pdfReader.createInputSplits(1);

pdfReader.open(splits[0]);

pdfReader.nextRecord(null);

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

env.fromElements(1, 2, 3)

// returns the squared i

.print();

PDFFileInputFormat format = new PDFFileInputFormat("c:\\proj\\test");

InputFormatSourceFunction<StringValue> reader = new InputFormatSourceFunction<>(format,

TypeInformation.of(StringValue.class));

env.createInput(format,TypeInformation.of(StringValue.class)).print();

}

String path = null;

public PDFFileInputFormat(String path) {

this.path = path;

}

public void configure(Configuration parameters) {

// TODO Auto-generated method stub

}

public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {

// TODO Auto-generated method stub

return cachedStatistics;

}

public InputSplit[] createInputSplits(int minNumSplits) throws IOException {

final List<PDFFileInputSplit> splits = new ArrayList<PDFFileInputSplit>();

Files.list(Paths.get(path)).forEach(f -> {

PDFFileInputSplit split = new PDFFileInputSplit(splits.size(), f);

splits.add(split);

});

PDFFileInputSplit[] inputSplitArray = new PDFFileInputSplit[splits.size()];

return splits.toArray(inputSplitArray);

}

public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {

logger.info("Assigner");

// TODO Auto-generated method stub

return new DefaultInputSplitAssigner(inputSplits);

}

public void open(InputSplit split) throws IOException {

this.current = (PDFFileInputSplit) split;

}

public boolean reachedEnd() throws IOException {

// TODO Auto-generated method stub

return true;

}

public StringValue nextRecord(StringValue reuse) throws IOException {

String content = new String(Files.readAllBytes(this.current.getFile()));

logger.info("Content " + content);

return new StringValue(content);

}

public void close() throws IOException {

// TODO Auto-generated method stub

}

}

---


Thanks,

Mohit







Reply | Threaded
Open this post in threaded view
|

Re: Customer inputformat

Ted Yu
Did you use StreamExecutionEnvironment.createFileInput() ?

What did the modification times of the 2 files look like (were they the newest) ?

Cheers

On Mon, Jul 31, 2017 at 12:42 PM, Mohit Anchlia <[hidden email]> wrote:
Thanks! When I give path to a directory flink is only reading 2 files. It seems to be picking these 2 files randomly.

On Mon, Jul 31, 2017 at 12:05 AM, Fabian Hueske <[hidden email]> wrote:
Hi Mohit,

as Ted said, there are plenty of InputFormats which are based on FileInputFormat.
FileInputFormat also supports reading all files in a directory. Simply specify the path of the directory.

Check StreamExecutionEnvironment.createFileInput() which takes a several parameters such as a FileInputFormat and a time interval in which the directory is periodically checked.

Best, Fabian

2017-07-30 21:31 GMT+02:00 Ted Yu <[hidden email]>:
For #1, you can find quite a few classes which extend FileInputFormat.
e.g.

flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java:public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultTypeQuer
flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java:public abstract class BinaryInputFormat<T> extends FileInputFormat<T>
flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java:public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> implements Checkpoi

flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java:             extends FileInputFormat<String>

FYI

On Sun, Jul 30, 2017 at 12:26 PM, Mohit Anchlia <[hidden email]> wrote:
Thanks. Few more questions:

- Is there an example for FileInputFormat? 
- how to make it read all the files in a directory?
- how to make an inputformat a streaming input instead of batch? Eg: read as new files come to a dir.

Thanks again.

On Sun, Jul 30, 2017 at 12:53 AM, Fabian Hueske <[hidden email]> wrote:
Hi,

Flink calls the reachedEnd() method before it calls nextRecord() and closes the IF when reachedEnd() returns true.
So, it should not return true until nextRecord() was called and the first and last record was emitted.

You might also want to built your PDFFileInputFormat on FileInputFormat and set unsplittable to true.
FileInputFormat comes with lots of built-in functionality such as InputSplit generation.

Cheers, Fabian

2017-07-30 3:41 GMT+02:00 Mohit Anchlia <[hidden email]>:
Hi,

I created a custom input format. Idea behind this is to read all binary files from a directory and use each file as it's own split. Each split is read as one whole record. When I run it in flink I don't get any error but I am not seeing any output from .print. Am I missing something?

----

public class PDFFileInputFormat extends RichInputFormat<StringValue, InputSplit> {

private static final Logger logger = LoggerFactory.getLogger(PDFFileInputFormat.class.getName());

PDFFileInputSplit current = null;

public static void main(String... args) throws Exception {

PDFFileInputFormat pdfReader = new PDFFileInputFormat("c:\\proj\\test");

InputSplit[] splits = pdfReader.createInputSplits(1);

pdfReader.open(splits[0]);

pdfReader.nextRecord(null);

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

env.fromElements(1, 2, 3)

// returns the squared i

.print();

PDFFileInputFormat format = new PDFFileInputFormat("c:\\proj\\test");

InputFormatSourceFunction<StringValue> reader = new InputFormatSourceFunction<>(format,

TypeInformation.of(StringValue.class));

env.createInput(format,TypeInformation.of(StringValue.class)).print();

}

String path = null;

public PDFFileInputFormat(String path) {

this.path = path;

}

public void configure(Configuration parameters) {

// TODO Auto-generated method stub

}

public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {

// TODO Auto-generated method stub

return cachedStatistics;

}

public InputSplit[] createInputSplits(int minNumSplits) throws IOException {

final List<PDFFileInputSplit> splits = new ArrayList<PDFFileInputSplit>();

Files.list(Paths.get(path)).forEach(f -> {

PDFFileInputSplit split = new PDFFileInputSplit(splits.size(), f);

splits.add(split);

});

PDFFileInputSplit[] inputSplitArray = new PDFFileInputSplit[splits.size()];

return splits.toArray(inputSplitArray);

}

public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {

logger.info("Assigner");

// TODO Auto-generated method stub

return new DefaultInputSplitAssigner(inputSplits);

}

public void open(InputSplit split) throws IOException {

this.current = (PDFFileInputSplit) split;

}

public boolean reachedEnd() throws IOException {

// TODO Auto-generated method stub

return true;

}

public StringValue nextRecord(StringValue reuse) throws IOException {

String content = new String(Files.readAllBytes(this.current.getFile()));

logger.info("Content " + content);

return new StringValue(content);

}

public void close() throws IOException {

// TODO Auto-generated method stub

}

}

---


Thanks,

Mohit