Hi to all,
I'm trying to test a streaming job, but the files written by the BucketingSink are never finalized (they remain in the pending state). Is this caused by the fact that the job finishes before the checkpoint? Shouldn't the sink close properly anyway? This is my code:
    @Test
    public void testBucketingSink() throws Exception {
      final StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
      final StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(senv);
      senv.enableCheckpointing(5000);
      DataStream<String> testStream = senv.fromElements(
          "1,aaa,white",
          "2,bbb,gray",
          "3,ccc,white",
          "4,bbb,gray",
          "5,bbb,gray"
      );
      final RowTypeInfo rtf = new RowTypeInfo(
          BasicTypeInfo.STRING_TYPE_INFO,
          BasicTypeInfo.STRING_TYPE_INFO,
          BasicTypeInfo.STRING_TYPE_INFO);
      DataStream<Row> rows = testStream.map(new MapFunction<String, Row>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Row map(String str) throws Exception {
          String[] split = str.split(Pattern.quote(","));
          Row ret = new Row(3);
          ret.setField(0, split[0]);
          ret.setField(1, split[1]);
          ret.setField(2, split[2]);
          return ret;
        }
      }).returns(rtf);

      String columnNames = "id,value,state";
      final String dsName = "test";
      tEnv.registerDataStream(dsName, rows, columnNames);
      final String whiteAreaFilter = "state = 'white'";
      DataStream<Row> grayArea = rows;
      DataStream<Row> whiteArea = null;
      if (whiteAreaFilter != null) {
        String sql = "SELECT *, (%s) as _WHITE FROM %s";
        sql = String.format(sql, whiteAreaFilter, dsName);
        Table table = tEnv.sql(sql);
        grayArea = tEnv.toDataStream(table.where("!_WHITE").select(columnNames), rtf);
        DataStream<Row> nw = tEnv.toDataStream(table.where("_WHITE").select(columnNames), rtf);
        whiteArea = whiteArea == null ? nw : whiteArea.union(nw);
      }
      Writer<Row> bucketSinkwriter = new RowCsvWriter("UTF-8", "\t", "\n");

      String datasetWhiteDir = "/tmp/bucket/white";
      BucketingSink<Row> whiteAreaSink = new BucketingSink<>(datasetWhiteDir.toString());
      whiteAreaSink.setWriter(bucketSinkwriter);
      whiteAreaSink.setBatchSize(10);
      whiteArea.addSink(whiteAreaSink);

      String datasetGrayDir = "/tmp/bucket/gray";
      BucketingSink<Row> grayAreaSink = new BucketingSink<>(datasetGrayDir.toString());
      grayAreaSink.setWriter(bucketSinkwriter);
      grayAreaSink.setBatchSize(10);
      grayArea.addSink(grayAreaSink);

      JobExecutionResult jobInfo = senv.execute("Buketing sink test ");
      System.out.printf("Job took %s minutes", jobInfo.getNetRuntime(TimeUnit.MINUTES));
    }

    public class RowCsvWriter extends StreamWriterBase<Row> {
      private static final long serialVersionUID = 1L;

      private final String charsetName;
      private transient Charset charset;
      private String fieldDelimiter;
      private String recordDelimiter;
      private boolean allowNullValues = true;
      private boolean quoteStrings = false;

      /**
       * Creates a new {@code StringWriter} that uses {@code "UTF-8"} charset to convert strings to
       * bytes.
       */
      public RowCsvWriter() {
        this("UTF-8", CsvOutputFormat.DEFAULT_FIELD_DELIMITER, CsvOutputFormat.DEFAULT_LINE_DELIMITER);
      }

      /**
       * Creates a new {@code StringWriter} that uses the given charset to convert strings to bytes.
       *
       * @param charsetName Name of the charset to be used, must be valid input for
       *        {@code Charset.forName(charsetName)}
       */
      public RowCsvWriter(String charsetName, String fieldDelimiter, String recordDelimiter) {
        this.charsetName = charsetName;
        this.fieldDelimiter = fieldDelimiter;
        this.recordDelimiter = recordDelimiter;
      }

      @Override
      public void open(FileSystem fs, Path path) throws IOException {
        super.open(fs, path);
        try {
          this.charset = Charset.forName(charsetName);
        } catch (IllegalCharsetNameException ex) {
          throw new IOException("The charset " + charsetName + " is not valid.", ex);
        } catch (UnsupportedCharsetException ex) {
          throw new IOException("The charset " + charsetName + " is not supported.", ex);
        }
      }

      @Override
      public void write(Row element) throws IOException {
        FSDataOutputStream outputStream = getStream();
        writeRow(element, outputStream);
      }

      private void writeRow(Row element, FSDataOutputStream out) throws IOException {
        int numFields = element.getArity();
        for (int i = 0; i < numFields; i++) {
          Object obj = element.getField(i);
          if (obj != null) {
            if (i != 0) {
              out.write(this.fieldDelimiter.getBytes(charset));
            }
            if (quoteStrings) {
              if (obj instanceof String || obj instanceof StringValue) {
                out.write('"');
                out.write(obj.toString().getBytes(charset));
                out.write('"');
              } else {
                out.write(obj.toString().getBytes(charset));
              }
            } else {
              out.write(obj.toString().getBytes(charset));
            }
          } else {
            if (this.allowNullValues) {
              if (i != 0) {
                out.write(this.fieldDelimiter.getBytes(charset));
              }
            } else {
              throw new RuntimeException("Cannot write tuple with <null> value at position: " + i);
            }
          }
        }
        // add the record delimiter
        out.write(this.recordDelimiter.getBytes(charset));
      }

      @Override
      public Writer<Row> duplicate() {
        return new RowCsvWriter(charsetName, fieldDelimiter, recordDelimiter);
      }
    }

Any help is appreciated,
Flavio
|
Hi Flavio,
If I understand correctly, I think you bumped into this issue: https://issues.apache.org/jira/browse/FLINK-2646
There is also a similar discussion on the BucketingSink here:
Kostas
|
Hi,
Expanding a bit on Kostas' answer. Yes, your analysis is correct: the problem is that the job shuts down before a last checkpoint can "confirm" the written bucket data by moving it to the final state. The problem, as Kostas noted, is that a user function (and thus also the BucketingSink) does not know whether close() is being called because of a failure or because of a normal job shutdown. Therefore, we cannot move data to the final state there. Once the issue that Kostas posted is resolved, we can also resolve this problem for the BucketingSink.
Best,
Aljoscha
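
The lifecycle described in this reply can be illustrated with a small toy model. This is only a sketch in plain Java, not Flink code: the class ToyBucketingSink, its FileState enum, and the part-N file names are hypothetical and exist only for this illustration. The method names snapshotState, notifyCheckpointComplete and close merely mirror the callbacks discussed above. The assumption is that a file only leaves the pending state when a checkpoint covering it is reported as complete, and that close() stops at pending because it cannot tell a failure from a normal shutdown.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model only: this class is hypothetical and is not part of Flink.
class ToyBucketingSink {
    enum FileState { IN_PROGRESS, PENDING, FINISHED }

    private final Map<String, FileState> files = new TreeMap<String, FileState>();
    // Files rolled to PENDING since the last snapshot.
    private final List<String> pendingFiles = new ArrayList<String>();
    // Pending files grouped by the checkpoint that must complete before they are finalized.
    private final TreeMap<Long, List<String>> pendingPerCheckpoint =
            new TreeMap<Long, List<String>>();
    private String currentFile;
    private int partCounter;

    void write(String record) {
        if (currentFile == null) {
            currentFile = "part-" + partCounter++;
            files.put(currentFile, FileState.IN_PROGRESS);
        }
        // A real sink would append the record to the open file here.
    }

    // Rolling moves the open file to PENDING; it is not final output yet.
    void rollCurrentFile() {
        if (currentFile != null) {
            files.put(currentFile, FileState.PENDING);
            pendingFiles.add(currentFile);
            currentFile = null;
        }
    }

    // A checkpoint remembers which pending files it covers.
    void snapshotState(long checkpointId) {
        pendingPerCheckpoint.put(checkpointId, new ArrayList<String>(pendingFiles));
        pendingFiles.clear();
    }

    // Only a completed checkpoint promotes the files it covers to FINISHED.
    void notifyCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, List<String>>> it =
                pendingPerCheckpoint.headMap(checkpointId, true).entrySet().iterator();
        while (it.hasNext()) {
            for (String file : it.next().getValue()) {
                files.put(file, FileState.FINISHED);
            }
            it.remove();
        }
    }

    // close() cannot distinguish failure from normal shutdown, so it stops at PENDING.
    void close() {
        rollCurrentFile();
    }

    FileState stateOf(String file) {
        return files.get(file);
    }

    public static void main(String[] args) {
        // Finite job: it ends before any checkpoint completes.
        ToyBucketingSink finite = new ToyBucketingSink();
        finite.write("1,aaa,white");
        finite.close();
        System.out.println("closed without checkpoint: " + finite.stateOf("part-0"));

        // Job where a checkpoint completes after the file was rolled.
        ToyBucketingSink checkpointed = new ToyBucketingSink();
        checkpointed.write("1,aaa,white");
        checkpointed.rollCurrentFile();
        checkpointed.snapshotState(1L);
        checkpointed.notifyCheckpointComplete(1L);
        System.out.println("after completed checkpoint: " + checkpointed.stateOf("part-0"));
    }
}
```

In this model the first scenario corresponds to the test in the original post: the five input elements are consumed and the job ends well within the 5000 ms checkpoint interval, so no completion notification ever arrives and the file stays pending.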
|
For the moment I give up on streaming... too many missing/unclear features compared to batch. For example:
Maybe I'm wrong about those points, but my attempt to replace my current batch system with a streaming one failed because of them.
Best,
Flavio
On Fri, Sep 8, 2017 at 5:29 PM, Aljoscha Krettek <[hidden email]> wrote:
|
Hi Flavio, Let me try to understand / look at some of the problems you have encountered.
What do you mean by which "checkpointing system" to use? Do you mean state backends? Typically, you would only get OOM exceptions for memory-backed state backends if the state size exceeds the memory capacity. State sizes can be queried from the REST APIs / Web UI.
This sounds like a bug that we should look into. Do you have any logs on which you observed this?
Would you be up to opening up JIRAs for what you think is missing (if there isn’t one already)?
Maybe we can add some built-in metric in the ES sink connector that tracks the number of successfully indexed elements, which can then be queried from the REST API / Web UI. That wouldn’t be too much effort. What do you think, would that be useful for your case? Would be happy to hear your thoughts on this!
Cheers,
Gordon
On 12 September 2017 at 11:36:27 AM, Flavio Pompermaier ([hidden email]) wrote:
|
Ah, sorry, one correction. Just realized there’s already some analysis of the BucketingSink closing issue in this mail thread. Please ignore my request for relevant logs :)
On 13 September 2017 at 10:56:10 AM, Tzu-Li (Gordon) Tai ([hidden email]) wrote:
|
Hi Gordon,
thanks for your feedback. The main problem for me is that moving from batch to streaming should be much easier, IMHO. Rows should be first-class citizens in Flink and it should be VERY easy to read/write them, while at the moment it seems that Tuples are the dominant type... I don't want to write a serializer/OutputFormat to persist Rows as Parquet, Avro, Thrift, ORC, Kudu, Hive, etc. I expect an existing (and maintained) connector to be available somewhere. The case of the Parquet rolling sink is just an example.
Regarding state backends, I think that it's not so easy to understand how to design and monitor them properly: there are many parameters/variables to take into account, and it would be helpful to have a proper hands-on training course/certification about this...
About ES indexing monitoring, see my discussion with Chesnay at http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Streaming-job-monitoring-td13583.html. What I need is just to have recordsIn/recordsOut reflecting real values.
Best,
Flavio
On Wed, Sep 13, 2017 at 10:56 AM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
|
I've just looked at Robert's presentation at FF [1] and that's exactly what I was waiting for regarding streaming planning/training...
Very useful ;)
[1] https://www.youtube.com/watch?v=8l8dCKMMWkw
On Wed, Sep 13, 2017 at 12:04 PM, Flavio Pompermaier <[hidden email]> wrote:
|