Hi,
Excuse the unclear title, but I don't know how to summarise the question.
I'm using an external library integrated with Flink called Flink-HTM. It is still a prototype.
Internally it does everything I want, but I have a problem getting the evaluated values back as a printable DataStream.
I'm posting my question here because I believe the problem lies with Flink and not with the library.

Essentially, I have the following code in my main:

    DataStream<Double> result = HTM.learn(kafkaStream, new Harness.AnomalyNetwork())
        .select(new InferenceSelectFunction<Harness.KafkaRecord, Double>() {
            @Override
            public Double select(Tuple2<Harness.KafkaRecord, NetworkInference> inference) throws Exception {
                return inference.f1.getAnomalyScore();
            }
        });

Then I want to print the DataStream "result".
After the /learn/ call, the flink-htm lib correctly performs many operations on the data.
At the end of this computation, in another class, I have a /DataStream<Tuple2<T, NetworkInference>>/ and essentially I have to call the overridden "/select/" method on that stream.

The code which should do that is:

    final DataStream<Tuple2<T, NetworkInference>> inferenceStream = inferenceStreamBuilder.build();

    return inferenceStream
        .map(new InferenceSelectMapper<T, R>(clean(inferenceSelectFunction)))
        .returns(returnType);

where the /map/ and /returns/ methods are described in Flink's /DataStream.class/:

    public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
        TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
                Utils.getCallLocationName(), true);
        return transform("Map", outType, new StreamMap<>(clean(mapper)));
    }

    public SingleOutputStreamOperator<T> returns(TypeInformation<T> typeInfo) {
        requireNonNull(typeInfo, "TypeInformation must not be null");
        transformation.setOutputType(typeInfo);
        return this;
    }

while /InferenceSelectMapper<T,R>/ is the following class:

    private static class InferenceSelectMapper<T, R> implements MapFunction<Tuple2<T, NetworkInference>, R> {

        private final InferenceSelectFunction<T, R> inferenceSelectFunction;

        public InferenceSelectMapper(InferenceSelectFunction<T, R> inferenceSelectFunction) {
            this.inferenceSelectFunction = inferenceSelectFunction;
        }

        @Override
        public R map(Tuple2<T, NetworkInference> value) throws Exception {
            return inferenceSelectFunction.select(value);
        }
    }

which implements Flink's /MapFunction/. I absolutely need the program to call the /InferenceSelectMapper.map()/ method so that it calls my "/select/" function; unfortunately, this doesn't happen. As a consequence, I suppose the /DataStream result/ in the main method is never filled, and no output is printed in the IDE console, which is my fundamental problem.

Since I'm not a Flink expert, I don't know how to perform many operations at a "lower level".
Honestly, I don't understand exactly what the /map/ and /returns/ methods of /DataStream.class/ do.
I thought a lot about it and also tried to find a way to call the /InferenceSelectMapper.map()/ method myself, but I don't know how to extract the /Tuple2<T, NetworkInference>/ from the /DataStream<Tuple2<...>>/.

I'm absolutely sure that the /map/ function I need in /InferenceSelectMapper/ is not called, because it doesn't appear in the call hierarchy, and a print statement I added inside it never shows any output.

Please, can you help me solve this? I've been stuck on it for a week, and the lib's owner doesn't reply to my mails.
Sorry for the length.

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
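
A note for readers of this thread: one likely reason /map()/ is missing from the IDE call hierarchy is that application code never calls it directly. The program only hands the function object to the framework, which invokes it later at run time. The plain-Java sketch below models that behavior without Flink. `TinyPipeline` and `LazyMapDemo` are hypothetical names invented for illustration and are not Flink or flink-htm classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

class LazyMapDemo {
    /** Returns {mapper calls before execute(), mapper calls after execute()}. */
    static int[] callsBeforeAndAfterExecute() {
        AtomicInteger calls = new AtomicInteger();
        // Building the pipeline only registers the mapper; it does not run it.
        TinyPipeline<Integer, Double> pipeline = new TinyPipeline<Integer, Double>(
                Arrays.asList(1, 2, 3),
                x -> {
                    calls.incrementAndGet();
                    return x * 0.5;
                });
        int before = calls.get();
        pipeline.execute();
        int after = calls.get();
        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] counts = callsBeforeAndAfterExecute();
        System.out.println("calls before execute: " + counts[0]);
        System.out.println("calls after execute: " + counts[1]);
    }
}

/** Hypothetical stand-in for a lazily executed pipeline; not a Flink class. */
class TinyPipeline<T, R> {
    private final List<T> source;
    private final Function<T, R> mapper;

    TinyPipeline(List<T> source, Function<T, R> mapper) {
        this.source = source;
        this.mapper = mapper; // stored for later, not invoked here
    }

    /** Plays the role of the runtime: the only place the mapper is called. */
    List<R> execute() {
        List<R> out = new ArrayList<>();
        for (T element : source) {
            out.add(mapper.apply(element));
        }
        return out;
    }
}
```

In this model the only caller of the mapper is `execute()`, so a static call-hierarchy search starting from the code that builds the pipeline would not find it either. A print statement or a breakpoint inside the function is the more reliable check.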
Hi Andrea,

a MapFunction calls its map() function for each stream element and returns exactly one result value.
Are you sure that the input stream of the Map operator emits records?

Best, Fabian

2017-09-02 19:23 GMT+02:00 AndreaKinn <[hidden email]>:
> Hi,
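
Fabian's two points can be sketched in plain Java: the mapper runs once per input element and yields exactly one result each time, so an input that emits nothing means /select/ never runs. The types below (`SelectFunction`, `SelectMapper`, `runMap`) are simplified, hypothetical stand-ins modeled on the classes quoted in the question. They are not the Flink or flink-htm API, and the checked exception from the original signatures is left out for brevity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class OneToOneMapDemo {
    /** Hypothetical stand-in for the library's select callback. */
    interface SelectFunction<T, R> {
        R select(T value);
    }

    /** Wrapper that delegates to select, shaped like the quoted InferenceSelectMapper. */
    static class SelectMapper<T, R> {
        private final SelectFunction<T, R> selectFunction;

        SelectMapper(SelectFunction<T, R> selectFunction) {
            this.selectFunction = selectFunction;
        }

        R map(T value) {
            return selectFunction.select(value);
        }
    }

    /** Plays the role of the map operator: one call and one output per input record. */
    static <T, R> List<R> runMap(List<T> input, SelectMapper<T, R> mapper) {
        List<R> out = new ArrayList<>();
        for (T record : input) {
            out.add(mapper.map(record));
        }
        return out;
    }

    /** Returns how many times select ran for the given input records. */
    static int selectCallsFor(List<Double> input) {
        AtomicInteger calls = new AtomicInteger();
        SelectMapper<Double, Double> mapper = new SelectMapper<Double, Double>(score -> {
            calls.incrementAndGet();
            return score;
        });
        List<Double> out = runMap(input, mapper);
        if (out.size() != input.size()) {
            throw new IllegalStateException("map is expected to be 1-to-1");
        }
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(selectCallsFor(Arrays.asList(0.1, 0.9, 0.4)));
        System.out.println(selectCallsFor(Collections.<Double>emptyList()));
    }
}
```

If the real job behaves like the empty-input case, the place to look is upstream of the map, for example by printing the inference stream itself before /select/ is applied.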
Hi,
I tried removing the returns() call, but then the program fails with an error (curious, since the return value is a Double).
I'm absolutely sure env.execute() is called, because I see other streams printed.
The program is connected; I followed the example shown in the library exactly. I think that's a bug and I need to solve it.
The inference stream on which the map function is called certainly contains records.

Thank you

Fabian Hueske-2 wrote
> Hi Andrea,
>
> a MapFunction calls its map() function for each stream element and returns
> exactly one result value.
> MapFunctions are used for 1-to-1 transformations.
> The returns() method allows you to specify the return type of an operator, in
> your case the MapOperator. It is only necessary if Flink cannot
> automatically determine the return type of an operator.
>
> It's not easy to identify what is going on from the code you posted.
> Are you sure the program is executed, i.e., did you call env.execute()?
> Are all parts of the program connected?
> Are you sure that the input stream of the Map operator emits records?
>
> Best, Fabian
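
On the error that appears once returns() is removed: a plausible explanation, consistent with Fabian's remark that returns() is needed when the type cannot be determined automatically, is Java type erasure. The anonymous /select/ function in main records `Double` in its class signature, but a generic wrapper such as /InferenceSelectMapper<T, R>/ only records the type variable `R`. The sketch below shows that underlying Java behavior with plain reflection. `ErasureDemo` and `GenericMapper` are hypothetical names, `java.util.function.Function` stands in for Flink's MapFunction, and Flink's actual type extraction is considerably more elaborate than this.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

class ErasureDemo {
    /** Generic wrapper shaped like InferenceSelectMapper<T, R>: R is only a type variable here. */
    static class GenericMapper<T, R> implements Function<T, R> {
        private final Function<T, R> delegate;

        GenericMapper(Function<T, R> delegate) {
            this.delegate = delegate;
        }

        @Override
        public R apply(T value) {
            return delegate.apply(value);
        }
    }

    /** Returns the result type argument recorded in the class's Function<..> signature. */
    static Type declaredResultType(Object function) {
        for (Type t : function.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType
                    && ((ParameterizedType) t).getRawType() == Function.class) {
                return ((ParameterizedType) t).getActualTypeArguments()[1];
            }
        }
        throw new IllegalArgumentException("not a direct Function implementation");
    }

    /** Anonymous class: the concrete type arguments are part of its signature. */
    static Type anonymousResultType() {
        Function<String, Double> anonymous = new Function<String, Double>() {
            @Override
            public Double apply(String s) {
                return Double.valueOf(s);
            }
        };
        return declaredResultType(anonymous);
    }

    /** Generic wrapper instance: <String, Double> is erased, only R remains. */
    static Type wrappedResultType() {
        Function<String, Double> wrapped = new GenericMapper<String, Double>(Double::valueOf);
        return declaredResultType(wrapped);
    }

    public static void main(String[] args) {
        System.out.println(anonymousResultType()); // a concrete class
        System.out.println(wrappedResultType());   // a type variable
    }
}
```

Under that reading, the returns(returnType) call in the library is what supplies the concrete output type for the wrapped mapper, which would explain why removing it fails even though the selected value is a Double.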