StreamTransformation<T> object

AndreaKinn
Hi,
I'm using an external library with Flink, and I'm trying to implement a
slotSharingGroup(String) method on it.
To do that, I looked at Flink's SingleOutputStreamOperator class to see how its
slotSharingGroup(String) method is implemented.

An excerpt:

public class SingleOutputStreamOperator<T> extends DataStream<T> {

    SingleOutputStreamOperator(StreamExecutionEnvironment environment,
            StreamTransformation<T> transformation) {
        super(environment, transformation);
    }

    SingleOutputStreamOperator<T> slotSharingGroup(String slotSharingGroup) {
        transformation.setSlotSharingGroup(slotSharingGroup);
        return this;
    }
}

So I changed the constructor of the external library class that has to offer
the slotSharingGroup() method, making it follow the
SingleOutputStreamOperator template more closely.

Now my problem is how to call it (see below), because I don't understand what
the StreamTransformation<T> object among the constructor parameters of
SingleOutputStreamOperator is, or how to obtain it in the main class.

This is the call I make:

DataStream<Tuple7<String, String, Date, String, String, Double, Double>> LCxAccResult =
        HTM.learn(env, /* what StreamTransformation<T> here? */).slotSharingGroup("group");

Hope you can help me, thanks in advance
Andrea



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: StreamTransformation<T> object

Tony Wei
Hi Andrea,

How about returning `SingleOutputStreamOperator` from `HTM.learn()`, instead of creating a new method in the external library?
I guess that function calls the Flink API internally, and a Flink transformation such as `map` actually returns a `SingleOutputStreamOperator` [1], so I think it is easier to just change the return type of that function.
Then you can use the functionality of `SingleOutputStreamOperator` directly.
Hope this will help you.
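
To illustrate the suggestion above, here is a minimal, self-contained sketch of the pattern. The classes `MiniTransformation`, `MiniDataStream`, `MiniOperator`, and `MiniLibrary` are hypothetical stand-ins invented for this example; they only mimic the shape of Flink's `DataStream` and `SingleOutputStreamOperator` and are not the real Flink or flink-htm classes. The point it tries to show is that the transformation object is created inside the API call, so the caller never supplies one, and that declaring the more specific return type is enough to make `slotSharingGroup(...)` available.

```java
import java.util.function.Function;

// All classes here are hypothetical stand-ins, not Flink's real classes.
class ReturnTypeSketch {
    public static void main(String[] args) {
        MiniDataStream<String> source = new MiniDataStream<>(new MiniTransformation());
        // The caller passes no transformation; learn() creates it internally.
        MiniOperator<Integer> result = MiniLibrary.learn(source).slotSharingGroup("group");
        System.out.println(result.getTransformation().getSlotSharingGroup());
    }
}

// Stand-in for the object that records per-operator settings.
class MiniTransformation {
    private String slotSharingGroup = "default";

    String getSlotSharingGroup() { return slotSharingGroup; }

    void setSlotSharingGroup(String group) { this.slotSharingGroup = group; }
}

// Stand-in for a general stream: it wraps a transformation.
class MiniDataStream<T> {
    protected final MiniTransformation transformation;

    MiniDataStream(MiniTransformation transformation) {
        this.transformation = transformation;
    }

    MiniTransformation getTransformation() { return transformation; }

    // Creates the new transformation internally and returns the specific operator type.
    <R> MiniOperator<R> map(Function<T, R> fn) {
        return new MiniOperator<>(new MiniTransformation());
    }
}

// Stand-in for the operator type that exposes slotSharingGroup(...).
class MiniOperator<T> extends MiniDataStream<T> {
    MiniOperator(MiniTransformation transformation) {
        super(transformation);
    }

    MiniOperator<T> slotSharingGroup(String group) {
        transformation.setSlotSharingGroup(group);
        return this;
    }
}

// Hypothetical library entry point in the role of a method such as HTM.learn().
class MiniLibrary {
    // Returning MiniOperator instead of MiniDataStream keeps slotSharingGroup(...) reachable.
    static MiniOperator<Integer> learn(MiniDataStream<String> input) {
        return input.map(String::length);
    }
}
```

If `learn` were declared to return `MiniDataStream<Integer>` instead, the chained `slotSharingGroup("group")` call would no longer compile, which is the situation the return-type change is meant to avoid.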

Best Regards,
Tony Wei


In reply to AndreaKinn's post of 2017-10-26 0:27 GMT+08:00.

Re: StreamTransformation<T> object

AndreaKinn
Sorry Tony, it's my fault: I got the first post wrong. My actual
situation is the following:


DataStream<Tuple7<String, String, Date, String, String, Double, Double>> LCxAccResult =
        HTM.learn(LCxAccStream, new Harness.AnomalyNetwork())
                .select(new InferenceSelectFunction<Harness.KafkaRecord,
                        Tuple7<String, String, Date, String, String, Double, Double>>() {...});


So the return value of "learn" is an HTMStream object and the return value of
"select" is a DataStream, whereas I need to set slotSharingGroup on learn. So I
don't think I can make SingleOutputStreamOperator the return type of learn. I
believe (though I hope not, since I have no clue how to do it :D) that I need
to define slotSharingGroup directly in the HTMStream class, as in the first
post.


I think my only hope is to discover what that StreamTransformation object is and how to retrieve it.



Re: StreamTransformation<T> object

Tony Wei
Hi Andrea,

I skimmed that external library [1], and I think the object returned by the `select` function can be cast to `SingleOutputStreamOperator` [2].
How about trying the following code?

DataStream<Tuple7<String, String, Date, String, String, Double, Double>> LCxAccResult =
        HTM.learn(LCxAccStream, new Harness.AnomalyNetwork())
                .select(new InferenceSelectFunction<Harness.KafkaRecord,
                        Tuple7<String, String, Date, String, String, Double, Double>>() {...});
// The cast works only if select() really returns a SingleOutputStreamOperator at runtime.
((SingleOutputStreamOperator<?>) LCxAccResult).slotSharingGroup("...");
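
The cast can be guarded so that it does not fail when the runtime type is something else. The following is only a sketch of that idea with hypothetical stand-in classes (`GeneralStream`, `OperatorStream`, `SelectLibrary`, and the helper `trySetGroup` are all invented here and are not part of Flink or flink-htm); it assumes the library declares the general type as its return type while actually returning the operator subtype.

```java
// All classes here are hypothetical stand-ins, not Flink's real classes.
class CastSketch {
    // Sets the group only when the runtime type actually supports it.
    static <T> boolean trySetGroup(GeneralStream<T> stream, String group) {
        if (stream instanceof OperatorStream) {
            ((OperatorStream<T>) stream).slotSharingGroup(group);
            return true;
        }
        return false; // an unguarded cast would throw ClassCastException here
    }

    public static void main(String[] args) {
        // Declared as the general type, like the LCxAccResult variable above.
        GeneralStream<String> result = SelectLibrary.select();
        System.out.println(trySetGroup(result, "group") + " " + result.getGroup());
    }
}

// Stand-in for the general stream type, which has no slotSharingGroup(...) method.
class GeneralStream<T> {
    private String group = "default";

    String getGroup() { return group; }

    protected void setGroup(String group) { this.group = group; }
}

// Stand-in for the operator subtype that exposes slotSharingGroup(...).
class OperatorStream<T> extends GeneralStream<T> {
    OperatorStream<T> slotSharingGroup(String group) {
        setGroup(group);
        return this;
    }
}

class SelectLibrary {
    // Declared return type is general, but the runtime object is an OperatorStream.
    static GeneralStream<String> select() {
        return new OperatorStream<>();
    }
}
```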

Best Regards,
Tony Wei


In reply to AndreaKinn's post of 2017-10-26 16:31 GMT+08:00.

Re: StreamTransformation<T> object

AndreaKinn
Mmm, looks good. This solution would be great.
Done this way, am I setting a slot sharing group for both the learn and select
methods, or only for select?
I believed I needed to call slotSharingGroup directly on the return value of
learn.





Re: StreamTransformation<T> object

Tony Wei
Hi Andrea,

Done this way, you will set a slot sharing group only on the select operator, and the learn operator will remain in the default group.
If you want to set it on the learn operator as well, I am afraid that you need to refactor `HTMStream` to expose `InferenceStreamBuilder.build()`.
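
Here is a simplified model of why only the select operator changes group. `OperatorNode` is a hypothetical class invented for this sketch, and it assumes every node has at most one input. It follows the rule described in Flink's Javadoc for `slotSharingGroup`: an operation without an explicitly specified group inherits the group of its input operations when they are all in the same group, and is otherwise in the default group.

```java
// OperatorNode is a hypothetical model class, not part of Flink.
class GroupInheritanceSketch {
    public static void main(String[] args) {
        OperatorNode source = new OperatorNode("source", null);
        OperatorNode learn = new OperatorNode("learn", source);
        OperatorNode select = new OperatorNode("select", learn);
        OperatorNode sink = new OperatorNode("sink", select);

        // Equivalent to calling slotSharingGroup(...) on the result of select only.
        select.slotSharingGroup("group");

        for (OperatorNode node : new OperatorNode[] {source, learn, select, sink}) {
            System.out.println(node.name + " -> " + node.effectiveGroup());
        }
    }
}

class OperatorNode {
    final String name;
    final OperatorNode input;     // single upstream node, null for a source
    private String explicitGroup; // stays null until slotSharingGroup(...) is called

    OperatorNode(String name, OperatorNode input) {
        this.name = name;
        this.input = input;
    }

    OperatorNode slotSharingGroup(String group) {
        this.explicitGroup = group;
        return this;
    }

    // An explicit setting wins; otherwise inherit from the input, else "default".
    String effectiveGroup() {
        if (explicitGroup != null) {
            return explicitGroup;
        }
        return input == null ? "default" : input.effectiveGroup();
    }
}
```

In this model the setting never travels upstream: `source` and `learn` stay in `default`, while `select` and the downstream `sink` end up in `group`.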

Best Regards,
Tony Wei

In reply to AndreaKinn's post of 2017-10-26 17:01 GMT+08:00.

Re: StreamTransformation<T> object

AndreaKinn
Can you be clearer about this part?

I really appreciate your help.


Tony Wei wrote
> you need to refactor `HTMStream` to expose
> `InferenceStreamBuilder.build()`.






Re: StreamTransformation<T> object

Tony Wei
Hi Andrea,

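The `learn` operator is defined in this method [1]. If you need to set its slot sharing group, you should either add `slotSharingGroup(...)` after line 97 [2] or add a new API that returns the result of `inferenceStreamBuilder.build()`.

Best Regards,
Tony Wei

[1] https://github.com/htm-community/flink-htm/blob/master/flink-htm-streaming-java/src/main/java/org/numenta/nupic/flink/streaming/api/HTMStream.java#L148
[2] https://github.com/htm-community/flink-htm/blob/master/flink-htm-streaming-java/src/main/java/org/numenta/nupic/flink/streaming/api/HTMStream.java#L97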
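
One way the first option could look is a wrapper method that forwards the setting to the operator the wrapper builds internally. The sketch below does not reproduce HTMStream's actual code: `InnerOperator` and `WrappedStream` are hypothetical stand-ins, and the assumption is only that the wrapper holds a reference to an operator that supports `slotSharingGroup(...)`.

```java
// All classes here are hypothetical stand-ins, not the real flink-htm or Flink classes.
class WrapperSketch {
    public static void main(String[] args) {
        WrappedStream<String> learned =
                new WrappedStream<String>(new InnerOperator<String>())
                        .slotSharingGroup("learn-group");
        System.out.println(learned.currentGroup());
    }
}

// Stand-in for the operator that the wrapper builds internally.
class InnerOperator<T> {
    private String group = "default";

    InnerOperator<T> slotSharingGroup(String group) {
        this.group = group;
        return this;
    }

    String getGroup() { return group; }
}

// Stand-in for a wrapper class in the role of HTMStream.
class WrappedStream<T> {
    private final InnerOperator<T> inner;

    WrappedStream(InnerOperator<T> inner) {
        this.inner = inner;
    }

    // New API: forwards the setting to the internal operator, so the caller
    // does not have to supply any transformation object.
    WrappedStream<T> slotSharingGroup(String group) {
        inner.slotSharingGroup(group);
        return this;
    }

    String currentGroup() { return inner.getGroup(); }
}
```

Returning `this` keeps the wrapper's own fluent methods available after the call, so a following `select(...)`-style method could still be chained.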


In reply to AndreaKinn's post of 2017-10-26 17:36 GMT+08:00.

Re: StreamTransformation<T> object

AndreaKinn
I'm trying to create an API to get the result of build(), but I'm still stuck
on the StreamTransformation, which seems essential to implementing
slotSharingGroup(...): I would have to provide it from the main class.


Tony Wei wrote

> Hi Andrea,
>
> The `learn` operator is defined in this method [1]. If you need to set its
> slotSharing group, you should add `slotSharingGroup(...)` behind line 97
> [2] or a new API to get the result from `inferenceStreamBuilder.build()`.
>
> Best Regards,
> Tony Wei
>
> [1]
> https://github.com/htm-community/flink-htm/blob/master/flink-htm-streaming-java/src/main/java/org/numenta/nupic/flink/streaming/api/HTMStream.java#L148
> [2]
> https://github.com/htm-community/flink-htm/blob/master/flink-htm-streaming-java/src/main/java/org/numenta/nupic/flink/streaming/api/HTMStream.java#L97

Re: StreamTransformation<T> object

Tony Wei
Hi Andrea,

I don't think you need to touch `StreamTransformation`. If you can get the result of build(), you can do the same thing I mentioned above: cast it to `SingleOutputStreamOperator` and set the slot sharing group on it.
Then you can pass it to the select function to add the next operator, and set another slot sharing group on that result.
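
The two steps above might be chained roughly as follows. This is a sketch with hypothetical stand-ins only: `StageStream`, `StageOperator`, and the `StagePipeline` methods `buildLearnStream()` and `select(...)` are invented for illustration and do not correspond to actual flink-htm methods.

```java
// All classes here are hypothetical stand-ins, not the real flink-htm or Flink classes.
class ChainSketch {
    public static void main(String[] args) {
        // Hypothetical accessor for the result of the internal build() step,
        // typed as the general stream.
        StageStream learned = StagePipeline.buildLearnStream();
        // Cast to the operator type to set the group on the learn operator.
        ((StageOperator) learned).slotSharingGroup("learn-group");

        // Add the next operator, then give it its own group.
        StageStream selected = StagePipeline.select(learned);
        ((StageOperator) selected).slotSharingGroup("select-group");

        System.out.println(learned.getGroup() + " / " + selected.getGroup());
    }
}

// Stand-in for the general stream type.
class StageStream {
    private String group = "default";

    String getGroup() { return group; }

    protected void setGroup(String group) { this.group = group; }
}

// Stand-in for the operator subtype that exposes slotSharingGroup(...).
class StageOperator extends StageStream {
    StageOperator slotSharingGroup(String group) {
        setGroup(group);
        return this;
    }
}

class StagePipeline {
    // Both methods declare the general type but return operator instances.
    static StageStream buildLearnStream() {
        return new StageOperator();
    }

    static StageStream select(StageStream input) {
        return new StageOperator();
    }
}
```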

Best Regards,
Tony Wei

In reply to AndreaKinn's post of 2017-10-27 17:18 GMT+08:00.

Re: StreamTransformation<T> object

AndreaKinn
Thanks for your help. I solved the issue by refactoring HTMStream and adding
new APIs.


