RichAsyncFunction for Scala?

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

RichAsyncFunction for Scala?

Shannon Carey

I have some awkward code in a few Flink jobs which is converting a Scala stream into a Java stream in order to pass it to AsyncDataStream.unorderedWait(), and using a Java RichAsyncFunction, due to old versions of Flink not having the ability to do async stuff with a Scala stream.

 

In newer versions of Flink, I see that org.apache.flink.streaming.api.scala.AsyncDataStream is available. However, it accepts only org.apache.flink.streaming.api.scala.async.AsyncFunction, and there does not appear to be an AbstractRichFunction subclass of that trait as I expected. Is there a way to use the Scala interfaces but provide a rich AsyncFunction to AsyncDataStream.unorderedWait()? If not, I will leave the old code as-is.

 

Thanks,

Shannon

Reply | Threaded
Open this post in threaded view
|

Re: RichAsyncFunction for Scala?

Fabian Hueske-2
Hi Shannon,

That's a good observation. To be honest, I know why the Scala AsyncFunction does not implement RichFunction.
Maybe this was not intentional and just overlooked when porting the functionality to Scala.

Would you mind creating a Jira ticket for this?

Thank you,
Fabian

Am Di., 14. Mai 2019 um 23:29 Uhr schrieb Shannon Carey <[hidden email]>:

I have some awkward code in a few Flink jobs which is converting a Scala stream into a Java stream in order to pass it to AsyncDataStream.unorderedWait(), and using a Java RichAsyncFunction, due to old versions of Flink not having the ability to do async stuff with a Scala stream.

 

In newer versions of Flink, I see that org.apache.flink.streaming.api.scala.AsyncDataStream is available. However, it accepts only org.apache.flink.streaming.api.scala.async.AsyncFunction, and there does not appear to be an AbstractRichFunction subclass of that trait as I expected. Is there a way to use the Scala interfaces but provide a rich AsyncFunction to AsyncDataStream.unorderedWait()? If not, I will leave the old code as-is.

 

Thanks,

Shannon

Reply | Threaded
Open this post in threaded view
|

Re: RichAsyncFunction for Scala?

Rong Rong
Hi Shannon,

I think the RichAsyncFunction[1] extends from the normal AsyncFunction so regarding on the API perspective you should be able to use it.

The problem I think is with Scala anonymous function where I think it went through a different code path when wrapping the Scala RichAsyncFunction [2]. 
Is your problem specifically with the rich anonymous async function or do you also have problem with regular function extended from RichAsyncFunction?

--
Rong


On Thu, May 16, 2019 at 12:26 AM Fabian Hueske <[hidden email]> wrote:
Hi Shannon,

That's a good observation. To be honest, I know why the Scala AsyncFunction does not implement RichFunction.
Maybe this was not intentional and just overlooked when porting the functionality to Scala.

Would you mind creating a Jira ticket for this?

Thank you,
Fabian

Am Di., 14. Mai 2019 um 23:29 Uhr schrieb Shannon Carey <[hidden email]>:

I have some awkward code in a few Flink jobs which is converting a Scala stream into a Java stream in order to pass it to AsyncDataStream.unorderedWait(), and using a Java RichAsyncFunction, due to old versions of Flink not having the ability to do async stuff with a Scala stream.

 

In newer versions of Flink, I see that org.apache.flink.streaming.api.scala.AsyncDataStream is available. However, it accepts only org.apache.flink.streaming.api.scala.async.AsyncFunction, and there does not appear to be an AbstractRichFunction subclass of that trait as I expected. Is there a way to use the Scala interfaces but provide a rich AsyncFunction to AsyncDataStream.unorderedWait()? If not, I will leave the old code as-is.

 

Thanks,

Shannon