Hi, I created a custom input format. The idea behind it is to read all binary files from a directory and use each file as its own split. Each split is read as one whole record. When I run it in Flink I don't get any error, but I am not seeing any output from .print(). Am I missing something?
----
public class PDFFileInputFormat extends RichInputFormat<StringValue, InputSplit> {
    private static final Logger logger = LoggerFactory.getLogger(PDFFileInputFormat.class.getName());

    PDFFileInputSplit current = null;

    public static void main(String... args) throws Exception {
        PDFFileInputFormat pdfReader = new PDFFileInputFormat("c:\\proj\\test");
        InputSplit[] splits = pdfReader.createInputSplits(1);
        pdfReader.open(splits[0]);
        pdfReader.nextRecord(null);

        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // sanity check: prints 1, 2, 3
        env.fromElements(1, 2, 3).print();

        PDFFileInputFormat format = new PDFFileInputFormat("c:\\proj\\test");

        InputFormatSourceFunction<StringValue> reader = new InputFormatSourceFunction<>(format, TypeInformation.of(StringValue.class));

        env.createInput(format, TypeInformation.of(StringValue.class)).print();
    }

    String path = null;

    public PDFFileInputFormat(String path) {
        this.path = path;
    }

    public void configure(Configuration parameters) {
        // TODO Auto-generated method stub
    }

    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
        // TODO Auto-generated method stub
        return cachedStatistics;
    }

    public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
        final List<PDFFileInputSplit> splits = new ArrayList<PDFFileInputSplit>();
        Files.list(Paths.get(path)).forEach(f -> {
            PDFFileInputSplit split = new PDFFileInputSplit(splits.size(), f);
            splits.add(split);
        });
        PDFFileInputSplit[] inputSplitArray = new PDFFileInputSplit[splits.size()];
        return splits.toArray(inputSplitArray);
    }

    public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
        logger.info("Assigner");
        // TODO Auto-generated method stub
        return new DefaultInputSplitAssigner(inputSplits);
    }

    public void open(InputSplit split) throws IOException {
        this.current = (PDFFileInputSplit) split;
    }

    public boolean reachedEnd() throws IOException {
        // TODO Auto-generated method stub
        return true;
    }

    public StringValue nextRecord(StringValue reuse) throws IOException {
        String content = new String(Files.readAllBytes(this.current.getFile()));
        logger.info("Content " + content);
        return new StringValue(content);
    }

    public void close() throws IOException {
        // TODO Auto-generated method stub
    }
}
---
Thanks,
Mohit
Hi,
Flink calls the reachedEnd() method before it calls nextRecord(), and it closes the InputFormat when reachedEnd() returns true.
2017-07-30 3:41 GMT+02:00 Mohit Anchlia <[hidden email]>:
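To make the call order concrete: with reachedEnd() hard-wired to return true, the runtime closes each split without ever calling nextRecord(), so nothing reaches .print(). Below is a plain-Java model of that read loop, runnable without Flink. The interface SimpleInputFormat and the classes WholeFileFormat, AlwaysAtEndFormat and ReadLoop are hypothetical stand-ins, not Flink's API. The fix they illustrate: keep a per-split flag that open() resets and nextRecord() sets, and return that flag from reachedEnd().

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, stripped-down stand-in for the InputFormat contract
// (NOT Flink's real interface).
interface SimpleInputFormat<T> {
    void open(Path split) throws IOException;
    boolean reachedEnd() throws IOException;
    T nextRecord(T reuse) throws IOException;
    void close() throws IOException;
}

// Reads a whole file as a single record, then reports end-of-split.
class WholeFileFormat implements SimpleInputFormat<String> {
    private Path current;
    private boolean done;

    @Override
    public void open(Path split) {
        current = split;
        done = false; // a fresh split has exactly one unread record
    }

    @Override
    public boolean reachedEnd() {
        return done;
    }

    @Override
    public String nextRecord(String reuse) throws IOException {
        done = true; // the next reachedEnd() call ends this split
        // UTF-8 decoding is for the demo only; binary PDFs would be safer as byte[]
        return new String(Files.readAllBytes(current), StandardCharsets.UTF_8);
    }

    @Override
    public void close() {
        current = null;
    }
}

// Mirrors the reachedEnd() in the question: always "at end".
class AlwaysAtEndFormat extends WholeFileFormat {
    @Override
    public boolean reachedEnd() {
        return true;
    }
}

class ReadLoop {
    // Models the order described above: reachedEnd() is checked before every
    // nextRecord() call, and the split is closed once it returns true.
    static <T> List<T> readAll(SimpleInputFormat<T> format, List<Path> splits) throws IOException {
        List<T> out = new ArrayList<>();
        for (Path split : splits) {
            format.open(split);
            while (!format.reachedEnd()) {
                T record = format.nextRecord(null);
                if (record != null) {
                    out.add(record);
                }
            }
            format.close();
        }
        return out;
    }
}
```

In the real PDFFileInputFormat the same idea would mean adding a boolean field, setting it to false in open(InputSplit) and to true in nextRecord(), and returning it from reachedEnd().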
Thanks. A few more questions:
- How do I make it read all the files in a directory?
- How do I make an InputFormat a streaming input instead of a batch one? E.g., read new files as they arrive in a directory.
Thanks again.
On Sun, Jul 30, 2017 at 12:53 AM, Fabian Hueske <[hidden email]> wrote:
For #1, you can find quite a few classes which extend FileInputFormat, e.g.:
- flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java: public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultTypeQuer
- flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java: public abstract class BinaryInputFormat<T> extends FileInputFormat<T>
- flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java: public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> implements Checkpoi
- flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java: extends FileInputFormat<String>
FYI
On Sun, Jul 30, 2017 at 12:26 PM, Mohit Anchlia <[hidden email]> wrote:
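For question 1, the core of "read every file in a directory, one split per file" is directory enumeration. Extending FileInputFormat gives you that (plus support for non-local file systems); if I remember correctly, setting its protected unsplittable flag to true keeps each file in a single split, but please verify that against the FileInputFormat Javadoc for your Flink version. The plain-Java sketch below shows only the enumeration step. DirectorySplits and its filtering rules (skip subdirectories, skip names starting with "." or "_") are assumptions for illustration. Unlike the createInputSplits() in the question, it closes the stream returned by Files.list() and does not turn subdirectories into splits.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class DirectorySplits {
    // Hypothetical helper: one "split" (here simply a Path) per regular file.
    // Subdirectories are skipped (no recursion) and names starting with
    // "." or "_" are treated as hidden; both rules are assumptions.
    static List<Path> oneSplitPerFile(Path dir) throws IOException {
        // try-with-resources closes the directory stream opened by Files.list
        try (Stream<Path> entries = Files.list(dir)) {
            return entries
                    .filter(Files::isRegularFile)
                    .filter(p -> {
                        String name = p.getFileName().toString();
                        return !name.startsWith(".") && !name.startsWith("_");
                    })
                    .sorted() // deterministic split order and numbering
                    .collect(Collectors.toList());
        }
    }
}
```

The index of each Path in the returned list can serve as the split number, the way the original code used splits.size().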
Hi Mohit,
as Ted said, there are plenty of InputFormats which are based on FileInputFormat. Check StreamExecutionEnvironment.readFile() (which delegates to createFileInput() internally). It takes several parameters, such as a FileInputFormat, a FileProcessingMode, and a time interval in which the directory is periodically checked.
2017-07-30 21:31 GMT+02:00 Ted Yu <[hidden email]>:
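The periodic check can be pictured as a loop that rescans the directory every interval and hands on only the files it has not reported yet. The sketch below is a hypothetical plain-Java poller (DirectoryPoller, scanOnce() and start() are illustrative names, not Flink API). It remembers file paths, which is a simplification: a real monitor also has to cope with files that are still being written or are modified after being read.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical poller: each scan reports regular files not seen by earlier scans.
class DirectoryPoller {
    private final Path dir;
    private final Set<Path> seen = new HashSet<>();

    DirectoryPoller(Path dir) {
        this.dir = dir;
    }

    // One scan of the directory; returns only files not reported before.
    synchronized List<Path> scanOnce() throws IOException {
        List<Path> fresh = new ArrayList<>();
        try (Stream<Path> entries = Files.list(dir)) {
            List<Path> files = entries.filter(Files::isRegularFile).sorted().collect(Collectors.toList());
            for (Path p : files) {
                if (seen.add(p)) { // add() returns false for paths already seen
                    fresh.add(p);
                }
            }
        }
        return fresh;
    }

    // Calls scanOnce() every intervalMillis until the returned executor is shut down.
    ScheduledExecutorService start(long intervalMillis, Consumer<Path> onNewFile) {
        ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
        exec.scheduleWithFixedDelay(() -> {
            try {
                scanOnce().forEach(onNewFile);
            } catch (IOException e) {
                // report and keep polling; an uncaught exception would cancel future scans
                System.err.println("scan failed: " + e);
            }
        }, 0, intervalMillis, TimeUnit.MILLISECONDS);
        return exec;
    }
}
```

On the Flink side you would not write this loop yourself; a call along the lines of env.readFile(format, path, FileProcessingMode.PROCESS_CONTINUOUSLY, intervalMillis) on a StreamExecutionEnvironment, with format being a FileInputFormat subclass, sets up the monitoring.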
Thanks! When I give it the path to a directory, Flink only reads 2 files. It seems to pick these 2 files randomly.
On Mon, Jul 31, 2017 at 12:05 AM, Fabian Hueske <[hidden email]> wrote:
Did you use StreamExecutionEnvironment? What did the modification times of the 2 files look like (were they the newest)?
Cheers
On Mon, Jul 31, 2017 at 12:42 PM, Mohit Anchlia <[hidden email]> wrote:
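Ted's question about modification times suggests one thing to check. Suppose the directory monitor treats a file as new only when its modification time is later than the newest one it has already processed. Then files with older timestamps (for example, files copied in with their original timestamps preserved) are skipped without any error. The sketch below models that rule in plain Java under exactly that assumption. ModificationTimeFilter is a hypothetical name, this is not Flink's monitoring code, and whether it explains the 2-file result depends on how and when the files arrived.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical filter (assumed rule): a file counts as new only if it was
// modified strictly after the newest modification time seen so far.
class ModificationTimeFilter {
    private long highWaterMark = Long.MIN_VALUE;

    List<Path> newFiles(Path dir) throws IOException {
        List<Path> accepted = new ArrayList<>();
        long newest = highWaterMark;
        try (Stream<Path> entries = Files.list(dir)) {
            List<Path> files = entries.filter(Files::isRegularFile).sorted().collect(Collectors.toList());
            for (Path p : files) {
                long modified = Files.getLastModifiedTime(p).toMillis();
                if (modified > highWaterMark) { // older or equal timestamps are skipped
                    accepted.add(p);
                    newest = Math.max(newest, modified);
                }
            }
        }
        highWaterMark = newest; // advance only after the whole scan
        return accepted;
    }
}
```

With this rule, a file dropped into the directory after a scan but carrying an older timestamp than anything already processed is never picked up.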