Kafka to Flink Avro Deserializer


Kafka to Flink Avro Deserializer

Lehuede sebastien
Hi Guys,

I tried to implement my Avro deserializer by following this link:

Compilation succeeds, but when I send an Avro event from Kafka to Flink, I get the following error:

org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp (JobManager.scala:897)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply (JobManager.scala:840)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply (JobManager.scala:840)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1 (Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run (Future.scala:24)
    at akka.dispatch.TaskInvocation.run (AbstractDispatcher.scala:39)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec (AbstractDispatcher.scala:415)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec (ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask (ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker (ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run (ForkJoinWorkerThread.java:107)
Caused by: org.apache.avro.AvroRuntimeException: avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.NullPointerException
    at org.apache.avro.specific.SpecificData.getSchema (SpecificData.java:227)
    at org.apache.avro.specific.SpecificDatumReader.<init> (SpecificDatumReader.java:37)
    at com.nybble.alpha.AvroDeserializationSchema.ensureInitialized (AvroDeserializationSchema.java:66)
    at com.nybble.alpha.AvroDeserializationSchema.deserialize (AvroDeserializationSchema.java:44)
    at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize (KeyedDeserializationSchemaWrapper.java:42)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop (Kafka09Fetcher.java:139)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run (FlinkKafkaConsumerBase.java:652)
    at org.apache.flink.streaming.api.operators.StreamSource.run (StreamSource.java:86)
    at org.apache.flink.streaming.api.operators.StreamSource.run (StreamSource.java:55)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run (SourceStreamTask.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke (StreamTask.java:264)
    at org.apache.flink.runtime.taskmanager.Task.run (Task.java:718)
    at java.lang.Thread.run (Thread.java:748)

Has anyone already encountered this error with Avro deserialization?

I can't find much information about "avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException".


Regards,
Sebastien. 


Re: Kafka to Flink Avro Deserializer

Timo Walther
Hi Sebastien,

to me this seems more like an Avro issue than a Flink issue. You can
ignore the shaded package in the exception; we shade the Google
utilities to avoid dependency conflicts.

The root cause is this:

java.lang.NullPointerException
     at org.apache.avro.specific.SpecificData.getSchema
(SpecificData.java:227)

And the corresponding lines look like this:

   /** Find the schema for a Java type. */
   public Schema getSchema(java.lang.reflect.Type type) {
     try {
       return schemaCache.get(type);
     } catch (Exception e) {
       throw (e instanceof AvroRuntimeException) ? // line 227
           (AvroRuntimeException)e.getCause() : new AvroRuntimeException(e);
     }
   }

So I guess your schema is missing.
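I can't see your code, so this is only a guess, but one common way the type reaches Avro as null in a Flink job: Flink ships the DeserializationSchema to the task managers via Java serialization, and if the Class<T> field in your AvroDeserializationSchema is marked transient, it arrives as null on the worker and new SpecificDatumReader<>(null) fails exactly as in your stack trace. A minimal, stdlib-only sketch of the pitfall and the usual name-based fix (SchemaHolder is a stand-in for your schema class, not Flink API):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Stand-in for a DeserializationSchema that keeps a Class reference.
class SchemaHolder implements Serializable {
    private transient Class<?> avroType;  // transient: null after the round trip
    private final String avroTypeName;    // the name survives serialization

    SchemaHolder(Class<?> avroType) {
        this.avroType = avroType;
        this.avroTypeName = avroType.getName();
    }

    // Re-resolve the class lazily instead of passing null to Avro.
    Class<?> getType() {
        if (avroType == null) {
            try {
                avroType = Class.forName(avroTypeName);
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e);
            }
        }
        return avroType;
    }
}

public class TransientPitfall {
    public static void main(String[] args) throws Exception {
        // Round-trip through Java serialization, as happens when Flink
        // distributes the schema instance to the task managers.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new ObjectOutputStream(bos).writeObject(new SchemaHolder(String.class));
        SchemaHolder copy = (SchemaHolder) new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray())).readObject();

        // Without the name-based fallback, the transient field would be
        // null here and Avro's getSchema(null) would throw the NPE above.
        System.out.println(copy.getType().getName());
    }
}
```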

I hope this helps.

Regards,
Timo

On 25.04.18 at 10:57, Lehuede sebastien wrote:
> java.lang.NullPointerException
>     at org.apache.avro.specific.SpecificData.getSchema



Re: Kafka to Flink Avro Deserializer

Lehuede sebastien
Hi Timo, 

Thanks for your response!

I defined my Avro schema in "toKafka.avsc" and generated the "toKafka.java" file with:

#java -jar avro-tools-1.8.2.jar compile schema toKafka.avsc

Then I import the Avro deserialization schema and my generated "toKafka.java" class:

import com.nybble.alpha.AvroDeserializationSchema;
import com.nybble.alpha.toKafka;

Am I missing something?
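For reference, here is a quick stdlib-only sanity check that the generated class is resolvable on the job's classpath (the default class name is taken from the imports above; this is just a diagnostic sketch, not Flink API):

```java
// Confirm the avro-tools-generated class can be loaded before handing it
// to the deserialization schema; a "missing" result would explain a null
// type reaching Avro's SpecificData.getSchema.
public class ClasspathCheck {

    static String resolve(String name) {
        try {
            return "found: " + Class.forName(name).getName();
        } catch (ClassNotFoundException e) {
            return "missing: " + name;
        }
    }

    public static void main(String[] args) {
        String name = args.length > 0 ? args[0] : "com.nybble.alpha.toKafka";
        System.out.println(resolve(name));
    }
}
```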

Regards,
Sebastien.

