I've implemented a CustomSink with TwoPhaseCommit. To test it, I've created a test based on this one [1], and it works fine.
To test the integration with S3 (and the exponential back-off), I've tried to implement a new test using the following code:

    ...
    val invalidWriter = writer
      .asInstanceOf[WindowParquetGenericRecordListFileSink]
      .copy(filePath = s"s3a://bucket_that_doesnt_exists/")

    val records: Iterable[GenericRecord] = Iterable apply {
      new GenericData.Record(GenericRecordSchema.schema) {
        put(KEY.name, "x")
        put(EVENT.name, "record.value()")
        put(LOGGER_TIMESTAMP.name, "2020-01-01T02:22:23.123456Z")
        put(EVENT_TYPE.name, "xpto")
      }
    }

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env
      .enableCheckpointing(1000)
      .fromElements(records)
      .addSink(invalidWriter)

    val task = executor.submit(() => env.execute("s3exponential"))
    ...

This sets up a small environment with one record, enables checkpointing (so that the two-phase commit actually runs), and then executes the job in another thread so the test can check whether the error count is increasing.

The test behaves as follows: if I use enableCheckpointing(10), it passes 9 out of 10 times. If I use other values, such as 1000, it fails most of the time, if not every time.

Here is a small excerpt of the log when the test succeeds:
    2020-03-05 16:04:40,342 DEBUG com.talkdesk.automatebackfill.kratos.writer.WindowParquetGenericRecordListFileSink - Invoke - f3b667c1-8d75-4ab8-9cab-71dfa6a71271 [1]
    2020-03-05 16:04:40,342 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Starting checkpoint (5) CHECKPOINT on task Sink: Unnamed (2/2)
    2020-03-05 16:04:40,342 DEBUG org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction - WindowParquetGenericRecordListFileSink 1/2 - checkpoint 5 triggered, flushing transaction 'TransactionHolder{handle=f3b667c1-8d75-4ab8-9cab-71dfa6a71271, transactionStartTime=1583424280304}'
    ### 2020-03-05 16:04:40,342 DEBUG com.talkdesk.automatebackfill.kratos.writer.WindowParquetGenericRecordListFileSink - Pre Commit - f3b667c1-8d75-4ab8-9cab-71dfa6a71271 [1] - openedTransactions [1]
    ### 2020-03-05 16:04:40,342 DEBUG com.talkdesk.automatebackfill.kratos.writer.WindowParquetGenericRecordListFileSink - operation='preCommit', message='Start writing #1 records'

When the test fails, this is part of the log:

    2020-03-05 16:38:44,386 DEBUG com.talkdesk.automatebackfill.kratos.writer.WindowParquetGenericRecordListFileSink - Invoke - 7c5dd046-54b8-4b13-b360-397da6743b3b [1]
    2020-03-05 16:38:44,386 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Aborting checkpoint via cancel-barrier 5 for task Sink: Unnamed (1/2)
    2020-03-05 16:38:44,387 DEBUG org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner - Sink: Unnamed (1/2) (e5699032cb2eb874dcff740b6b7b62f1): End of stream alignment, feeding buffered data back.
    2020-03-05 16:38:44,390 DEBUG org.apache.flink.runtime.io.network.partition.ResultPartition - ReleaseOnConsumptionResultPartition 8a4be598a77d12582a92096db8e6319c@b32dcfe3631e86d18cb893a258a6f0f9 [PIPELINED_BOUNDED, 2 subpartitions, 0 pending consumptions]: Received consumed notification for subpartition 0.
    2020-03-05 16:38:44,390 DEBUG org.apache.flink.runtime.io.network.partition.ResultPartitionManager - Received consume notification from ReleaseOnConsumptionResultPartition 8a4be598a77d12582a92096db8e6319c@b32dcfe3631e86d18cb893a258a6f0f9 [PIPELINED_BOUNDED, 2 subpartitions, 0 pending consumptions].
    2020-03-05 16:38:44,390 DEBUG org.apache.flink.runtime.io.network.partition.ResultPartition - Source: Collection Source (1/1) (b32dcfe3631e86d18cb893a258a6f0f9): Releasing ReleaseOnConsumptionResultPartition 8a4be598a77d12582a92096db8e6319c@b32dcfe3631e86d18cb893a258a6f0f9 [PIPELINED_BOUNDED, 2 subpartitions, 0 pending consumptions].

I'm not sure why the behaviour changes with the checkpoint interval, and so far I haven't found out why "pre commit" isn't executed. Does anyone have any thoughts, or is there something I'm missing?

[1] https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
Hi David, bounded sources do not work well with checkpointing. As soon as the source is drained, no checkpoints are performed anymore. It's an unfortunate limitation that we want to get rid of, but haven't found the time (because it requires larger changes). So for your test to work, you need to add a source that is continuously open, but does not output more than one element. Fortunately, there is already a working implementation in our test bed. On Thu, Mar 5, 2020 at 7:54 PM David Magalhães <[hidden email]> wrote:
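A minimal sketch of such a source, assuming the Flink 1.10-era `SourceFunction` API; `EmitOnceThenIdleSource` is a hypothetical name and is not the test-bed implementation mentioned above. It emits its elements once under the checkpoint lock and then keeps running until cancelled, so checkpoints continue to be triggered and `TwoPhaseCommitSinkFunction` keeps reaching the "checkpoint N triggered, flushing transaction" step that precedes `preCommit` in the successful log. The elements come from a factory called inside `run()` instead of being stored in a field, on the assumption that the source is shipped to the task via Java serialization and that Avro's `GenericData.Record` may not be `java.io.Serializable`; anything the factory captures must itself be serializable.

```scala
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

// Hypothetical helper (not part of Flink): emits the elements produced by
// `createElements` once, then stays open until cancel() is called, so the
// job never reaches the "source finished" state that stops checkpointing.
class EmitOnceThenIdleSource[T](createElements: () => Seq[T]) extends SourceFunction[T] {

  // Written by cancel() from another thread, read by the run() loop.
  @volatile private var running = true

  override def run(ctx: SourceContext[T]): Unit = {
    // Build the elements on the task side rather than serializing them.
    val elements = createElements()

    // Emit under the checkpoint lock so emission and snapshots don't interleave.
    ctx.getCheckpointLock.synchronized {
      elements.foreach(e => ctx.collect(e))
    }

    // Keep the source alive without emitting anything else.
    while (running) {
      Thread.sleep(100)
    }
  }

  override def cancel(): Unit = {
    running = false
  }
}
```

In the test above, `.fromElements(records)` would become something like `.addSource(new EmitOnceThenIdleSource(() => Seq(...)))` with the record built inside the factory. Because the source never finishes, the job does not end by itself once the element has been emitted, which fits running `env.execute` on the executor thread; the test then has to cancel the job once it has checked the error count.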
Awesome Arvid, thanks a lot! :) And I thought when doing this that I was simplifying the test ... On Thu, Mar 5, 2020 at 8:27 PM Arvid Heise <[hidden email]> wrote: