I rewrote the class in Java. That's why I am so confused.
Hi Boris,
each API is designed to be language-specific, so they might not always be the same. Scala has better type extraction features and lets you write code very concisely. Java sometimes requires more code to achieve the same.
You don't need to specify the type in .flatMap() explicitly. It will be automatically extracted using the generic signature of DataDataConverter.
Regarding your error: make sure that you don't mix up the API classes. If you want to use the Java API, you should not use "org.apache.flink.streaming.api.scala.DataStream" but the Java one.
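For illustration, a minimal sketch of what this could look like against the Java API (untested; it assumes the ModelDataConverter and DataDataConverter classes from the code below declare their output types in their generic signatures, e.g. implement FlatMapFunction<byte[], Tuple2<String, ModelToServe>>):

// Note: the Java DataStream, not org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;

DataStream<Tuple2<String, ModelToServe>> models = modelsStream
    .flatMap(new ModelDataConverter())   // output type extracted from the generic signature
    .keyBy(0);                           // keyBy(int) is defined on the Java DataStream

DataStream<Tuple2<String, Winerecord.WineRecord>> data = dataStream
    .flatMap(new DataDataConverter())
    .keyBy(0);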
Regards,
Timo
On 1/11/18 at 5:13 AM, Boris Lublinsky wrote:
More questions. In Scala my DataProcessor is defined as

class DataProcessorKeyed extends CoProcessFunction[WineRecord, ModelToServe, Double]
  with CheckpointedFunction {

And it is used as follows:

val models = modelsStream.map(ModelToServe.fromByteArray(_))
  .flatMap(BadDataHandler[ModelToServe])
  .keyBy(_.dataType)
val data = dataStream.map(DataRecord.fromByteArray(_))
  .flatMap(BadDataHandler[WineRecord])
  .keyBy(_.dataType)
// Merge streams
data
  .connect(models)
  .process(DataProcessorKeyed())

When I am doing the same thing in Java:

public class DataProcessorKeyed
    extends CoProcessFunction<Winerecord.WineRecord, ModelToServe, Double>
    implements CheckpointedFunction {

Which I am using as follows:

// Read data from streams
DataStream<Tuple2<String, ModelToServe>> models = modelsStream
    .flatMap(new ModelDataConverter(),
        new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(ModelToServe.class)))
    .keyBy(0);
DataStream<Tuple2<String, Winerecord.WineRecord>> data = dataStream
    .flatMap(new DataDataConverter(),
        new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(Winerecord.WineRecord.class)))
    .keyBy(0);
// Merge streams
data
    .connect(models)
    .process(new DataProcessorKeyed());

I am getting an error:
Error:(68, 17) java: no suitable method found for keyBy(int)
    method org.apache.flink.streaming.api.scala.DataStream.keyBy(scala.collection.Seq<java.lang.Object>) is not applicable
      (argument mismatch; int cannot be converted to scala.collection.Seq<java.lang.Object>)
    method org.apache.flink.streaming.api.scala.DataStream.<K>keyBy(scala.Function1<org.apache.flink.api.java.tuple.Tuple2<java.lang.String,com.lightbend.model.ModelToServe>,K>,org.apache.flink.api.common.typeinfo.TypeInformation<K>) is not applicable
      (cannot infer type-variable(s) K; actual and formal argument lists differ in length)

So it assumes key/value pairs for the coprocessor.
Why is there such a difference between the APIs?
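One way to keep the Java pipeline structurally close to the Scala one is to have the converters emit the records themselves and key by a field with a KeySelector, so the connected streams keep Winerecord.WineRecord and ModelToServe as element types and the CoProcessFunction signature stays the same as in Scala. A rough sketch (untested; it assumes converter variants that emit the records rather than Tuple2s, and a hypothetical getDataType() accessor standing in for the Scala dataType field):

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.KeyedStream;

KeyedStream<ModelToServe, String> models = modelsStream
    .flatMap(new ModelDataConverter())                     // assumed to emit ModelToServe
    .keyBy(new KeySelector<ModelToServe, String>() {
        @Override
        public String getKey(ModelToServe value) {
            return value.getDataType();                    // hypothetical accessor
        }
    });

KeyedStream<Winerecord.WineRecord, String> data = dataStream
    .flatMap(new DataDataConverter())                      // assumed to emit WineRecord
    .keyBy(new KeySelector<Winerecord.WineRecord, String>() {
        @Override
        public String getKey(Winerecord.WineRecord value) {
            return value.getDataType();                    // hypothetical accessor
        }
    });

// Merge streams; CoProcessFunction<Winerecord.WineRecord, ModelToServe, Double> as before
data.connect(models)
    .process(new DataProcessorKeyed());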
On Jan 10, 2018, at 6:20 PM, Boris Lublinsky <[hidden email]> wrote:
I am trying to convert Scala code (which works fine) to Java. The Scala code is:

// Create Kafka consumers
// Data
val dataConsumer = new FlinkKafkaConsumer010[Array[Byte]](
  DATA_TOPIC,
  new ByteArraySchema,
  dataKafkaProps
)
// Model
val modelConsumer = new FlinkKafkaConsumer010[Array[Byte]](
  MODELS_TOPIC,
  new ByteArraySchema,
  modelKafkaProps
)
// Create input data streams
val modelsStream = env.addSource(modelConsumer)
val dataStream = env.addSource(dataConsumer)
// Read data from streams
val models = modelsStream.map(ModelToServe.fromByteArray(_))
  .flatMap(BadDataHandler[ModelToServe])
  .keyBy(_.dataType)
val data = dataStream.map(DataRecord.fromByteArray(_))
  .flatMap(BadDataHandler[WineRecord])
  .keyBy(_.dataType)

Now I am trying to rewrite it in Java and am fighting with the requirement to provide types where they should be obvious:
// Create Kafka consumers
// Data
FlinkKafkaConsumer010<byte[]> dataConsumer = new FlinkKafkaConsumer010<>(
    ModelServingConfiguration.DATA_TOPIC,
    new ByteArraySchema(),
    dataKafkaProps);
// Model
FlinkKafkaConsumer010<byte[]> modelConsumer = new FlinkKafkaConsumer010<>(
    ModelServingConfiguration.MODELS_TOPIC,
    new ByteArraySchema(),
    modelKafkaProps);
// Create input data streams
DataStream<byte[]> modelsStream =
    env.addSource(modelConsumer, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
DataStream<byte[]> dataStream =
    env.addSource(dataConsumer, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
// Read data from streams
DataStream<Tuple2<String, ModelToServe>> models = modelsStream
    .flatMap(new ModelConverter(),
        new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(ModelToServe.class)));

Am I missing something similar to "import org.apache.flink.api.scala._" in Java?
Now, if this is the only way, does this seem right?
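There is no direct Java counterpart to the implicit-based "import org.apache.flink.api.scala._"; when Flink cannot extract the output type from the function's generic signature, a returns(...) hint on the operator is a common alternative to passing TypeInformation as a second flatMap argument. A minimal sketch, assuming the ModelConverter class from the snippet above:

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;

DataStream<Tuple2<String, ModelToServe>> models = modelsStream
    .flatMap(new ModelConverter())
    .returns(new TypeHint<Tuple2<String, ModelToServe>>() {});  // supplies the output type hint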