Hi,
I am attempting to create a key/value serializer for the Kafka table connector. I forked `KafkaTableSourceSinkFactoryBase`[1] and other relevant classes, updating the serializer. First, I created `JsonRowKeyedSerializationSchema`, which implements the deprecated `KeyedSerializationSchema`[2]. You give it a list of indices in your output Row that make up the key. This works successfully.

When I tried migrating my `JsonRowKeyedSerializationSchema` to implement `KafkaSerializationSchema`[3], I got a `java.lang.AbstractMethodError`. Normally this would mean I'm using an old interface, but all my Flink dependencies are version 1.9. I cannot find this abstract `serialize()` method in the Flink codebase. Has anyone come across this before?

When I print the methods of my `JsonRowKeyedSerializationSchema` class, I see the one below, which seems to be what `FlinkKafkaProducer` calls, but I do not see it in `KafkaSerializationSchema`:

```
public abstract org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.ProducerRecord org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema.serialize(java.lang.Object,java.lang.Long)
java.lang.Object
java.lang.Long
```

`JsonRowKeyedSerializationSchema` class:

```java
import javax.annotation.Nullable;

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.types.Row;
import org.apache.kafka.clients.producer.ProducerRecord;

public class JsonRowKeyedSerializationSchema implements KafkaSerializationSchema<Row> {

    // constructors and helpers (serializeKey, serializeValue) omitted

    @Override
    public ProducerRecord<byte[], byte[]> serialize(Row row, @Nullable Long aLong) {
        return new ProducerRecord<>("some_topic", serializeKey(row), serializeValue(row));
    }
}
```

Stacktrace:

```
Caused by: java.lang.AbstractMethodError: Method com/mypackage/flink/serialization/json/JsonRowKeyedSerializationSchema.serialize(Ljava/lang/Object;Ljava/lang/Long;)Lorg/apache/flink/kafka/shaded/org/apache/kafka/clients/producer/ProducerRecord; is abstract
    at com.mypackage.flink.serialization.json.JsonRowKeyedSerializationSchema.serialize(JsonRowKeyedSerializationSchema.java)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:816)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:98)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:228)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:546)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:523)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:546)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:523)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:546)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:523)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
```
Hi Steve,

For some reason, it seems that the Java compiler is not generating the bridge method [1]. Could you double-check that the Java versions of your build process and your cluster match? Could you also run javap on your generated class file and report back?

On Thu, Mar 19, 2020 at 5:13 PM Steve Whelan <[hidden email]> wrote:
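For reference, a minimal self-contained sketch of what a bridge method is. `Schema` and `Record` are hypothetical stand-ins that only mimic the generic shape of `KafkaSerializationSchema<T>` and `ProducerRecord<K, V>`; no Flink or Kafka classes are used. Because `Schema<T>.serialize` erases to `serialize(Object, Long)`, javac adds a synthetic bridge with that signature to `StringSchema`, which casts the argument and delegates to `serialize(String, Long)`. Generic callers (here the hypothetical `invokeGeneric`, standing in for a sink such as `FlinkKafkaProducer`) are compiled against the erased signature, so at runtime they go through the bridge.

```java
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class BridgeMethodDemo {
    // Hypothetical stand-in for KafkaSerializationSchema<T>.
    interface Schema<T> {
        Record<byte[], byte[]> serialize(T element, Long timestamp);
    }

    // Hypothetical stand-in for ProducerRecord<K, V>.
    static final class Record<K, V> {
        final K key;
        final V value;

        Record(K key, V value) {
            this.key = key;
            this.value = value;
        }
    }

    // Concrete implementation: javac also emits a bridge serialize(Object, Long) in this class.
    static final class StringSchema implements Schema<String> {
        @Override
        public Record<byte[], byte[]> serialize(String element, Long timestamp) {
            byte[] bytes = element.getBytes(StandardCharsets.UTF_8);
            return new Record<>(bytes, bytes);
        }
    }

    // Like a generic sink, this call site is compiled against the erased
    // descriptor serialize(Object, Long), so at runtime it dispatches to the bridge.
    static <T> Record<byte[], byte[]> invokeGeneric(Schema<T> schema, T element, Long timestamp) {
        return schema.serialize(element, timestamp);
    }

    // Lists the declared serialize methods by first parameter type, marking bridges.
    static List<String> serializeMethods(Class<?> cls) {
        List<String> out = new ArrayList<>();
        for (Method m : cls.getDeclaredMethods()) {
            if (m.getName().equals("serialize")) {
                String first = m.getParameterTypes()[0].getSimpleName();
                out.add(m.isBridge() ? first + " (bridge)" : first);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(serializeMethods(StringSchema.class));
        Record<byte[], byte[]> record = invokeGeneric(new StringSchema(), "hello", null);
        System.out.println(record.value.length); // prints 5
    }
}
```

Running `javap` on the compiled `BridgeMethodDemo$StringSchema.class` should likewise list both `serialize` methods, the second one taking `java.lang.Object`.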
Hi Arvid,

Interestingly, my job runs successfully in a Docker container (image flink:1.9.0-scala_2.11) but fails with the `java.lang.AbstractMethodError` on AWS EMR (non-Docker). I am compiling with OpenJDK 1.8.0_242, which is the same Java version my EMR cluster runs. Since the job runs successfully in a local Docker container, this points to an issue in our AWS environment setup. Oddly, we have been running Flink on EMR for more than two years and have never come across this until now.

Results of javap are:

```
public class com.jwplayer.flink.serialization.json.JsonRowKeyedSerializationSchema implements org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema<org.apache.flink.types.Row> {
  public static org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema<org.apache.flink.types.Row> create(com.jwplayer.flink.config.serde.SerDeConfig);
  public byte[] serializeKey(org.apache.flink.types.Row);
  public byte[] serializeValue(org.apache.flink.types.Row);
  public org.apache.kafka.clients.producer.ProducerRecord<byte[], byte[]> serialize(org.apache.flink.types.Row, java.lang.Long);
  public org.apache.kafka.clients.producer.ProducerRecord serialize(java.lang.Object, java.lang.Long);
}
```

On Mon, Mar 23, 2020 at 9:55 AM Arvid Heise <[hidden email]> wrote:
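Besides comparing `java -version` output on both sides, you can read the class file version that javac actually emitted. `javap -v` reports it as `major version`; the sketch below reads the same header field directly (magic `0xCAFEBABE`, then the u2 minor and u2 major version), where 52 corresponds to Java 8. `ClassFileVersion` is a hypothetical helper, not part of Flink or the JDK, and is written to compile on Java 8.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

public class ClassFileVersion {
    // Parses the first 8 bytes of a class file: u4 magic, u2 minor_version, u2 major_version.
    static int majorVersion(byte[] header) {
        ByteBuffer buf = ByteBuffer.wrap(header); // big-endian by default, as the class file format requires
        if (buf.getInt() != 0xCAFEBABE) {
            throw new IllegalArgumentException("not a class file");
        }
        buf.getShort(); // skip minor_version
        return buf.getShort() & 0xFFFF; // major_version is an unsigned u2
    }

    // Reads the .class resource of an already-loaded class from the classpath.
    static int majorVersionOf(Class<?> cls) throws IOException {
        String resource = "/" + cls.getName().replace('.', '/') + ".class";
        try (InputStream in = cls.getResourceAsStream(resource)) {
            if (in == null) {
                throw new IOException("class file not found: " + resource);
            }
            byte[] header = new byte[8];
            new DataInputStream(in).readFully(header);
            return majorVersion(header);
        }
    }

    public static void main(String[] args) throws IOException {
        // Major version 52 means Java 8, 55 means Java 11 (release number + 44).
        System.out.println("runtime java.version: " + System.getProperty("java.version"));
        System.out.println("class file major version: " + majorVersionOf(ClassFileVersion.class));
    }
}
```

On the cluster, calling `majorVersionOf(JsonRowKeyedSerializationSchema.class)` from inside the job would show which version the deployed class was compiled for.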
Hi Steve,

I just noticed an inconsistency: your class correctly contains the bridge method (the last method in the javap output). Your stacktrace, however, mentions org/apache/flink/kafka/shaded/org/apache/kafka/clients/producer/ProducerRecord instead of org.apache.kafka.clients.producer.ProducerRecord. Did you perform any relocations yourself (quite unlikely)? If so, please add an exclusion for org.apache.kafka. If not, we may be looking at an inconsistency between the Flink versions in your setup. Could you double-check all Flink versions, ideally down to the minor part, especially on EMR (dashboard) but also in your POMs (mvn dependency:tree)?

On Wed, Mar 25, 2020 at 2:35 AM Steve Whelan <[hidden email]> wrote:
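Why the relocated name matters: the JVM resolves an interface call by method name plus full descriptor, and the descriptor includes the return type. The error message shows the calling code was compiled against a `serialize(Object, Long)` returning a `ProducerRecord` under `org/apache/flink/kafka/shaded/`, while the bridge in the javap output returns `org.apache.kafka.clients.producer.ProducerRecord`, so no matching implementation is found. A minimal diagnostic sketch, assuming you can run a small entry point with the same classpath as the job on EMR: it prints where a class was loaded from and the JVM descriptor of each public method with a given name. `ClasspathProbe` and its methods are hypothetical helpers, not Flink APIs.

```java
import java.lang.reflect.Method;
import java.security.CodeSource;
import java.util.ArrayList;
import java.util.List;

public class ClasspathProbe {
    // Jar or directory a class was loaded from, or a marker for JDK bootstrap classes.
    static String locationOf(Class<?> cls) {
        CodeSource src = cls.getProtectionDomain().getCodeSource();
        return (src == null || src.getLocation() == null) ? "<bootstrap/JDK>" : src.getLocation().toString();
    }

    // JVM type descriptor, e.g. "Ljava/lang/Long;" for Long or "[B" for byte[].
    static String typeDescriptor(Class<?> c) {
        if (c.isArray()) return c.getName().replace('.', '/');
        if (c == void.class) return "V";
        if (c == boolean.class) return "Z";
        if (c == byte.class) return "B";
        if (c == char.class) return "C";
        if (c == short.class) return "S";
        if (c == int.class) return "I";
        if (c == long.class) return "J";
        if (c == float.class) return "F";
        if (c == double.class) return "D";
        return "L" + c.getName().replace('.', '/') + ";";
    }

    // Name plus descriptor; the return type is part of what the JVM matches on.
    static String methodDescriptor(Method m) {
        StringBuilder sb = new StringBuilder(m.getName()).append('(');
        for (Class<?> p : m.getParameterTypes()) {
            sb.append(typeDescriptor(p));
        }
        return sb.append(')').append(typeDescriptor(m.getReturnType())).toString();
    }

    // Every public method with the given name (bridges included), with the origin of its return type.
    static List<String> describe(Class<?> cls, String methodName) {
        List<String> out = new ArrayList<>();
        for (Method m : cls.getMethods()) {
            if (m.getName().equals(methodName)) {
                out.add(methodDescriptor(m) + "  [return type from " + locationOf(m.getReturnType()) + "]");
            }
        }
        return out;
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Defaults are the Flink interface discussed in this thread; pass other names as arguments.
        String className = args.length > 0 ? args[0]
                : "org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema";
        String methodName = args.length > 1 ? args[1] : "serialize";
        Class<?> cls = Class.forName(className);
        System.out.println(className + " loaded from " + locationOf(cls));
        describe(cls, methodName).forEach(System.out::println);
    }
}
```

Running it once with the default arguments and once with `com.jwplayer.flink.serialization.json.JsonRowKeyedSerializationSchema serialize` should show both descriptors side by side, together with the jars they came from, which can then be compared against the Flink versions reported by `mvn dependency:tree` and the EMR dashboard.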