http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Signal-for-End-of-Stream-tp20002p20027.html
Thanks a lot, Fabian for your response.
What I understand is that if I write my own Sourcefunction such that it handles the "end of stream” record and make the source exit from run() method, the flink program will terminate.
@Override
public void run(SourceContext<String> ctx) throws Exception {
final StringBuilder buffer = new StringBuilder();
long attempt = 0;
while (isRunning) {
try (Socket socket = new Socket()) {
currentSocket = socket;
LOG.info("Custom: Connecting to server socket " + hostname + ':' + port);
socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME);
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
char[] cbuf = new char[8192];
int bytesRead;
while (isRunning && (bytesRead = reader.read(cbuf)) != -1) {
buffer.append(cbuf, 0, bytesRead);
int delimPos;
while (buffer.length() >= delimiter.length() && (delimPos = buffer.indexOf(delimiter)) != -1) {
String record = buffer.substring(0, delimPos);
if(record.equals("END")) {
isRunning = false;
buffer.delete(0, delimPos + delimiter.length());
break;
}
// truncate trailing carriage return
if (delimiter.equals("\n") && record.endsWith("\r")) {
record = record.substring(0, record.length() - 1);
}
ctx.collect(record);
buffer.delete(0, delimPos + delimiter.length());
}
}
}
// if we dropped out of this loop due to an EOF, sleep and retry
if (isRunning) {
attempt++;
if (maxNumRetries == -1 || attempt < maxNumRetries) {
LOG.warn("Lost connection to server socket. Retrying in " + delayBetweenRetries + " msecs...");
Thread.sleep(delayBetweenRetries);
}
else {
// this should probably be here, but some examples expect simple exists of the stream source
// throw new EOFException("Reached end of stream and reconnects are not enabled.");
break;
}
}
}
// collect trailing data
if (buffer.length() > 0) {
ctx.collect(buffer.toString());
}
}