Hi Timo,

> You don't need to specify the type in .flatMap() explicitly. It will be automatically extracted using the generic signature of DataDataConverter.

It does not. That is the reason why I had to add it there.

> Regarding your error. Make sure that you don't mix up the API classes. If you want to use the Java API you should not use "org.apache.flink.streaming.api.scala.DataStream" but the Java one.

I rewrote the class in Java. That's why I am so confused.

Subject: Re: Java types
Date: January 11, 2018 at 3:07:08 AM CST
Hi Boris,
Each API is designed to be language-specific, so they might not
always be the same. Scala has better type extraction features and
lets you write code very precisely. Java sometimes requires more
code to achieve the same.
You don't need to specify the type in .flatMap() explicitly. It
will be automatically extracted using the generic signature of
DataDataConverter.
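
To illustrate what "extracted using the generic signature" can mean in plain Java, here is a minimal sketch using only java.lang.reflect. It is not Flink's actual type extraction logic. LengthConverter and outputTypeOf are hypothetical names, with LengthConverter standing in for a converter class such as DataDataConverter. A named class keeps its generic signature in the class file, while a lambda does not expose it through this reflection call.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

public class GenericSignatureDemo {

    // Hypothetical converter; stands in for a class such as DataDataConverter.
    public static class LengthConverter implements Function<String, Integer> {
        @Override
        public Integer apply(String value) {
            return value.length();
        }
    }

    // Returns the declared output type argument of a Function implementation,
    // or "unknown" when no parameterized Function interface is visible.
    public static String outputTypeOf(Function<?, ?> function) {
        for (Type iface : function.getClass().getGenericInterfaces()) {
            if (iface instanceof ParameterizedType) {
                ParameterizedType pt = (ParameterizedType) iface;
                if (pt.getRawType() == Function.class) {
                    // Index 1 is the second type argument (the output type R).
                    return pt.getActualTypeArguments()[1].getTypeName();
                }
            }
        }
        return "unknown";
    }

    public static void main(String[] args) {
        // Named class: the generic signature is recorded in the class file.
        System.out.println(outputTypeOf(new LengthConverter()));

        // Lambda: the generated class implements the raw interface only.
        Function<String, Integer> lambda = value -> value.length();
        System.out.println(outputTypeOf(lambda));
    }
}
```

When this kind of lookup cannot recover the type, the caller has to pass it explicitly, which is the role the TypeInformation argument plays in the .flatMap() calls quoted below.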
Regarding your error. Make sure that you don't mix up the API
classes. If you want to use the Java API you should not use
"org.apache.flink.streaming.api.scala.DataStream" but the Java
one.
Regards,
Timo
On 1/11/18 at 5:13 AM, Boris Lublinsky wrote:
More questions
In Scala my DataProcessor is defined as
class DataProcessorKeyed extends CoProcessFunction[WineRecord, ModelToServe, Double] with CheckpointedFunction {
And it is used as follows
val models = modelsStream.map(ModelToServe.fromByteArray(_))
  .flatMap(BadDataHandler[ModelToServe])
  .keyBy(_.dataType)
val data = dataStream.map(DataRecord.fromByteArray(_))
  .flatMap(BadDataHandler[WineRecord])
  .keyBy(_.dataType)
// Merge streams
data
  .connect(models)
  .process(DataProcessorKeyed())
When I am doing the same thing in Java
public class DataProcessorKeyed extends CoProcessFunction<Winerecord.WineRecord, ModelToServe, Double> implements CheckpointedFunction {
Which I am using as follows
// Read data from streams
DataStream<Tuple2<String, ModelToServe>> models = modelsStream
    .flatMap(new ModelDataConverter(), new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(ModelToServe.class)))
    .keyBy(0);
DataStream<Tuple2<String, Winerecord.WineRecord>> data = dataStream
    .flatMap(new DataDataConverter(), new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(Winerecord.WineRecord.class)))
    .keyBy(0);
// Merge streams
data
    .connect(models)
    .process(new DataProcessorKeyed());
I am getting an error
Error:(68, 17) java: no suitable method found for keyBy(int)
    method org.apache.flink.streaming.api.scala.DataStream.keyBy(scala.collection.Seq<java.lang.Object>) is not applicable
      (argument mismatch; int cannot be converted to scala.collection.Seq<java.lang.Object>)
    method org.apache.flink.streaming.api.scala.DataStream.<K>keyBy(scala.Function1<org.apache.flink.api.java.tuple.Tuple2<java.lang.String,com.lightbend.model.ModelToServe>,K>,org.apache.flink.api.common.typeinfo.TypeInformation<K>) is not applicable
      (cannot infer type-variable(s) K
        (actual and formal argument lists differ in length))
So it assumes key/value pairs for the coprocessor.
Why is there such a difference between the APIs?
I am trying to convert Scala code (which works fine) to Java.
The Scala code is:
// Create the Kafka consumers
// Data
val dataConsumer = new FlinkKafkaConsumer010[Array[Byte]](
  DATA_TOPIC,
  new ByteArraySchema,
  dataKafkaProps
)
// Model
val modelConsumer = new FlinkKafkaConsumer010[Array[Byte]](
  MODELS_TOPIC,
  new ByteArraySchema,
  modelKafkaProps
)
// Create input data streams
val modelsStream = env.addSource(modelConsumer)
val dataStream = env.addSource(dataConsumer)
// Read data from streams
val models = modelsStream.map(ModelToServe.fromByteArray(_))
  .flatMap(BadDataHandler[ModelToServe])
  .keyBy(_.dataType)
val data = dataStream.map(DataRecord.fromByteArray(_))
  .flatMap(BadDataHandler[WineRecord])
  .keyBy(_.dataType)
Now I am trying to rewrite it in Java and am fighting with the
requirement to provide types where they should be obvious:
// Create the Kafka consumers
// Data
FlinkKafkaConsumer010<byte[]> dataConsumer = new FlinkKafkaConsumer010<>(
    ModelServingConfiguration.DATA_TOPIC,
    new ByteArraySchema(),
    dataKafkaProps);
// Model
FlinkKafkaConsumer010<byte[]> modelConsumer = new FlinkKafkaConsumer010<>(
    ModelServingConfiguration.MODELS_TOPIC,
    new ByteArraySchema(),
    modelKafkaProps);
// Create input data streams
DataStream<byte[]> modelsStream = env.addSource(modelConsumer, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
DataStream<byte[]> dataStream = env.addSource(dataConsumer, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
// Read data from streams
DataStream<Tuple2<String, ModelToServe>> models = modelsStream
    .flatMap(new ModelConverter(), new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(ModelToServe.class)));
Am I missing something similar to import org.apache.flink.api.scala._
in Java?
And if this is the only way, does this seem right?