Hi,
I'm using an external library with Flink, and I'm trying to implement a slotSharingGroup(String) method on it. To do so, I looked at Flink's SingleOutputStreamOperator class to see how slotSharingGroup(String) is implemented. An excerpt:

    public class SingleOutputStreamOperator<T> extends DataStream<T> {

        SingleOutputStreamOperator(StreamExecutionEnvironment environment, StreamTransformation<T> transformation) {
            super(environment, transformation);
        }

        SingleOutputStreamOperator<T> slotSharingGroup(String slotSharingGroup) {
            transformation.setSlotSharingGroup(slotSharingGroup);
            return this;
        }
    }

So I changed the constructor of the external library class that has to offer slotSharingGroup(), so that it follows the SingleOutputStreamOperator template more closely. Now my problem is how to call it (see below): I don't understand what the StreamTransformation<T> object among the constructor parameters of SingleOutputStreamOperator is, or how to obtain it in my main class. This is the call I make:

    DataStream<Tuple7<String, String, Date, String, String, Double, Double>> LCxAccResult =
        HTM.learn(env, /* what STREAMTRANSFORMATION<T> here? */).slotSharingGroup("group");

I hope you can help me. Thanks in advance,
Andrea

-- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Hi Andrea,

How about returning `SingleOutputStreamOperator` from `HTM.learn()` instead of creating a new method in the external library? I guess that function calls the Flink API internally, and Flink transformations such as map actually return `SingleOutputStreamOperator` [1], so I think it is easier to just change the return type of that function. Then you can use the functionality of `SingleOutputStreamOperator` directly.

Hope this helps.

Best Regards,
Tony Wei

2017-10-26 0:27 GMT+08:00 AndreaKinn <[hidden email]>:
> Hi,
Sorry Tony, it is my fault: my first post was wrong. My actual situation is the following:

    DataStream<Tuple7<String, String, Date, String, String, Double, Double>> LCxAccResult =
        HTM.learn(LCxAccStream, new Harness.AnomalyNetwork())
            .select(new InferenceSelectFunction<Harness.KafkaRecord, Tuple7<String, String, Date, String, String, Double, Double>>() {...});

So the return value of "learn" is an HTMStream object and the return value of "select" is a DataStream, and I need slotSharingGroup on learn. That is why I think I can't set SingleOutputStreamOperator as the return type of learn. I believe (though I hope not, since I don't have a clue how to do it :D) that I need to define slotSharingGroup directly in the HTMStream class, as in the first post. I think my only hope is to find out what that StreamTransformation object is and how to retrieve it.
Hi Andrea,

I had a quick read through that external library [1], and I think the object returned by the "select" function can be cast to `SingleOutputStreamOperator` [2]. How about trying the following code?

    DataStream<Tuple7<String, String, Date, String, String, Double, Double>> LCxAccResult =
        HTM.learn(LCxAccStream, new Harness.AnomalyNetwork())
            .select(new InferenceSelectFunction<Harness.KafkaRecord, Tuple7<String, String, Date, String, String, Double, Double>>() {...});

    ((SingleOutputStreamOperator) LCxAccResult).slotSharingGroup("...");

Best Regards,
Tony Wei

2017-10-26 16:31 GMT+08:00 AndreaKinn <[hidden email]>:
> Sorry Tony it is my fault, I was wrong the first post. Actually now my
Mmm, looks good. This solution would be great.

But done this way, am I setting a slot sharing group for both the learn and the select operators, or only for select? I believed I needed to call slotSharingGroup on the return value of learn itself.
Hi Andrea,

This way you only set a slot sharing group on the select operator; the learn operator remains in the default group. If you want to set it on the learn operator as well, I am afraid you need to refactor `HTMStream` to expose the result of `InferenceStreamBuilder.build()`.

Best Regards,
Tony Wei

2017-10-26 17:01 GMT+08:00 AndreaKinn <[hidden email]>:
> Mmm looks good. This solution would be great.
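To make the point above concrete, here is a minimal plain-Java sketch of why the call on select does not reach learn. It does not use Flink at all: `SketchOperator` and `SlotGroupDemo` are hypothetical stand-ins, and the resolution rule they model (an operator without an explicit group takes its input's group, and a source falls back to "default") is a simplified reading of Flink's documented behavior for single-input operators, not Flink's actual implementation.

```java
import java.util.Objects;

/** Stand-in for a Flink operator in a pipeline; NOT a Flink class. */
final class SketchOperator {
    private final String name;
    private final SketchOperator input; // upstream operator, null for a source
    private String explicitGroup;       // stays null until slotSharingGroup(...) is called

    SketchOperator(String name, SketchOperator input) {
        this.name = Objects.requireNonNull(name);
        this.input = input;
    }

    /** Sets the group on this operator only; upstream operators are not touched. */
    SketchOperator slotSharingGroup(String group) {
        this.explicitGroup = Objects.requireNonNull(group);
        return this;
    }

    /** Explicit group if set, otherwise the input's group, otherwise "default". */
    String resolvedGroup() {
        if (explicitGroup != null) {
            return explicitGroup;
        }
        return input != null ? input.resolvedGroup() : "default";
    }

    String name() {
        return name;
    }
}

final class SlotGroupDemo {
    public static void main(String[] args) {
        SketchOperator source = new SketchOperator("source", null);
        SketchOperator learn = new SketchOperator("learn", source);
        // Mirrors the cast-and-call suggestion: the group is set on select only.
        SketchOperator select = new SketchOperator("select", learn).slotSharingGroup("group");

        for (SketchOperator op : new SketchOperator[] {source, learn, select}) {
            System.out.println(op.name() + " -> " + op.resolvedGroup());
        }
    }
}
```

Running `SlotGroupDemo` prints `source -> default`, `learn -> default` and `select -> group`: the setting applies from the operator it is called on, which is why learn needs its own call.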
Can you be clearer about this part?
I really appreciate your help.

Tony Wei wrote
> you need to refactor `HTMStream` to expose
> `InferenceStreamBuilder.build()`.
Hi Andrea,

The `learn` operator is defined in this method [1]. If you need to set its slot sharing group, you should either add `slotSharingGroup(...)` after line 97 [2], or add a new API that returns the result of `inferenceStreamBuilder.build()`.

Best Regards,
Tony Wei

2017-10-26 17:36 GMT+08:00 AndreaKinn <[hidden email]>:
> Can you be clearer about this part?
I'm trying to create an API to get the result from build(), but I'm still stuck on the StreamTransformation, which is essential to implement slotSharingGroup(...). I have to provide it from the main class.

Tony Wei wrote
> Hi Andrea,
>
> The `learn` operator is defined in this method [1]. If you need to set its
> slotSharing group, you should add `slotSharingGroup(...)` behind line 97
> [2] or a new API to get the result from `inferenceStreamBuilder.build()`.
>
> Best Regards,
> Tony Wei
>
> [1]
> https://github.com/htm-community/flink-htm/blob/master/flink-htm-streaming-java/src/main/java/org/numenta/nupic/flink/streaming/api/HTMStream.java#L148
> [2]
> https://github.com/htm-community/flink-htm/blob/master/flink-htm-streaming-java/src/main/java/org/numenta/nupic/flink/streaming/api/HTMStream.java#L97
Hi Andrea,

I don't think you need to touch `StreamTransformation` at all. If you can get the result of build(), you can do the same thing I mentioned above: cast it to `SingleOutputStreamOperator` and call `slotSharingGroup(...)` on it. Then you can pass it to the select function to add the next operator, and set another slot sharing group on that result.

Best Regards,
Tony Wei

2017-10-27 17:18 GMT+08:00 AndreaKinn <[hidden email]>:
> I'm trying to create an API to get results from build() but anyway I'm stuck
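The suggestion above could be sketched roughly as follows. This is only an illustration in plain Java with hypothetical stand-in classes (`StubDataStream`, `StubSingleOutput`, `StubHtmStream`), so that it runs without Flink on the classpath. It is not the actual flink-htm code, and the thread does not show what the final refactoring looked like. The idea is that the wrapper keeps the stream returned by build(), casts it to the subtype that carries slotSharingGroup, and delegates, so the caller never has to supply a StreamTransformation.

```java
/** Stand-in for Flink's DataStream; NOT the real class. */
class StubDataStream<T> {
}

/** Stand-in for SingleOutputStreamOperator: the subtype that carries slotSharingGroup. */
class StubSingleOutput<T> extends StubDataStream<T> {
    private String group = "default";

    StubSingleOutput<T> slotSharingGroup(String group) {
        this.group = group;
        return this;
    }

    String group() {
        return group;
    }
}

/** Hypothetical wrapper playing the role of HTMStream. */
class StubHtmStream<T> {
    // Assumed to be what inferenceStreamBuilder.build() returns inside the library.
    private final StubDataStream<T> inferenceStream;

    StubHtmStream(StubDataStream<T> built) {
        this.inferenceStream = built;
    }

    /** New API: delegate to the underlying operator instead of taking a transformation. */
    StubHtmStream<T> slotSharingGroup(String group) {
        if (!(inferenceStream instanceof StubSingleOutput)) {
            throw new IllegalStateException("underlying stream is not a single-output operator");
        }
        ((StubSingleOutput<T>) inferenceStream).slotSharingGroup(group);
        return this; // keeps the fluent style, so select(...) could follow
    }

    StubDataStream<T> inferenceStream() {
        return inferenceStream;
    }
}

class HtmSlotGroupDemo {
    public static void main(String[] args) {
        StubSingleOutput<String> built = new StubSingleOutput<>();
        StubHtmStream<String> learnResult = new StubHtmStream<String>(built).slotSharingGroup("group");
        System.out.println(((StubSingleOutput<String>) learnResult.inferenceStream()).group());
    }
}
```

Returning the wrapper itself from `slotSharingGroup` means a call such as `HTM.learn(...).slotSharingGroup("group").select(...)` would keep its shape, which seems to be what the original question was after.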
Thanks for your help. I solved the issue by refactoring HTMStream and adding new APIs.