Flink Customized read text file

Soheil Pourbafrani
Hi,

I want to know whether it is possible to use PipedInputStream and PipedOutputStream in Flink to read text data from a file.
For example, by extending RichSourceFunction and reading the data like this:

DataStream<String> raw = env.addSource(new PipedSource(file_path));

Actually, I tried to implement a class for this, but since PipedInputStream and PipedOutputStream should be used on separate threads, I have no idea how to implement it.

Here is my template class:

public static class PipedFile extends RichSourceFunction<String> {
    PipedOutputStream outputPipe = new PipedOutputStream();
    PipedInputStream inputPipe = new PipedInputStream();
    FileInputStream fis;

    PipedFile(String s) throws IOException {
        outputPipe.connect(inputPipe);
        // Open the file whose path was passed in by the caller.
        fis = new FileInputStream(s);
    }

    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        int length;
        byte[] buffer = new byte[1024];
        // Copy the file into the pipe. Nothing reads from inputPipe yet,
        // and no records are emitted through sourceContext.
        while ((length = fis.read(buffer, 0, 1024)) != -1) {
            outputPipe.write(buffer, 0, length);
        }
    }

    @Override
    public void cancel() {
        try {
            outputPipe.close();
            inputPipe.close();
            fis.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
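
For reference, here is a minimal sketch of the two-thread pattern the question is about, written in plain Java so that it runs without Flink. It is one possible approach under stated assumptions, not a confirmed Flink recipe. The names PipedFileReader, readLines and emit are hypothetical, and the file is assumed to be UTF-8 text. A separate writer thread copies the file into the pipe while the calling thread reads lines from the other end. With both ends on the same thread, the write would block once the pipe's internal buffer filled up, because nothing would be draining it.

```java
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

// Hypothetical helper, not part of Flink: it only illustrates the
// writer-thread / reader-thread use of piped streams.
final class PipedFileReader {

    // Reads the file at 'path' through a pipe and passes each line to 'emit'.
    // In a Flink source, 'emit' would stand in for sourceContext.collect(line).
    static void readLines(String path, Consumer<String> emit)
            throws IOException, InterruptedException {
        PipedOutputStream outputPipe = new PipedOutputStream();
        PipedInputStream inputPipe = new PipedInputStream(outputPipe);

        // Writer thread: copies raw bytes from the file into the pipe.
        Thread writer = new Thread(() -> {
            // 'out' is declared first so the pipe is closed even if the file
            // cannot be opened; closing it signals end-of-stream to the reader.
            try (PipedOutputStream out = outputPipe;
                 FileInputStream fis = new FileInputStream(path)) {
                byte[] buffer = new byte[1024];
                int length;
                while ((length = fis.read(buffer, 0, buffer.length)) != -1) {
                    out.write(buffer, 0, length);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }, "piped-file-writer");
        writer.start();

        // Reader side (calling thread): decodes the bytes (UTF-8 assumed)
        // and emits one record per line.
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(inputPipe, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                emit.accept(line);
            }
        } finally {
            // Wait for the writer thread to finish before returning.
            writer.join();
        }
    }
}
```

Inside a RichSourceFunction<String>, the body of readLines would presumably go into run(), with sourceContext.collect(line) in place of emit.accept(line). Creating the pipes inside run() rather than as fields also sidesteps the fact that Flink serializes source functions, while PipedInputStream and PipedOutputStream are not Serializable. A quick standalone check could look like PipedFileReader.readLines("data_source.csv", System.out::println);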