Use element of the DataStream in parameter of RichMapFunction (open function not called)


Robin, Isabelle

Hello,

 

I’m working with Flink 1.4.2 (Scala API) and I’m having trouble with my custom RichMapFunction, because I want each element of my DataStream to also be used as a constructor parameter of this custom class. My RichMapFunction is a simple counter based on a MapState.

 

Let’s say I have these classes:

- case class Feature(transaction: Transaction) { override def getKey: (String, String) = … }
- class CustomMapFunction(featureKey: (String, String)) extends RichMapFunction[Transaction, Transaction]

 

I implemented my custom map function with the required methods, but I ran into different issues as I tried several solutions. In the following snippets, stream is a DataStream[Transaction], and I expect a DataStream[Transaction] as the output type too:

 

- stream.keyBy(transaction => Feature(transaction).getKey).map(transaction => new CustomMapFunction(Feature(transaction).getKey))
  - This leads to a compilation error (“Expression of type CustomMapFunction doesn’t conform to expected type R_”), which, as far as I understand, comes from the fact that I’m already using transaction for the Feature(transaction) part.
- stream.keyBy(transaction => Feature(transaction).getKey).map(transaction => new CustomMapFunction(Feature(transaction).getKey).map(transaction))
  - This compiles but fails at runtime with a NullPointerException because the MapState is not initialized. Running with the debugger, I saw that open() is never called, so the MapState stays null. (I don’t have this problem with a simpler version of my CustomMapFunction that does not need this transaction-based parameter.)
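For readers following along, here is a minimal sketch of what a class with the signature above might look like. Everything in it is an assumption: the post shows neither the Transaction type nor the body of getKey, so the fields (account, merchant, amount), the key logic, and the state name "feature-counts" are hypothetical. It assumes flink-streaming-scala 1.4.x on the classpath. The point it illustrates is that the MapState field is only assigned inside open(), so calling map() on an instance that Flink never initialized dereferences null.

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration

// Hypothetical stand-in: the post does not show the Transaction type.
case class Transaction(account: String, merchant: String, amount: Double)

// Hypothetical key logic: the body of getKey is elided in the post.
case class Feature(transaction: Transaction) {
  def getKey: (String, String) = (transaction.account, transaction.merchant)
}

// A MapState-based counter with the constructor signature from the question.
class CustomMapFunction(featureKey: (String, String))
    extends RichMapFunction[Transaction, Transaction] {

  // Assigned only in open(); stays null if Flink never calls open() on this instance.
  @transient private var counts: MapState[(String, String), java.lang.Long] = _

  override def open(parameters: Configuration): Unit = {
    counts = getRuntimeContext.getMapState(
      new MapStateDescriptor[(String, String), java.lang.Long](
        "feature-counts", classOf[(String, String)], classOf[java.lang.Long]))
  }

  override def map(t: Transaction): Transaction = {
    // Throws NullPointerException if open() was skipped, because counts is null.
    val previous = Option(counts.get(featureKey)).map(_.longValue).getOrElse(0L)
    counts.put(featureKey, previous + 1)
    t
  }
}
```

Calling map() directly on an instance created inside a lambda, as the second attempt does, is exactly this situation: Flink never sees that instance as an operator function, so it never calls open() (or provides a runtime context) for it.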

 

Do you have any idea how I could solve this issue?

 

Thanks in advance for any help, and I hope I was clear enough (this is my first question on the mailing list, so don’t hesitate to tell me if I forgot some steps or details :))

 

Best regards,

 

Isabelle


Re: Use element of the DataStream in parameter of RichMapFunction (open function not called)

Fabian Hueske

Hi Isabelle,

Welcome to the Flink user mailing list!

You are mixing up the two ways to specify a function:

1. Defining a function as a class / object and passing an instance in the map() method. Given your CustomMapFunction class, this looks as follows:
stream.keyBy(...).map(new CustomMapFunction())

2. Defining a function as a lambda function. In this case, you don't have to define an extra class (CustomMapFunction in your case) but rather give the logic that you would put into the MapFunction.map() method as a lambda function.
Internally, the lambda function is "wrapped" in a MapFunction. This style looks as follows:
stream.keyBy(...).map(t => [directly apply your map() logic and return a Transaction])
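The two styles can be contrasted in a small sketch. It assumes flink-streaming-scala 1.4.x on the classpath; the Transaction fields and the NormalizeMerchant logic are hypothetical, chosen only because the transformation is stateless and easy to check. Both calls yield a DataStream[Transaction]: the first passes an instance of a function class, the second passes a lambda whose body is the map() logic itself.

```scala
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.scala._

// Hypothetical Transaction type; the thread does not show its fields.
case class Transaction(account: String, merchant: String, amount: Double)

// Style 1: the logic lives in a class (hypothetical name), and an instance is passed to map().
class NormalizeMerchant extends MapFunction[Transaction, Transaction] {
  override def map(t: Transaction): Transaction =
    t.copy(merchant = t.merchant.trim.toLowerCase)
}

object TwoStyles {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream: DataStream[Transaction] =
      env.fromElements(Transaction("acc-1", " Shop-9 ", 10.0))

    // Style 1: pass an instance of the function class.
    val viaClass: DataStream[Transaction] = stream.map(new NormalizeMerchant)

    // Style 2: the lambda body is the map() logic and must return a Transaction.
    val viaLambda: DataStream[Transaction] =
      stream.map(t => t.copy(merchant = t.merchant.trim.toLowerCase))

    viaClass.print()
    viaLambda.print()
    env.execute("two ways to specify a map function")
  }
}
```

A lambda has no open() method and no runtime context, so a state-backed counter fits the class style; the Scala API's KeyedStream.mapWithState is another option for simple keyed state.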

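In your code, you define a lambda function (by writing "transaction => ...") that returns a CustomMapFunction instead of a Transaction. Hence, the first program fails with a type error. The second program compiles, but the CustomMapFunction instance is created inside the lambda, so Flink never manages it as a function and never calls its open() method; that is why the MapState stays null.
I would recommend using the class/object style (1) for complex logic and the lambda style (2) for simple transformations, filters, key extractions, etc.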
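Applied to the counter from the question, the class style might look like the sketch below. It is not the poster's actual code: the Transaction fields, the getKey body, and the state name are hypothetical, and it assumes flink-streaming-scala 1.4.x. The key change is that CustomMapFunction takes no per-element constructor argument; the key is derived from each element inside map(), and an instance is passed to map(), so Flink calls open() on it before processing.

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._

// Hypothetical stand-ins: the thread shows neither Transaction nor getKey's body.
case class Transaction(account: String, merchant: String, amount: Double)
case class Feature(transaction: Transaction) {
  def getKey: (String, String) = (transaction.account, transaction.merchant)
}

// No constructor argument: the key is derived from each element inside map().
class CustomMapFunction extends RichMapFunction[Transaction, Transaction] {
  @transient private var counts: MapState[(String, String), java.lang.Long] = _

  // Called by Flink on the instance handed to map(), before any map() call.
  override def open(parameters: Configuration): Unit = {
    counts = getRuntimeContext.getMapState(
      new MapStateDescriptor[(String, String), java.lang.Long](
        "feature-counts", classOf[(String, String)], classOf[java.lang.Long]))
  }

  override def map(t: Transaction): Transaction = {
    val key = Feature(t).getKey
    val previous = Option(counts.get(key)).map(_.longValue).getOrElse(0L)
    counts.put(key, previous + 1)
    t
  }
}

object CounterJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream: DataStream[Transaction] = env.fromElements(
      Transaction("acc-1", "shop-9", 10.0),
      Transaction("acc-1", "shop-9", 5.0),
      Transaction("acc-2", "shop-1", 7.5))

    stream
      .keyBy(t => Feature(t).getKey)
      .map(new CustomMapFunction) // an instance, not a lambda returning one
      .print()

    env.execute("keyed counter")
  }
}
```

Because keyed state is already scoped to the current key, a ValueState would be enough here; MapState is kept only to mirror the question.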

Btw., your key extractor (transaction => Feature(transaction).getKey) is not very efficient because it creates a new Feature object for each transaction.
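One way to avoid the extra allocation is a key function that reads the fields directly. This is a sketch under assumptions: Transaction's fields and the key composition are hypothetical, and keyOf is an illustrative name, not part of the original code. If the case class Feature is kept, keyOf can live in its companion object.

```scala
// Hypothetical stand-in: the thread does not show the Transaction type.
case class Transaction(account: String, merchant: String, amount: Double)

object Feature {
  // Reads the key fields directly, without allocating an intermediate Feature.
  // The returned tuple is still allocated per call; only the wrapper goes away.
  def keyOf(t: Transaction): (String, String) = (t.account, t.merchant)
}
```

With the Scala API imports in scope, this would be used as stream.keyBy(Feature.keyOf _).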

Best, Fabian


In reply to Robin, Isabelle <[hidden email]>, 2018-05-30 17:38 GMT+02:00 (original message above)

RE: Use element of the DataStream in parameter of RichMapFunction (open function not called)

Robin, Isabelle

Hello Fabian,

 

Thanks for this quick answer! I understand that I cannot mix the custom map function with the lambda definition. The workaround we found was to pass the key-extraction function, instead of the key itself, as a parameter to CustomMapFunction, and now it works!
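The workaround described here might look roughly like the following sketch; the actual code was not posted. The Transaction fields and the state name are hypothetical, and it assumes flink-streaming-scala 1.4.x with Scala 2.11. The constructor now receives a function rather than a value, and the key is computed per element inside map(). Scala function literals are serializable, so Flink can ship the instance to the workers as long as the literal does not capture non-serializable values.

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration

// Hypothetical stand-in: the thread does not show the Transaction type.
case class Transaction(account: String, merchant: String, amount: Double)

// The key *function* is the constructor parameter; the key itself is computed per element.
class CustomMapFunction(keyOf: Transaction => (String, String))
    extends RichMapFunction[Transaction, Transaction] {

  @transient private var counts: MapState[(String, String), java.lang.Long] = _

  override def open(parameters: Configuration): Unit = {
    counts = getRuntimeContext.getMapState(
      new MapStateDescriptor[(String, String), java.lang.Long](
        "feature-counts", classOf[(String, String)], classOf[java.lang.Long]))
  }

  override def map(t: Transaction): Transaction = {
    val key = keyOf(t)
    val previous = Option(counts.get(key)).map(_.longValue).getOrElse(0L)
    counts.put(key, previous + 1)
    t
  }
}
```

It would be wired up as stream.keyBy(keyOf).map(new CustomMapFunction(keyOf)), where keyOf is whatever key function is used, for example t => Feature(t).getKey from the question. Since a single instance is passed to map(), Flink calls its open() method as usual.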

 

Regards,

 

Isabelle

 

From: Fabian Hueske [mailto:[hidden email]]
Sent: Thursday, May 31, 2018 09:34
To: Robin, Isabelle <[hidden email]>
Cc: [hidden email]; Nejjar, Driss <[hidden email]>
Subject: Re: Use element of the DataStream in parameter of RichMapFunction (open function not called)

 
