Hi Guys,
I tried to implement my Avro deserializer following this link:

Compilation works, but when I send an Avro event from Kafka to Flink, I get the following error:

org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.avro.AvroRuntimeException: avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.NullPointerException
    at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:227)
    at org.apache.avro.specific.SpecificDatumReader.<init>(SpecificDatumReader.java:37)
    at com.nybble.alpha.AvroDeserializationSchema.ensureInitialized(AvroDeserializationSchema.java:66)
    at com.nybble.alpha.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:44)
    at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:42)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:139)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:652)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)

Has anyone already encountered this error with Avro deserialization? I can't find much information about "avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException".

Regards,
Sebastien.
Hi Sebastien,
To me this looks more like an Avro issue than a Flink issue. You can ignore the shaded exception; we shade Google utilities to avoid dependency conflicts.

The root cause is this:

java.lang.NullPointerException
    at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:227)

And the corresponding lines look like this:

    /** Find the schema for a Java type. */
    public Schema getSchema(java.lang.reflect.Type type) {
      try {
        return schemaCache.get(type);
      } catch (Exception e) {
        throw (e instanceof AvroRuntimeException) ? // line 227
            (AvroRuntimeException)e.getCause() : new AvroRuntimeException(e);
      }
    }

So I guess your schema is missing.

I hope this helps.

Regards,
Timo

On 25.04.18 at 10:57, Lehuede sebastien wrote:
> java.lang.NullPointerException
>     at org.apache.avro.specific.SpecificData.getSchema
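
One way to test the "schema is missing" guess before Avro gets involved is to inspect the class that the deserializer hands to SpecificDatumReader. Classes generated by avro-tools carry a static SCHEMA$ field, and SpecificData reads that field by reflection. The sketch below uses only the JDK to mimic that lookup. SchemaCheck, WithSchema and WithoutSchema are hypothetical names, and the String field is only a stand-in for the real org.apache.avro.Schema field. Whether the Class object in AvroDeserializationSchema is actually null at runtime is an assumption, since that code is not shown in this thread.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

public class SchemaCheck {

    /** Stand-in for an Avro-generated class (hypothetical; the real field has type Schema). */
    static class WithSchema {
        public static final String SCHEMA$ =
            "{\"type\":\"record\",\"name\":\"WithSchema\",\"fields\":[]}";
    }

    /** Stand-in for a hand-written class that has no SCHEMA$ field. */
    static class WithoutSchema { }

    /** Reports why a schema lookup on the given class could fail, or "ok". */
    static String check(Class<?> type) {
        if (type == null) {
            // A null Class object is one possible source of a NullPointerException.
            return "type is null";
        }
        try {
            Field field = type.getDeclaredField("SCHEMA$");
            if (!Modifier.isStatic(field.getModifiers())) {
                return "SCHEMA$ is not static";
            }
            field.setAccessible(true);
            // Static field, so no instance is needed to read it.
            Object schema = field.get(null);
            return schema == null ? "SCHEMA$ is null" : "ok";
        } catch (NoSuchFieldException e) {
            return "no SCHEMA$ field";
        } catch (IllegalAccessException e) {
            return "SCHEMA$ not accessible";
        }
    }

    public static void main(String[] args) {
        System.out.println(check(WithSchema.class));    // prints "ok"
        System.out.println(check(WithoutSchema.class)); // prints "no SCHEMA$ field"
        System.out.println(check(null));                // prints "type is null"
    }
}
```

In the real job, the same check could be called with the generated class, or with whatever Class field the deserializer holds, right before the SpecificDatumReader is constructed. A result other than "ok" would point at the class or the field rather than at Flink.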
Hi Timo,

Thanks for your response!

I have defined my Avro schema in "toKafka.avsc" and created my "toKafka.java" file with:

# java -jar avro-tools-1.8.2.jar compile schema toKafka.avsc

Then I import the Avro deserialization schema and my generated "toKafka.java" class:

import com.nybble.alpha.AvroDeserializationSchema;
import com.nybble.alpha.toKafka;

Am I missing something?

Regards,
Sebastien.

2018-04-25 11:32 GMT+02:00 Timo Walther <[hidden email]>:
> Hi Sebastien,