Casting from org.apache.flink.api.java.tuple.Tuple2 to scala.Product; using java and scala in flink job


Nick Bendtner
Hi guys,
In our Flink job we use a Java source to deserialize messages from Kafka using a Kafka deserializer. The signature is as below.

public class CustomAvroDeserializationSchema implements
    KafkaDeserializationSchema<Tuple2<EventMetaData, GenericRecord>>

The other parts of the streaming job are in Scala. When data has to be serialized, I get this exception:

java.lang.RuntimeException: org.apache.flink.api.java.tuple.Tuple2 cannot be cast to scala.Product
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)


Here is how I provide type info for serialization in the Java deserialization class:

@Override
public TypeInformation<Tuple2<EventMetaData, GenericRecord>> getProducedType() {
    return new TupleTypeInfo<>(
        TypeInformation.of(EventMetaData.class),
        new GenericRecordAvroTypeInfo(this.writer));
}
Here is how I add the Kafka source in Scala:

private[flink] def sourceType(
    deserialization: KafkaDeserializationSchema[(EventMetaData, GenericRecord)],
    properties: Properties): FlinkKafkaConsumer[(EventMetaData, GenericRecord)] = {
  val consumer = new FlinkKafkaConsumer[(EventMetaData, GenericRecord)](
    source.asJava,
    deserialization,
    properties)
  consumer
}
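
For context, the consumer is attached to the job roughly like this (a simplified sketch; the environment, properties, and schema binding shown here are illustrative, not the exact code):

// Simplified wiring sketch; names below are illustrative.
import java.util.Properties
import org.apache.avro.generic.GenericRecord
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema

val env = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties() // Kafka connection settings go here

// Stands in for the schema instance from above, referenced under the Scala
// tuple element type that sourceType expects.
val deserialization: KafkaDeserializationSchema[(EventMetaData, GenericRecord)] = ???

// The stream is declared with the Scala tuple element type, although the
// records actually produced are org.apache.flink.api.java.tuple.Tuple2.
val stream: DataStream[(EventMetaData, GenericRecord)] =
  env.addSource(sourceType(deserialization, properties))
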
Any thoughts on how to interoperate between the Java Tuple2 and a Scala case class? Also, we are using version 1.9.1 of flink-connector-kafka while the rest of the cluster runs Flink 1.7.2.

Best,
Nick.

Re: Casting from org.apache.flink.api.java.tuple.Tuple2 to scala.Product; using java and scala in flink job

Aljoscha Krettek
Hi,

Flink will not do any casting between types. You either need to output the
correct (Scala) tuple type from the deserialization schema or insert a
step (say, a map function) that converts between the two types. The Java
Tuple2 type and the Scala tuple type, i.e. (foo, bar), have nothing in
common as far as the type system is concerned.

Best,
Aljoscha
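
A minimal sketch of the map-function route described above, assuming the EventMetaData/GenericRecord types and the source topic list from the original post (everything else, including how the schema instance is obtained, is an assumption):

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

// Construct the consumer with the Java Tuple2 element type that the
// deserialization schema really produces (deserializationSchema stands in for
// the CustomAvroDeserializationSchema instance from the original post).
val consumer = new FlinkKafkaConsumer[JTuple2[EventMetaData, GenericRecord]](
  source.asJava, deserializationSchema, properties)

// Pass the schema's own TypeInformation explicitly so the source carries the
// Java tuple type instead of a macro-derived Scala one.
val javaStream: DataStream[JTuple2[EventMetaData, GenericRecord]] =
  env.addSource(consumer)(deserializationSchema.getProducedType)

// Explicit conversion step; from here on downstream operators see a Scala
// tuple, so the scala.Product-based serializer matches the records it gets.
val scalaStream: DataStream[(EventMetaData, GenericRecord)] =
  javaStream.map(t => (t.f0, t.f1))

Depending on how GenericRecord should be handled downstream, the TypeInformation derived implicitly for the Scala tuple may need to be supplied explicitly instead; the sketch leaves that detail out.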


Re: Casting from org.apache.flink.api.java.tuple.Tuple2 to scala.Product; using java and scala in flink job

Aljoscha Krettek
No, I think that should be all right.

On 06.05.20 16:57, Vishwas Siravara wrote:

> Thanks, I figured that would be the case. I'm using the Flink tuple type in
> the map functions, so there is no casting required now. Can you think of
> any downsides of using Flink tuples in Scala code, especially since the
> Flink tuple is in the Java API package in Flink?
>
> Best,
> Nick.
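
A minimal sketch of that approach, keeping the Flink Java Tuple2 inside the Scala operators so that no conversion step is needed (EventMetaData and GenericRecord are the types from the thread; the map function itself is illustrative):

import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}

// A Scala map function that works on the Flink Java tuple directly; fields
// are accessed as f0, f1 and the element type never leaves the Java API.
class ExtractRecord
    extends MapFunction[JTuple2[EventMetaData, GenericRecord], GenericRecord] {
  override def map(value: JTuple2[EventMetaData, GenericRecord]): GenericRecord =
    value.f1
}

The trade-off is mostly ergonomic: fields are read as f0/f1 rather than via pattern matching, and the stream's element type is described by a Java TupleTypeInfo rather than a Scala case-class type.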