I'm trying to stream log messages (syslog fed into Kafka) into Parquet files on HDFS via Flink. I'm able to read, parse, and construct objects for my messages in Flink; however, writing to Parquet is tripping me up. I do *not* need this to be real-time; a delay of a few minutes, even up to an hour, is fine.

I've found the following articles talking about this being very difficult: All of these posts speak of troubles using the check-pointing mechanisms and Parquet's need to perform batch writes. I'm not experienced enough with Flink's check-pointing or Parquet's file format to completely understand the issue. So my questions are as follows:

1) Is this possible in Flink in an exactly-once way? If not, is it possible in a way that _might_ cause duplicates during an error?
2) Is there another/better format to use other than Parquet that offers compression and the ability to be queried by something like Drill or Impala?
3) Any further recommendations for solving the overall problem: ingesting syslogs and writing them to file(s) that are searchable by an SQL(-like) framework?

Thanks!

Bill-
Hi Bill,

I wrote those two Medium posts you mentioned above. But clearly, the techlab one is much better. I would suggest just "close the file when checkpointing", which is the easiest way. If you use BucketingSink, you can modify the code to make it work: just replace the code from line 691 to 693 with closeCurrentPartFile(). This should guarantee exactly-once. You may have some files with an underscore prefix when the Flink job fails, but usually those files are ignored by query engines/readers, for example Presto. If you use 1.6 or later, I think the issue is already addressed.

Thanks
Hao

On Fri, Sep 28, 2018 at 1:57 PM William Speirs <[hidden email]> wrote:
Thanks - Hao
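Hao's note that underscore-prefixed files are usually ignored by query engines can be sketched as a simple name filter. This is a hypothetical helper, not Flink or Presto code; the "." and "_" prefixes are the conventional Hadoop-style markers for hidden or in-progress files.

```java
import java.util.List;
import java.util.stream.Collectors;

public class PartFileFilter {
    // Hadoop-style readers conventionally skip files whose names start
    // with "." or "_" -- the prefixes used for hidden/in-progress parts.
    public static List<String> committedOnly(List<String> fileNames) {
        return fileNames.stream()
                .filter(n -> !n.startsWith(".") && !n.startsWith("_"))
                .collect(Collectors.toList());
    }
}
```

So a directory containing an in-progress part alongside a committed one would only expose the committed part to such a reader.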
Hi Bill,

Flink 1.6.0 supports writing Avro records as Parquet files to HDFS via the previously mentioned StreamingFileSink [1], [2].

Best, Fabian

On Fri, Sep 28, 2018 at 11:36 PM, hao gao <[hidden email]> wrote:
Nice to see this finally! On Mon, Oct 1, 2018 at 1:53 AM Fabian Hueske <[hidden email]> wrote:
Thank you ~/Das
Hi Fabian, Kostas,
From the description of this ticket https://issues.apache.org/jira/browse/FLINK-9753, I understand that now my output Parquet file with StreamingFileSink will span multiple checkpoints. However, when I tried (as in the code snippet below), I still see that one "part-X-X" file is created after each checkpoint. Is there any other configuration that I'm missing?

BTW, I have another question regarding StreamingFileSink.BulkFormatBuilder.withBucketCheckInterval(). As per the notes at the end of this page StreamingFileSink <https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/streamfile_sink.html>, bulk encoding can only be combined with OnCheckpointRollingPolicy, which rolls on every checkpoint. So setting that check interval makes no difference. So why should we expose that withBucketCheckInterval method?

Thanks and best regards,
Averell

def buildSink[T <: MyBaseRecord](outputPath: String)(implicit ct: ClassTag[T]): StreamingFileSink[T] = {
  StreamingFileSink
    .forBulkFormat(new Path(outputPath), ParquetAvroWriters.forReflectRecord(ct.runtimeClass))
    .asInstanceOf[StreamingFileSink.BulkFormatBuilder[T, String]]
    .withBucketCheckInterval(5L * 60L * 1000L)
    .withBucketAssigner(new DateTimeBucketAssigner[T]("yyyy-MM-dd--HH"))
    .build()
}

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
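For reference, the bucket id a "yyyy-MM-dd--HH" pattern produces for a given record timestamp can be reproduced with plain java.time. This is only a sketch of the expected naming, pinned to UTC for reproducibility - an assumption, since the actual assigner may use the host's default timezone, in which case the result differs on a non-UTC host.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class BucketIdSketch {
    // Same pattern string as in the buildSink snippet above, but pinned
    // to UTC here (an assumption, not necessarily what the assigner does).
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd--HH").withZone(ZoneOffset.UTC);

    public static String bucketId(long epochMillis) {
        return FMT.format(Instant.ofEpochMilli(epochMillis));
    }
}
```

For example, the timestamp 1537656300000 seen in the SdcRecord elements later in this thread corresponds to 2018-09-22T22:45 UTC, so it would land in bucket "2018-09-22--22".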
Hi Averell,
You are right that for bulk formats like Parquet, we roll on every checkpoint. This is currently a limitation that has to do with the fact that bulk formats gather and rely on metadata that they keep internally, and which we cannot checkpoint in Flink, as they do not expose it. Setting the checkpoint interval affects how big your part files are going to be and, in some cases, how efficient your compression is going to be: the more data there is to compress, the better the compression ratio can be. As for exposing withBucketCheckInterval(), you are right that it does not serve much purpose for the moment.

Cheers,
Kostas
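The distinction Kostas draws - bulk formats can only roll at a checkpoint, while row formats may also roll on size or age - can be illustrated with a small decision function. This is a hypothetical mirror of the rolling-policy semantics, not the actual Flink RollingPolicy interface.

```java
// Hypothetical sketch of the two rolling behaviours described above.
public class RollDecision {
    // Bulk formats (e.g. Parquet): the only safe roll point is a checkpoint,
    // because the writer's internal metadata cannot be checkpointed mid-file.
    public static boolean bulkShouldRoll(boolean atCheckpoint) {
        return atCheckpoint;
    }

    // Row formats: rolling may additionally be triggered by part size
    // or by how long the part file has been open.
    public static boolean rowShouldRoll(boolean atCheckpoint,
                                        long partSizeBytes, long maxPartSizeBytes,
                                        long openForMillis, long rolloverMillis) {
        return atCheckpoint
                || partSizeBytes >= maxPartSizeBytes
                || openForMillis >= rolloverMillis;
    }
}
```

Under this model, a longer checkpoint interval directly means larger Parquet parts, which is why the interval also influences compression efficiency.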
Hi Kostas,
Thanks for the info. Just one more question regarding writing Parquet: I need to write my stream as Parquet to S3. As per this ticket https://issues.apache.org/jira/browse/FLINK-9752, it is currently not supported. Is there any ready-to-use solution that supports copying/moving files from HDFS to S3 (something like a trigger from Flink after it has finished writing to HDFS)?

Thanks and best regards,
Averell
Hi Averell,
There is no such “out-of-the-box” solution, but there is an open PR for adding S3 support to the StreamingFileSink [1]. Cheers, Kostas
That's great news.
Thanks for that, Kostas.

Regards,
Averell
Hi Kostas,
I tried your PR - trying to write to S3 from Flink running on AWS EMR - and I got the following error. I copied the three jar files flink-hadoop-fs-1.7-SNAPSHOT.jar, flink-s3-fs-base-1.7-SNAPSHOT.jar, and flink-s3-fs-hadoop-1.7-SNAPSHOT.jar to the lib/ directory. Do I need to make any changes to the Hadoop configuration?

Thanks and best regards,
Averell

java.lang.Exception: unable to establish the security context
    at org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:73)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1118)
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: class org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not org.apache.hadoop.security.GroupMappingServiceProvider
    at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2503)
    at org.apache.hadoop.security.Groups.<init>(Groups.java:106)
    at org.apache.hadoop.security.Groups.<init>(Groups.java:101)
    at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:448)
    at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:331)
    at org.apache.hadoop.security.UserGroupInformation.setConfiguration(UserGroupInformation.java:359)
    at org.apache.flink.runtime.security.modules.HadoopModule.install(HadoopModule.java:70)
    at org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:67)
    ... 1 more
Caused by: java.lang.RuntimeException: class org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not org.apache.hadoop.security.GroupMappingServiceProvider
    at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2497)
Hi Kostas,
Please ignore my previous email about the security issue. It seems I had mixed versions of shaded and unshaded jars.

However, I'm now facing another issue with writing Parquet files: only the first part is closed. All subsequent parts are kept in the in-progress state forever. My settings are to checkpoint every 3 minutes. Sink parallelism is set to 1 (my tries with 4 or 30 showed no difference). The bucket ID assigner uses the event timestamp. I only get this issue when running Flink on a YARN cluster, whether writing to file:/// or to S3. When I ran it on my laptop, I got one part for every single checkpoint. The TM log says something like "*BucketState ... has pending files for checkpoints: {2 }*"

Could you please help on how I can further debug this? Here below is the TM log:

2018-10-06 14:39:01.197 [Sink: Unnamed (1/1)] DEBUG o.a.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 closing in-progress part file for bucket id=dt=2018-09-22 due to element SdcRecord(1537656300000,meter0219838,R1.S1.LT1.P25).
2018-10-06 14:39:01.984 [Sink: Unnamed (1/1)] DEBUG o.a.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 opening new part file "part-0-0" for bucket id=dt=2018-09-22.
2018-10-06 14:40:17.855 [Sink: Unnamed (1/1)] INFO o.a.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=2 (max part counter=1).
2018-10-06 14:40:17.855 [Sink: Unnamed (1/1)] DEBUG o.a.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 closing in-progress part file for bucket id=dt=2018-09-22 on checkpoint.
2018-10-06 14:40:18.254 [Sink: Unnamed (1/1)] DEBUG o.a.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing: BucketState for bucketId=dt=2018-09-22 and bucketPath=s3a://assn-averell/Test/output/dt=2018-09-22, has pending files for checkpoints: {2 }
2018-10-06 14:40:44.069 [Async calls on Sink: Unnamed (1/1)] INFO o.a.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 received completion notification for checkpoint with id=2.
2018-10-06 14:40:46.691 [Sink: Unnamed (1/1)] DEBUG o.a.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 closing in-progress part file for bucket id=dt=2018-09-22 due to element SdcRecord(1537656300000,meter0207081,R1.S1.LT1.P25).
2018-10-06 14:40:46.765 [Sink: Unnamed (1/1)] DEBUG o.a.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 opening new part file "part-0-1" for bucket id=dt=2018-09-22.
2018-10-06 14:43:17.831 [Sink: Unnamed (1/1)] INFO o.a.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=3 (max part counter=2).
2018-10-06 14:43:17.831 [Sink: Unnamed (1/1)] DEBUG o.a.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 closing in-progress part file for bucket id=dt=2018-09-22 on checkpoint.
2018-10-06 14:43:18.401 [Sink: Unnamed (1/1)] DEBUG o.a.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing: BucketState for bucketId=dt=2018-09-22 and bucketPath=s3a://assn-averell/Test/output/dt=2018-09-22, has pending files for checkpoints: {3 }
2018-10-06 14:45:59.276 [Sink: Unnamed (1/1)] DEBUG o.a.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 closing in-progress part file for bucket id=dt=2018-09-22 due to element SdcRecord(1537657200000,meter0218455,R1.S1.LT1.P10).
2018-10-06 14:45:59.334 [Sink: Unnamed (1/1)] DEBUG o.a.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 opening new part file "part-0-2" for bucket id=dt=2018-09-22.
2018-10-06 14:46:17.825 [Sink: Unnamed (1/1)] INFO o.a.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=4 (max part counter=3).
2018-10-06 14:46:17.825 [Sink: Unnamed (1/1)] DEBUG o.a.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 closing in-progress part file for bucket id=dt=2018-09-22 on checkpoint.
2018-10-06 14:46:18.228 [Sink: Unnamed (1/1)] DEBUG o.a.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing: BucketState for bucketId=dt=2018-09-22 and bucketPath=s3a://assn-averell/Test/output/dt=2018-09-22, has pending files for checkpoints: {3 4 }
2018-10-06 14:46:25.041 [Sink: Unnamed (1/1)] DEBUG o.a.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 closing in-progress part file for bucket id=dt=2018-09-22 due to element SdcRecord(1537657200000,meter0209471,R1.S1.LT1.P25).
2018-10-06 14:46:25.186 [Sink: Unnamed (1/1)] DEBUG o.a.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 opening new part file "part-0-3" for bucket id=dt=2018-09-22.
2018-10-06 14:49:17.848 [Sink: Unnamed (1/1)] INFO o.a.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=5 (max part counter=4).
2018-10-06 14:49:17.849 [Sink: Unnamed (1/1)] DEBUG o.a.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 closing in-progress part file for bucket id=dt=2018-09-22 on checkpoint.
2018-10-06 14:49:18.385 [Sink: Unnamed (1/1)] DEBUG o.a.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing: BucketState for bucketId=dt=2018-09-22 and bucketPath=s3a://assn-averell/Test/output/dt=2018-09-22, has pending files for checkpoints: {3 4 5 }
2018-10-06 14:52:17.824 [Sink: Unnamed (1/1)] INFO o.a.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=6 (max part counter=4).
2018-10-06 14:52:17.825 [Sink: Unnamed (1/1)] DEBUG o.a.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing: BucketState for bucketId=dt=2018-09-22 and bucketPath=s3a://assn-averell/Test/output/dt=2018-09-22, has pending files for checkpoints: {3 4 5 }

Thanks and best regards,
Averell
Hi Averell,
From the logs, only checkpoint 2 was acknowledged (search for “received completion notification for checkpoint with id=”), and this is why no more files are finalized. So only checkpoint 2 was successfully completed.

BTW, are you using the PR you mentioned before, or Flink 1.6? I am asking because on Flink 1.6 there is no support for S3 in the streaming file sink.

Cheers,
Kostas
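The grep Kostas suggests - finding which checkpoints were actually acknowledged in the TM log - can be automated with a small scan. A sketch, assuming the exact log phrasing shown in the earlier post:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class CheckpointAckScan {
    // Matches the completion-notification lines quoted earlier in this thread.
    private static final Pattern ACK = Pattern.compile(
            "received completion notification for checkpoint with id=(\\d+)");

    // Returns the ids of all checkpoints the sink reported as completed.
    public static List<Long> acknowledgedIds(List<String> logLines) {
        List<Long> ids = new ArrayList<>();
        for (String line : logLines) {
            Matcher m = ACK.matcher(line);
            if (m.find()) {
                ids.add(Long.parseLong(m.group(1)));
            }
        }
        return ids;
    }
}
```

Run over the log above, this would return only id 2, matching the diagnosis that checkpoints 3-6 were never acknowledged by the sink.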
Hi Kostas,
I'm using a build with your PR. However, it seems the issue is not with S3, as when I tried writing to the local file system (file:///, not HDFS), I got the same problem - only the first part was published. All remaining parts were in-progress and had names prefixed with ".".

From the Flink GUI, all checkpoints were shown as completed successfully.

How can I debug this further?

Thanks a lot for your help.
Regards,
Averell
Hi Averell,
Could you set your logging to DEBUG? This may shed some light on what is happening, as it will contain more logs.

Kostas
Hi,
I just saw that you have already set the level to DEBUG. Are these all your DEBUG logs of the TM when running on YARN? Also, did you try waiting a bit longer, to see if the acknowledgements of the checkpoints arrive later? Checkpoints and acknowledgments are not necessarily aligned.

Kostas
Hi Kostas,
Yes, I set the level to DEBUG, but only for org.apache.flink.streaming.api.functions.sink.filesystem.bucket. I will try enabling it for org.apache.flink.streaming. I just found one possible issue with my build: I had not used the latest master branch when merging with your PR, so I might have missed some other important PRs.

BTW, regarding your comment that checkpoints and acknowledgements are not necessarily aligned: if, for some reason, the job crashes right after a checkpoint and before its acknowledgement, does that mean that the last checkpoint is not valid?

Thanks and best regards,
Averell
Hi,
Yes, please enable DEBUG for streaming to see all the logs, including those from the StreamTask.

A checkpoint is “valid” as soon as it gets acknowledged. As the documentation says, the job will restart from “the last **successful** checkpoint”, which is the most recent acknowledged one.

Cheers,
Kostas
Hi Kostas,
Thanks for the info. That error was caused by my building your code against an out-of-date baseline. I rebased my branch and there's no more such issue. I've been testing, and so far have some questions/issues as below:

1. I'm not able to write to S3 with the following URI format: *s3*://<path>, and had to use *s3a*://<path>. Is this behaviour expected? (I am running Flink on AWS EMR, and I thought that EMR provides a wrapper for HDFS over S3 with something called EMRFS.)

2. Occasionally/randomly I get the below message ( parquet_error1.log <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/parquet_error1.log> ). I'm using the ParquetAvroWriters.forReflectRecord() method to write Scala case classes. Re-running the job doesn't hit that error at the same data location, so I don't think there's an issue with the data.
java.lang.ArrayIndexOutOfBoundsException: <some random number>
    at org.apache.parquet.column.values.dictionary.DictionaryValuesWriter$PlainBinaryDictionaryValuesWriter.fallBackDictionaryEncodedData

3. Sometimes I get this error message when I use a parallelism of 8 for the sink ( parquet_error2.log <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/parquet_error2.log> ). Reducing it to 2 solves the issue, but is it possible to increase the pool size? I could not find any place where I can change the fs.s3.maxconnections parameter.
java.io.InterruptedIOException: initiate MultiPartUpload on Test/output/dt=2018-09-20/part-7-5: org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool

4. Where is the temporary folder in which you store the Parquet file before uploading it to S3?

Thanks a lot for your help.
Best regards,
Averell
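On point 1, a quick workaround for the scheme mismatch is to rewrite the URI before building the sink. This is a hypothetical helper based purely on the behaviour reported above (the sink accepting s3a:// but not s3://); it is not documented EMR/EMRFS semantics.

```java
public class S3Uri {
    // Rewrites "s3://..." to "s3a://...", leaving other schemes untouched.
    public static String toS3a(String uri) {
        return uri.startsWith("s3://")
                ? "s3a://" + uri.substring("s3://".length())
                : uri;
    }
}
```

The output path could then be passed through this helper before handing it to forBulkFormat, so job configs can keep using s3:// URIs.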