More questions

In Scala my DataProcessor is defined as

    class DataProcessorKeyed extends CoProcessFunction[WineRecord, ModelToServe, Double] with CheckpointedFunction {

And it is used as follows

    val models = modelsStream.map(ModelToServe.fromByteArray(_))
      .flatMap(BadDataHandler[ModelToServe])
      .keyBy(_.dataType)
    val data = dataStream.map(DataRecord.fromByteArray(_))
      .flatMap(BadDataHandler[WineRecord])
      .keyBy(_.dataType)

    // Merge streams
    data
      .connect(models)
      .process(DataProcessorKeyed())

When I am doing the same thing in Java

    public class DataProcessorKeyed extends CoProcessFunction<Winerecord.WineRecord, ModelToServe, Double> implements CheckpointedFunction{

Which I am using as follows

    // Read data from streams
    DataStream<Tuple2<String, ModelToServe>> models = modelsStream
        .flatMap(new ModelDataConverter(),
            new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(ModelToServe.class)))
        .keyBy(0);
    DataStream<Tuple2<String, Winerecord.WineRecord>> data = dataStream
        .flatMap(new DataDataConverter(),
            new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(Winerecord.WineRecord.class)))
        .keyBy(0);

    // Merge streams
    data
        .connect(models)
        .process(new DataProcessorKeyed());

I am getting an error:
    Error:(68, 17) java: no suitable method found for keyBy(int)
      method org.apache.flink.streaming.api.scala.DataStream.keyBy(scala.collection.Seq<java.lang.Object>) is not applicable
        (argument mismatch; int cannot be converted to scala.collection.Seq<java.lang.Object>)
      method org.apache.flink.streaming.api.scala.DataStream.<K>keyBy(scala.Function1<org.apache.flink.api.java.tuple.Tuple2<java.lang.String,com.lightbend.model.ModelToServe>,K>,org.apache.flink.api.common.typeinfo.TypeInformation<K>) is not applicable
        (cannot infer type-variable(s) K
          (actual and formal argument lists differ in length))

So it assumes key/value pairs for the coprocessor.
Why is there such a difference between the APIs?
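One observation on the error above: both candidate methods it lists belong to org.apache.flink.streaming.api.scala.DataStream, so the Java file may be importing the Scala DataStream class rather than the Java one in org.apache.flink.streaming.api.datastream, whose keyBy accepts either field positions or a KeySelector function. As a rough, Flink-free sketch of what keying by a selector function (the analogue of Scala's keyBy(_.dataType)) amounts to, the plain-Java example below groups records by an extracted key. The names KeySelectorSketch, TypedRecord and groupByKey are hypothetical and exist only for this illustration; they are not part of Flink or of the code in this thread.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Flink-free illustration only: groups records by a key chosen with a selector
// function, the plain-Java analogue of Scala's keyBy(_.dataType).
public class KeySelectorSketch {

    // Hypothetical stand-in for WineRecord / ModelToServe; only the key field matters here.
    static final class TypedRecord {
        final String dataType;
        final double value;

        TypedRecord(String dataType, double value) {
            this.dataType = dataType;
            this.value = value;
        }
    }

    // Groups items by the key the selector extracts, preserving first-seen key order.
    static <T, K> Map<K, List<T>> groupByKey(List<T> items, Function<T, K> selector) {
        Map<K, List<T>> groups = new LinkedHashMap<>();
        for (T item : items) {
            groups.computeIfAbsent(selector.apply(item), k -> new ArrayList<>()).add(item);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<TypedRecord> records = new ArrayList<>();
        records.add(new TypedRecord("wine", 1.0));
        records.add(new TypedRecord("beer", 2.0));
        records.add(new TypedRecord("wine", 3.0));

        // The selector reads the key from the record itself, so no Tuple2 wrapper is needed.
        Map<String, List<TypedRecord>> byType = groupByKey(records, r -> r.dataType);
        for (Map.Entry<String, List<TypedRecord>> e : byType.entrySet()) {
            System.out.println(e.getKey() + ": " + e.getValue().size() + " record(s)");
        }
    }
}
```

With a selector of this kind the key comes straight from the record, which is why the Scala version never has to wrap its records in a tuple just to call keyBy.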
On Jan 10, 2018, at 6:20 PM, Boris Lublinsky <[hidden email]> wrote:
I am trying to convert Scala code (which works fine) to Java.

The Scala code is:

    // Create Kafka consumers
    // Data
    val dataConsumer = new FlinkKafkaConsumer010[Array[Byte]](
      DATA_TOPIC,
      new ByteArraySchema,
      dataKafkaProps
    )

    // Model
    val modelConsumer = new FlinkKafkaConsumer010[Array[Byte]](
      MODELS_TOPIC,
      new ByteArraySchema,
      modelKafkaProps
    )

    // Create input data streams
    val modelsStream = env.addSource(modelConsumer)
    val dataStream = env.addSource(dataConsumer)

    // Read data from streams
    val models = modelsStream.map(ModelToServe.fromByteArray(_))
      .flatMap(BadDataHandler[ModelToServe])
      .keyBy(_.dataType)
    val data = dataStream.map(DataRecord.fromByteArray(_))
      .flatMap(BadDataHandler[WineRecord])
      .keyBy(_.dataType)

Now I am trying to rewrite it in Java and am fighting with the requirement to provide types in places where they should be obvious:
    // Create Kafka consumers
    // Data
    FlinkKafkaConsumer010<byte[]> dataConsumer = new FlinkKafkaConsumer010<>(
        ModelServingConfiguration.DATA_TOPIC,
        new ByteArraySchema(),
        dataKafkaProps);

    // Model
    FlinkKafkaConsumer010<byte[]> modelConsumer = new FlinkKafkaConsumer010<>(
        ModelServingConfiguration.MODELS_TOPIC,
        new ByteArraySchema(),
        modelKafkaProps);

    // Create input data streams
    DataStream<byte[]> modelsStream = env.addSource(modelConsumer,
        PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
    DataStream<byte[]> dataStream = env.addSource(dataConsumer,
        PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);

    // Read data from streams
    DataStream<Tuple2<String,ModelToServe>> models = modelsStream
        .flatMap(new ModelConverter(),
            new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(ModelToServe.class)));

Am I missing something in Java similar to

    import org.apache.flink.api.scala._

?
Now, if this is the only way, does this seem right?