I'm implementing an exponential backoff inside a custom sink that uses an AvroParquetWriter to write to S3. I've changed the number of retry attempts to 0 inside core-site.xml, and I'm catching the timeout exception and doing a Thread.sleep for X seconds. This is working as intended: when S3 is offline, the sink waits until it comes back online.
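A minimal sketch of that retry loop (object and parameter names are illustrative, not the actual implementation; the real sink catches only the S3 timeout exception, while this sketch catches any non-fatal error):

object BackoffSketch {
  import java.util.concurrent.TimeUnit
  import scala.annotation.tailrec
  import scala.util.control.NonFatal

  // Retry a write with exponential backoff, capped at maxBackoffSec.
  @tailrec
  def writeWithBackoff(write: () => Unit, attempt: Int = 0, maxBackoffSec: Long = 300): Unit = {
    val succeeded =
      try { write(); true }
      catch { case NonFatal(_) => false } // the real code catches only the timeout exception
    if (!succeeded) {
      // 1s, 2s, 4s, ... with the shift capped so it can never overflow
      val backoffSec = math.min(1L << math.min(attempt, 16), maxBackoffSec)
      TimeUnit.SECONDS.sleep(backoffSec) // blocking here is what creates the back pressure
      writeWithBackoff(write, attempt + 1, maxBackoffSec)
    }
  }
}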
I also want to test that the back pressure and the checkpoints are working as intended. For the first one, I can see the back pressure in the Flink UI going up and recovering as expected, and the job not reading more data from Kafka while it is backed up. For the checkpoints, I've added a random exception inside the sink's invoke function (1 in 100, to simulate that a problem has happened and the job needs to recover from the last good checkpoint, sketched below after the stack trace), but something strange happens. Sometimes I can see the job being cancelled and created again and running fine; other times, after being created and cancelled X times, it gives a NoClassDefFoundError, and it will keep giving that forever. Do you guys have any thoughts?

org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception while processing timer.
    at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
    at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: TimerException{java.lang.NoClassDefFoundError: org/joda/time/format/DateTimeParserBucket}
    at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
    ... 7 more
Caused by: java.lang.NoClassDefFoundError: org/joda/time/format/DateTimeParserBucket
    at org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:825)
    at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:193)
    at com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:78)
    at com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:115)
    at com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32)
    at com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25)
    at com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:1072)
    at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:746)
    at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)
    at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1050)
    at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1027)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:904)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1553)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:555)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
    at org.apache.parquet.hadoop.util.HadoopOutputFile.createOrOverwrite(HadoopOutputFile.java:81)
    at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:246)
    at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:280)
    at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:535)
    at com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.$anonfun$invoke$1(WindowParquetGenericRecordListFileSink.scala:59)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
    at scala.util.Try$.apply(Try.scala:209)
    at com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:54)
    at com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:18)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:45)
    at com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:20)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:503)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
    at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
    ... 7 more
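For reference, a minimal sketch of the fault injection described above (class and constructor names are hypothetical, simplified from the actual WindowParquetGenericRecordListFileSink):

import org.apache.avro.generic.GenericRecord
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import scala.util.Random

class FaultInjectingSink(writeRecord: GenericRecord => Unit)
    extends RichSinkFunction[GenericRecord] {

  override def invoke(value: GenericRecord, context: SinkFunction.Context[_]): Unit = {
    // Roughly 1 in 100 invocations throws, forcing a restore from the last checkpoint.
    if (Random.nextInt(100) == 0)
      throw new RuntimeException("injected failure for checkpoint-recovery testing")
    writeRecord(value) // normal Parquet write path
  }
}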
Hi David,

This looks like a problem with Maven dependency resolution. The custom WindowParquetGenericRecordListFileSink probably depends transitively on org/joda/time/format/DateTimeParserBucket, and it is missing from Flink's runtime classpath.

Best,
Andrey

On Wed, Feb 5, 2020 at 5:22 PM David Magalhães <[hidden email]> wrote:
Hi Andrey, thanks for your reply.

The class is in the jar created with `sbt assembly` that is submitted to Flink to start a job:

unzip -l target/jar/myapp-0.0.1-SNAPSHOT.jar | grep DateTimeParserBucket
    1649  05-27-2016 10:24   org/joda/time/format/DateTimeParserBucket$SavedField.class
    1984  05-27-2016 10:24   org/joda/time/format/DateTimeParserBucket$SavedState.class
    8651  05-27-2016 10:24   org/joda/time/format/DateTimeParserBucket.class

Shouldn't this be enough? The class seems to be used fine while nothing goes wrong, but as soon as some exception occurs, the job appears to "forget" it. Like I said before, this is kind of intermittent.

Thanks,
David

On Thu, Feb 6, 2020 at 5:46 PM Andrey Zagrebin <[hidden email]> wrote:
Hi David,

This seems to be a bug in our s3 plugin; the joda dependency should be bundled there. Are you using s3 as a plugin, by any chance? Which Flink version are you using?

If you are using s3 as a plugin, you could put joda in your plugin folder alongside the s3 jar (see the sketch below). If flink-s3-fs-hadoop.jar is in lib instead, you could try adding joda there. Adding joda to your user code will unfortunately not work.

Best,
Arvid

On Thu, Feb 6, 2020 at 11:16 PM David Magalhães <[hidden email]> wrote:
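Assuming the standard plugin layout, that would be something like the following, where the joda jar name is a placeholder for whatever version your dependency tree resolves:

cp joda-time-<version>.jar /opt/flink/plugins/flink-s3-fs-hadoop/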
Hi David,

Upon closer review of your stack trace, it seems like you are trying to access S3 without our S3 plugin. That's generally not recommended at all.

Best,
Arvid

On Tue, Feb 11, 2020 at 11:06 AM Arvid Heise <[hidden email]> wrote:
Hi Arvid,

I'm using flink-s3-fs-hadoop-1.9.1.jar in the plugins folder. Like I said previously, this works normally until an exception is thrown inside the sink. It will try to recover, but sometimes it doesn't recover and gives this error instead.

To write to S3 I use AvroParquetWriter with the following code:

val writer = AvroParquetWriter
  .builder[GenericRecord](new Path(finalFilePath))

Path here is from org.apache.hadoop.fs. The other option is to use org.apache.flink.formats.parquet.StreamOutputFile, which would go through the Flink S3 plugin, right? I'm not sure how to convert from a Path to a StreamOutputFile. I know that when I used StreamingFileSink, I used StreamOutputFile.

On Wed, Feb 12, 2020 at 10:03 AM Arvid Heise <[hidden email]> wrote:
According to this answer [1], the first exception "mentioning" org/joda/time/format/DateTimeParserBucket should be a different one. Can you go through the logs to make sure it is really a ClassNotFoundException, and not an ExceptionInInitializerError or something else?

On Wed, Feb 12, 2020 at 12:36 PM David Magalhães <[hidden email]> wrote:
Hi Robert,

I couldn't find any earlier mention of it before the NoClassDefFoundError. Here is the full log [1] if you want to look for something more specific.

[1] https://www.dropbox.com/s/l8tba6vft08flke/joda.out?dl=0

On Wed, Feb 12, 2020 at 12:45 PM Robert Metzger <[hidden email]> wrote:
Hi David, can you double-check the folder structure of your plugin? It should reside in its own subfolder. Here is an example.
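The expected structure, with each plugin isolated in its own subfolder of plugins/ (matching the Dockerfile shown later in the thread):

/opt/flink/plugins/
  flink-s3-fs-hadoop/
    flink-s3-fs-hadoop-1.9.1.jar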
I will investigate your error more deeply in the next few days, but I'd like a final confirmation about the folder structure first.
On Wed, Feb 12, 2020 at 8:56 PM David Magalhães <[hidden email]> wrote:
Hi Arvid,

I use a docker image. Here is the Dockerfile:

FROM flink:1.9.1-scala_2.12
RUN mkdir /opt/flink/plugins/flink-s3-fs-hadoop
RUN cp /opt/flink/opt/flink-s3-fs-hadoop-1.9.1.jar /opt/flink/plugins/flink-s3-fs-hadoop/

Please let me know if you need more information.

On Wed, Feb 12, 2020 at 9:15 PM Arvid Heise <[hidden email]> wrote:
Hi David,

Sorry for replying late; I was caught up in other incidents.

I double-checked all the information that you provided and conclude that you completely bypass our filesystems and plugins. What you are using is AvroParquetWriter, which brings in the hadoop dependencies, including raw hadoop s3. This becomes obvious from the fact that the Path you are using does not come from the Flink namespace.

The class issues that come from that are hard to debug. You are effectively bundling another hadoop, so if you also have a specific Hadoop version on your cluster (e.g. on EMR), there can be ambiguities, and errors like the one you are seeing can occur.

What I'd recommend is a completely different approach. Assuming you just want exponential backoff for all s3 write accesses, you could wrap the S3AFileSystem and create your own s3 plugin. That would work with any given format in future cases. If you want to stick to your approach, you should use org.apache.flink.formats.parquet.ParquetWriterFactory, which uses the StreamOutputFile you mentioned (see the sketch below).

Best,
Arvid

On Thu, Feb 13, 2020 at 12:04 AM David Magalhães <[hidden email]> wrote:
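For reference, a minimal sketch of the StreamingFileSink route Arvid describes, assuming flink-parquet and its Avro writers are on the classpath; the bucket path is a placeholder:

object ParquetSinkSketch {
  import org.apache.avro.Schema
  import org.apache.avro.generic.GenericRecord
  import org.apache.flink.core.fs.Path
  import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
  import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink

  // Bulk-format sink that goes through Flink's FileSystem abstraction
  // (and therefore the s3 plugin) instead of raw hadoop-aws.
  def parquetSink(schema: Schema): StreamingFileSink[GenericRecord] =
    StreamingFileSink
      .forBulkFormat(
        new Path("s3://my-bucket/output"),          // placeholder path
        ParquetAvroWriters.forGenericRecord(schema) // builds the ParquetWriterFactory
      )
      .build()

  // usage: stream.addSink(parquetSink(schema))
}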
Thanks for the feedback, Arvid. Currently this isn't an issue, but I will look into it again in the future.

On Tue, Feb 18, 2020 at 1:51 PM Arvid Heise <[hidden email]> wrote: