Hi guys,
In our Flink job we use a Java source for deserializing messages from Kafka with a Kafka deserializer. The signature is as below:

public class CustomAvroDeserializationSchema implements
    KafkaDeserializationSchema<Tuple2<EventMetaData, GenericRecord>>

The other parts of the streaming job are in Scala. When data has to be serialized I get this exception:

java.lang.RuntimeException: org.apache.flink.api.java.tuple.Tuple2 cannot be cast to scala.Product
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)

Here is how I provide type info for serialization in the Java deserialization class:

@Override
public TypeInformation<Tuple2<EventMetaData, GenericRecord>> getProducedType() {
    return new TupleTypeInfo(TypeInformation.of(EventMetaData.class),
        new GenericRecordAvroTypeInfo(this.writer));
}

Here is how I add the Kafka source in Scala:

private[flink] def sourceType(
    deserialization: KafkaDeserializationSchema[(EventMetaData, GenericRecord)],
    properties: Properties): FlinkKafkaConsumer[(EventMetaData, GenericRecord)] = {
  val consumer = new FlinkKafkaConsumer[(EventMetaData, GenericRecord)](
    source.asJava,
    deserialization,
    properties)
  consumer
}

Any thoughts on how to interoperate between a Java Tuple2 and a Scala case class? We are also using version 1.9.1 of flink-connector-kafka while the rest of the cluster uses Flink 1.7.2.

Best,
Nick.
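For context on the exception: Flink's Java Tuple2 and the Scala tuple are unrelated classes, and only the latter implements scala.Product, which is what the serializer is casting to. A minimal, self-contained Scala sketch illustrating this (only flink-core on the classpath is assumed; the names here are illustrative, not the poster's):

import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}

object TupleInteropDemo {
  def main(args: Array[String]): Unit = {
    val javaTuple = new JTuple2[String, Int]("foo", 1)
    val scalaTuple: (String, Int) = ("foo", 1)

    // Scala tuples implement scala.Product; Flink's Java tuples do not,
    // which is exactly what the ClassCastException above is complaining about.
    println(javaTuple.isInstanceOf[Product])   // false
    println(scalaTuple.isInstanceOf[Product])  // true

    // Converting explicitly is simple, but it has to happen somewhere in the job.
    val converted: (String, Int) = (javaTuple.f0, javaTuple.f1)
    println(converted)
  }
}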
Hi,
Flink will not do any casting between types. You either need to output the correct (Scala) tuple type from the deserialization schema, or insert a step (say, a map function) that converts between the two types. The Java Tuple2 type and the Scala tuple type, i.e. (foo, bar), have nothing in common as far as the type system is concerned.

Best,
Aljoscha
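A minimal Scala sketch of the map-function approach, under the assumption that the source still emits the Java Tuple2 produced by the deserialization schema (the helper name and its generic signature are illustrative; for a GenericRecord payload the output TypeInformation should probably still involve GenericRecordAvroTypeInfo, as the original getProducedType does):

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.streaming.api.scala.DataStream

object TupleConversion {
  // Convert a stream of Flink Java tuples into Scala tuples right after the source,
  // so every downstream Scala operator sees a scala.Product and the cast no longer fails.
  // The caller supplies the TypeInformation for the Scala tuple (e.g. via
  // createTypeInformation, or built explicitly for Avro GenericRecord payloads).
  def toScalaTuples[A, B](source: DataStream[JTuple2[A, B]])(
      implicit outType: TypeInformation[(A, B)]): DataStream[(A, B)] =
    source.map(t => (t.f0, t.f1))
}

Placing the conversion directly after the source keeps the Java type confined to one operator; the alternative is to change the deserialization schema itself to emit a Scala tuple or case class, which is the first option mentioned above.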
No, I think that should be all right.
On 06.05.20 16:57, Vishwas Siravara wrote:
> Thanks, I figured that would be the case. I'm using the Flink tuple type in the map functions, so there is no casting required now. Can you think of any downsides to using Flink tuples in Scala code, especially since the Flink tuple is in the Java API package in Flink?
>
> Best,
> Nick.
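A minimal sketch of the approach described above, keeping Flink's Java Tuple2 end to end in Scala code so that no conversion is needed (the element types and the uppercasing step are illustrative assumptions; the point is that the output TypeInformation is built with TupleTypeInfo, mirroring the original getProducedType, so the Java tuple serializer keeps being used and nothing is ever cast to scala.Product):

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.api.java.typeutils.TupleTypeInfo
import org.apache.flink.streaming.api.scala.DataStream

object JavaTupleInScala {
  // Downstream Scala operator that works on the Java Tuple2 directly:
  // fields are read via f0/f1 and the output type info is a TupleTypeInfo,
  // so Flink picks the Java tuple serializer for the wire format.
  def tagRecords(source: DataStream[JTuple2[String, String]]): DataStream[JTuple2[String, String]] = {
    implicit val outType: TypeInformation[JTuple2[String, String]] =
      new TupleTypeInfo[JTuple2[String, String]](
        TypeInformation.of(classOf[String]),
        TypeInformation.of(classOf[String]))

    source.map(t => new JTuple2(t.f0.toUpperCase, t.f1))
  }
}

The downsides are mostly cosmetic: f0/f1 field access instead of pattern matching, and a type that lives in the Java API package, but functionally it works fine, as confirmed above.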