Hi,
I want to know is it possible to use PipedInutStream and PipedOutputStream in Flink for reading text data from a file?
For example extending a RichSourceFunction for it and readata like this:
DataStream<String> raw = env.addSource(new PipedSource(file_path));
Actually i tried to implement a class for it but as PipedInputStream and PipedOutputStream should be on seperate Threads, I have no idea how to implement that.
Here is my template class
public static class PipedFile extends RichSourceFunction<String> {
PipedOutputStream outputPipe = new PipedOutputStream();
PipedInputStream inputPipe = new PipedInputStream();
FileInputStream fis;
PipedFile(String s) throws IOException {
outputPipe.connect(inputPipe);
fis = new FileInputStream("data_source.csv");
}
@Override
public void run(SourceContext sourceContext) throws Exception {
int length;
byte[] buffer = new byte[1024];
while ((length = fis.read(buffer, 0, 1024)) != -1) {
outputPipe.write(buffer, 0, length);
}
}
@Override
public void cancel() {
try {
outputPipe.close();
inputPipe.close();
fis.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}