Hi,

The Flink StreamingFileSink is designed to use a global counter when creating files, to avoid overwrites. I am running Flink 1.8.2 on Kinesis Data Analytics (managed Flink provided by AWS) with bulk writes (the rolling policy is hardcoded to roll over on checkpoint). My job is configured to checkpoint every minute and runs with parallelism 1.

The problem is that the same counter, 616, is used for both invalid-records/2020-01-22T15_06_00Z/part-0-616 and invalid-records/2020-01-22T15_05_00Z/part-0-616:

15:06:37
{
"locationInformation": "org.apache.flink.fs.s3.common.writer.S3Committer.commit(S3Committer.java:64)",
"logger": "org.apache.flink.fs.s3.common.writer.S3Committer",
"message": "Committing invalid-records/2020-01-22T15_06_00Z/part-0-616 with MPU ID f7PQc2D82.kKaDRS.RXYYS8AkLd5q_9ogw3WZJJg2KGABhYgjtv.eJbqQ_UwpzciYb.TDTIkixulkmaTMyyuwmr6c5eC61aenoo2m4cj7wAT9v0JXB3i6gBArw.HpSLxpUBTEW6PT3aN9XKPZmT2kg--",
"threadName": "Async calls on Source: Custom Source -> Extract Td-agent message -> To DataRecord -> Find invalid records -> Sink: Unnamed (1/1)",
"applicationARN": "arn:aws:kinesisanalytics:eu-west-1:679291754673:application/pawel",
"applicationVersionId": "33",
"messageSchemaVersion": "1",
"messageType": "INFO"} } 15:07:37 {
"locationInformation": "org.apache.flink.fs.s3.common.writer.S3Committer.commit(S3Committer.java:64)",
"logger": "org.apache.flink.fs.s3.common.writer.S3Committer",
"message": "Committing invalid-records/2020-01-22T15_05_00Z/part-0-616 with MPU ID XoliYkdvP1Cc3gePyteIGhTqF1LrID8rEyddaPXRNPQYkWDNKpDF0tnYuhDBqywAqCWf4nJTOJ2Kx_a_91KTyVTvZ7GkKs25nseGs4jDR6Y5Nxuai47aKNeWeS4bs9imMJ1iAxbd7lRQyxnM5qwDeA--",
"threadName": "Async calls on Source: Custom Source -> Extract Td-agent message -> To DataRecord -> Find invalid records -> Sink: Unnamed (1/1)",
"applicationARN": "arn:aws:kinesisanalytics:eu-west-1:679291754673:application/pawel",
"applicationVersionId": "33",
"messageSchemaVersion": "1",
"messageType": "INFO"
}

Thanks,
Pawel
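PS For reference, the sink is built along the lines of the sketch below. Treat it as an approximation only: the bucket-assigner date format and the writerFactory/record-type stand-ins are assumptions on my part, since the actual job code is not included in this mail.

import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

static <T> void attachInvalidRecordsSink(DataStream<T> invalidRecords, BulkWriter.Factory<T> writerFactory) {
    StreamingFileSink<T> sink = StreamingFileSink
            // bulk format: the rolling policy is hardcoded to roll on checkpoint
            .forBulkFormat(new Path("s3://my-bucket/invalid-records"), writerFactory)
            // per-minute buckets such as 2020-01-22T15_06_00Z (format string is a guess)
            .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd'T'HH_mm_00'Z'"))
            .build();
    invalidRecords.addSink(sink).setParallelism(1);
}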
I have looked into the source code and it looks like the same counter value being used in two buckets is expected. Each Bucket (https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java) is passed a partCounter in its constructor. Whenever a part file is rolled over, the counter is incremented within the scope of that bucket. When two or more buckets are active at the same time, their counters are incremented independently, so they can become equal. However, the global max counter maintained by the Buckets class always tracks the maximum part counter, so a newly created bucket is passed the correct starting value.

I have done my analysis based on the logs from my job. Note the same counter value used for "part-0-8", once for bucket 14_55_00Z (at 14:58:23) and once for bucket 14_54_00Z (at 14:58:42):

2020-01-24 14:57:41 [Sink (1/1)-thread-0] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 opening new part file "part-0-6" for bucket id=2020-01-24T14_54_00Z.
2020-01-24 14:57:41 [Async Sink: Unnamed (1/1)] INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 received completion notification for checkpoint with id=7.
2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=8 (max part counter=7).
2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 closing in-progress part file for bucket id=2020-01-24T14_54_00Z on checkpoint.
2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing: BucketState for bucketId=2020-01-24T14_54_00Z and bucketPath=s3://xxx
2020-01-24 14:58:11 [Sink (1/1)-thread-0] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 closing in-progress part file for bucket id=2020-01-24T14_54_00Z due to element
2020-01-24 14:58:11 [Sink (1/1)-thread-0] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 opening new part file "part-0-7" for bucket id=2020-01-24T14_54_00Z.
2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 received completion notification for checkpoint with id=8.
2020-01-24 14:58:23 [Sink (1/1)-thread-0] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 closing in-progress part file for bucket id=2020-01-24T14_55_00Z due to element
2020-01-24 14:58:23 [Sink (1/1)-thread-0] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 opening new part file "part-0-8" for bucket id=2020-01-24T14_55_00Z.
2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=9 (max part counter=9).
2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 closing in-progress part file for bucket id=2020-01-24T14_54_00Z on checkpoint.
2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing: BucketState for bucketId=2020-01-24T14_54_00Z and bucketPath=s3://xxx
2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 closing in-progress part file for bucket id=2020-01-24T14_55_00Z on checkpoint.
2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing: BucketState for bucketId=2020-01-24T14_55_00Z and bucketPath=s3://xxx
2020-01-24 14:58:41 [Sink (1/1)-thread-0] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 closing in-progress part file for bucket id=2020-01-24T14_54_00Z due to element
2020-01-24 14:58:42 [Sink (1/1)-thread-0] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 opening new part file "part-0-8" for bucket id=2020-01-24T14_54_00Z.
2020-01-24 14:58:42 [Async Sink: Unnamed (1/1)] INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 received completion notification for checkpoint with id=9.
2020-01-24 14:58:42 [Sink (1/1)-thread-0] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 closing in-progress part file for bucket id=2020-01-24T14_55_00Z due to element
2020-01-24 14:58:43 [Sink (1/1)-thread-0] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 opening new part file "part-0-9" for bucket id=2020-01-24T14_55_00Z.
2020-01-24 14:59:11 [Async Sink: Unnamed (1/1)] INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=10 (max part counter=10).
2020-01-24 14:59:11 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 closing in-progress part file for bucket id=2020-01-24T14_54_00Z on checkpoint.
2020-01-24 14:59:11 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing: BucketState for bucketId=2020-01-24T14_54_00Z and bucketPath=s3://xxx
2020-01-24 14:59:11 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 closing in-progress part file for bucket id=2020-01-24T14_55_00Z on checkpoint.
2020-01-24 14:59:11 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing: BucketState for bucketId=2020-01-24T14_55_00Z and bucketPath=s3://xxx

Thanks,
Pawel
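PS To sanity-check my reading of Bucket.java, here is a self-contained toy model (plain Java written for this mail, not Flink code) of how I understand the counters: each bucket copies the global max when it is created and then counts on its own, which reproduces the same part-file name in two buckets.

import java.util.HashMap;
import java.util.Map;

public class PartCounterToy {
    private long maxPartCounter = 0;                       // kept by Buckets
    private final Map<String, Long> perBucket = new HashMap<>();

    String roll(String bucketId) {
        // a brand-new bucket starts from the current global max ...
        long next = perBucket.getOrDefault(bucketId, maxPartCounter);
        // ... and afterwards each bucket counts independently
        perBucket.put(bucketId, next + 1);
        maxPartCounter = Math.max(maxPartCounter, next + 1);
        return "part-0-" + next;
    }

    public static void main(String[] args) {
        PartCounterToy toy = new PartCounterToy();
        toy.roll("15_05");                                 // part-0-0
        toy.roll("15_05");                                 // part-0-1
        System.out.println(toy.roll("15_06"));             // part-0-2 (new bucket starts at max)
        System.out.println(toy.roll("15_05"));             // part-0-2 again, different bucket
    }
}

Running it prints part-0-2 twice, once per bucket, which matches the part-0-8 collision in the logs above.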
Hi Pawel,
You are correct that counters are unique within the same bucket but NOT across buckets. Across buckets, you may see the same counter being used.

The max counter is used only upon restoring from a failure, resuming from a savepoint, or rescaling, and this is done to guarantee that no valid data are overwritten while limiting the state that Flink has to keep internally. For a more detailed discussion about the why, you can have a look here: https://issues.apache.org/jira/browse/FLINK-13609

Cheers,
Kostas
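PS To make the guarantee concrete, a small self-contained illustration (the values and names below are hypothetical, this is not Flink's actual restore code):

public class RestoreIllustration {
    public static void main(String[] args) {
        // say the last checkpoint recorded a max part counter of 617, i.e. one
        // past the last part file that had been opened (part-0-616)
        long restoredMaxPartCounter = 617;

        // before the failure, this file was already committed to S3
        String committed = "invalid-records/2020-01-22T15_05_00Z/part-0-616";

        // after the restore, every bucket (old or new) starts counting from the
        // restored max, so the next part file opened in ANY bucket is at least:
        String next = "invalid-records/2020-01-22T15_05_00Z/part-0-" + restoredMaxPartCounter;

        // hence a file that was validly committed can never be overwritten
        System.out.println(committed + " vs " + next);
    }
}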
Hi Kostas,

Thanks for confirming that. I have started thinking that it might be more user-friendly to use a counter that is unique across buckets for the same operator subtask. The way I could imagine this working is to pass the max counter to the write method (https://github.com/apache/flink/blob/e7e24471240dbaa6b5148d406575e57d170b1623/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L204), or to have each Bucket hold an instance of the Buckets class and access the global counter from there. As far as I know, the write method invocation is guaranteed to be thread-safe for the same sub-operator instance.

Thanks,
Pawel
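PS A rough sketch of the change I have in mind (hypothetical code written for this mail, not the actual Bucket/Buckets classes):

class Buckets {
    private long maxPartCounter;

    // each subtask processes elements single-threaded, so no extra
    // synchronization should be needed here
    long nextPartCounter() {
        return maxPartCounter++;
    }
}

class Bucket {
    private final Buckets owner;

    Bucket(Buckets owner) {
        this.owner = owner;
    }

    void rollPartFile(int subtaskIndex) {
        // ask the owning Buckets instance instead of a per-bucket field,
        // so part names are unique across all buckets of this subtask
        String partName = "part-" + subtaskIndex + "-" + owner.nextPartCounter();
        // ... open the new in-progress file under partName ...
    }
}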
Hi Pawel,
You are correct that the write method invocation is guaranteed to be thread-safe for the same sub-operator instance. But I am not sure that having a unique counter per subtask across buckets would add much to the user experience of the sink. I think that in both cases the interpretation of the part files would be the same. I may be wrong, though, so please let me know if this is a deal breaker for you.

Cheers,
Kostas