Inheritance and FlatMap with custom POJO

Giacomo Licari
Hi guys,
I'm trying to create a base class that is extended by classes implementing the flatMap method for specific POJO types.

It seems inheritance doesn't work: I can access this.PropertyName or super.PropertyName from the flatMap method, but the values are always null.

Here is the derived class, which uses RainfallPOJO:

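import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

public class CullTimeRainfall extends CullTimeBase implements FlatMapFunction<RainfallPOJO, RainfallPOJO> {

    public CullTimeRainfall(int num, int den, String time_data_name, String start_time, String end_time, int interval, String time_unit) {
        super(num, den, time_data_name, start_time, end_time, interval, time_unit);
    }

    @Override
    public void flatMap(RainfallPOJO obj, Collector<RainfallPOJO> coll) throws Exception {
        // HH = hour of day (0-23), consistent with CullTimeBase; hh would be the 12-hour clock
        DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
        try {
            // Drop the colon in a trailing offset, e.g. "+02:00" -> "+0200"
            Date time = formatter.parse(obj.getTime().replaceAll("([0-9\\-T]+:[0-9]{2}:[0-9.+]+):([0-9]{2})", "$1$2"));
            // Emit only records strictly between startTime and endTime
            if (time.after(this.startTime) && time.before(this.endTime)) {
                coll.collect(obj);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}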

My base class is:

import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class CullTimeBase {

    protected int numerator;
    protected int denominator;
    protected String timeDataName;
    protected Date startTime;
    protected Date endTime;
    protected int interval;
    protected String timeUnit;

    public CullTimeBase(int num, int den, String time_data_name, String start_time, String end_time, int interv, String time_unit) {
        numerator = num;
        denominator = den;
        timeDataName = time_data_name;
        interval = interv;
        timeUnit = time_unit;
        // Parse the window bounds once, when the function object is constructed
        DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
        try {
            startTime = formatter.parse(start_time);
            endTime = formatter.parse(end_time);
        } catch (ParseException e) {
            e.printStackTrace();
        }
    }
}

It only works if I declare all fields and methods in a single class, but then I would have to repeat the same properties in several classes. I would like each derived class to specialize only a custom flatMap method that uses a custom POJO type.

Thanks a lot,
Giacomo

Re: Inheritance and FlatMap with custom POJO

Martin Junghanns-2
Hi Giacomo,

I ran into the same issue. It seems to be related to the way UDFs are serialized. I solved it by letting the base class implement the UDF interface (e.g. FlatMapFunction) and also making it generic (which should be necessary in your example).

public abstract class CullTimeBase<IN, OUT> implements FlatMapFunction<IN, OUT> {
    // shared fields and constructor ...
}

public class CullTimeRainfall extends CullTimeBase<RainfallPOJO, RainfallPOJO> {
    // constructor and flatMap(RainfallPOJO, Collector<RainfallPOJO>) ...
}

This should work.

Best,
Martin
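A minimal, JDK-only sketch of Martin's suggestion. In Flink, FlatMapFunction extends org.apache.flink.api.common.functions.Function, which extends java.io.Serializable, so a base class that implements the UDF interface is itself serializable and its fields travel with every subclass. To keep the example runnable without Flink on the classpath, UdfFunction is a hypothetical stand-in for FlatMapFunction, a List replaces the Collector, and CullBase, CullRainfall, RainfallRecord and UdfCopy are illustrative names; the window uses plain long timestamps instead of Date.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Flink's FlatMapFunction<IN, OUT>; like the real
// interface it extends Serializable. A List replaces Flink's Collector.
interface UdfFunction<IN, OUT> extends Serializable {
    void flatMap(IN value, List<OUT> out) throws Exception;
}

// Generic base class: because it implements the Serializable UDF interface,
// its fields are written and restored together with every subclass.
abstract class CullBase<IN, OUT> implements UdfFunction<IN, OUT> {
    private static final long serialVersionUID = 1L;
    protected final long start;
    protected final long end;

    protected CullBase(long start, long end) {
        this.start = start;
        this.end = end;
    }

    // Shared logic that every subclass can reuse.
    protected boolean inWindow(long t) {
        return t > start && t < end;
    }
}

// Hypothetical record type standing in for RainfallPOJO.
class RainfallRecord implements Serializable {
    private static final long serialVersionUID = 1L;
    final long time;

    RainfallRecord(long time) {
        this.time = time;
    }
}

// The subclass only fixes the type parameters and supplies flatMap.
class CullRainfall extends CullBase<RainfallRecord, RainfallRecord> {
    private static final long serialVersionUID = 1L;

    CullRainfall(long start, long end) {
        super(start, end);
    }

    @Override
    public void flatMap(RainfallRecord r, List<RainfallRecord> out) {
        if (inWindow(r.time)) {
            out.add(r);
        }
    }
}

final class UdfCopy {
    private UdfCopy() {}

    // In-memory Java serialization round trip, similar in spirit to shipping a UDF.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }
}
```

Making the base class generic is what lets each subclass fix its own input and output types while the window fields and the inWindow check live in one place.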

On 16.09.2015 10:41, Giacomo Licari wrote:

Re: Inheritance and FlatMap with custom POJO

Stephan Ewen
Hi!

Interesting case. We use plain Java Serialization to distribute UDFs, and perform additional "cleaning" of scopes, which may be causing the issue.

Could you try the following to see whether either of them resolves the problem?

1) On the environment, disable the closure cleaner (in the execution config).

2) Let the CullTimeBase class implement java.io.Serializable.

Please let us know how it turns out!

Greetings,
Stephan


On Wed, Sep 16, 2015 at 1:29 PM, Martin Junghanns <[hidden email]> wrote:

Re: Inheritance and FlatMap with custom POJO

Giacomo Licari
Thank you, Martin and Stephan, for your help.
I went straight to implementing java.io.Serializable in the base class, and it worked perfectly!

Now I can write more flexible and maintainable code. Thanks a lot, guys.

Greetings,
Giacomo
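A sketch of the fix described above: the CullTimeBase from the first post with implements java.io.Serializable added. String and java.util.Date are both serializable, so every field can be written. The serialVersionUID, the hypothetical CullTimeExample subclass (standing in for CullTimeRainfall, with the Flink parts left out) and the SerialCopy round-trip helper are additions for illustration only.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

// The base class from the first post, now Serializable so that its fields
// are shipped together with any serializable subclass.
class CullTimeBase implements Serializable {
    private static final long serialVersionUID = 1L;

    protected int numerator;
    protected int denominator;
    protected String timeDataName;
    protected Date startTime;
    protected Date endTime;
    protected int interval;
    protected String timeUnit;

    public CullTimeBase(int num, int den, String time_data_name, String start_time,
                        String end_time, int interv, String time_unit) {
        numerator = num;
        denominator = den;
        timeDataName = time_data_name;
        interval = interv;
        timeUnit = time_unit;
        DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
        try {
            startTime = formatter.parse(start_time);
            endTime = formatter.parse(end_time);
        } catch (ParseException e) {
            e.printStackTrace();
        }
    }
}

// Hypothetical subclass standing in for CullTimeRainfall (Flink parts omitted).
class CullTimeExample extends CullTimeBase {
    private static final long serialVersionUID = 1L;

    CullTimeExample(String start, String end) {
        super(1, 1, "time", start, end, 1, "hours");
    }
}

final class SerialCopy {
    private SerialCopy() {}

    // Serialize and deserialize in memory, roughly what happens when a UDF is shipped.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }
}
```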

On Wed, Sep 16, 2015 at 1:46 PM, Stephan Ewen <[hidden email]> wrote:

Re: Inheritance and FlatMap with custom POJO

Stephan Ewen
Could you also try the other variant (disabling the closure cleaner)? I'm curious whether this is expected Java serialization behavior or whether our pre-processing code is causing it.

Greetings,
Stephan
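The null fields can be reproduced with plain Java serialization and no Flink code at all, which suggests ordinary Java behavior is enough to explain them: fields declared in a non-serializable superclass are not written, and on deserialization that superclass is initialised by running its no-argument constructor, which must be accessible. If the nearest non-serializable superclass had no such constructor, readObject would fail with an InvalidClassException instead of producing nulls. All class names below are hypothetical, and this sketch says nothing about whether the closure cleaner also plays a role.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializationDemo {

    // NOT Serializable: its fields are skipped when a subclass is serialized.
    static class PlainBase {
        protected String label;

        // Deserialization calls this no-arg constructor to initialise the
        // non-serializable part of the object; without it, readObject fails.
        public PlainBase() { }

        public PlainBase(String label) { this.label = label; }
    }

    static class PlainDerived extends PlainBase implements Serializable {
        private static final long serialVersionUID = 1L;
        PlainDerived(String label) { super(label); }
    }

    // Serializable base: its fields travel with the subclass as expected.
    static class SerBase implements Serializable {
        private static final long serialVersionUID = 1L;
        protected String label;
        public SerBase(String label) { this.label = label; }
    }

    static class SerDerived extends SerBase {
        private static final long serialVersionUID = 1L;
        SerDerived(String label) { super(label); }
    }

    // In-memory serialize/deserialize round trip.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(new PlainDerived("rain")).label); // prints null
        System.out.println(roundTrip(new SerDerived("rain")).label);   // prints rain
    }
}
```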


On Wed, Sep 16, 2015 at 3:38 PM, Giacomo Licari <[hidden email]> wrote:

Re: Inheritance and FlatMap with custom POJO

Giacomo Licari
I only ran it with java.io.Serializable implemented, without disabling the closure cleaner.

I also have a question about POJO classes.
I would like to create a base POJO class with some common properties and then extend it in new classes. These classes are used to convert a CSV file into a DataSet of POJO objects (of the derived class type).

In the following example, I create a DataSet of TwitterPOJO, which extends a base class and adds the new property "tweet".

DataSet<TwitterPOJO> ds_twitter = env.readCsvFile("file://"+path_twitter)
                   .pojoType(TwitterPOJO.class, "table", "time", "tweet"); 

I get this error:
[main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class com.Flink.POJO.TwitterPOJO is not a valid POJO type
Exception in thread "main" java.lang.ClassCastException: org.apache.flink.api.java.typeutils.GenericTypeInfo cannot be cast to org.apache.flink.api.java.typeutils.PojoTypeInfo

Greetings,
G.L.

On Wed, Sep 16, 2015 at 4:05 PM, Stephan Ewen <[hidden email]> wrote:

Re: Inheritance and FlatMap with custom POJO

Chiwan Park-2
Hi Giacomo,

You should declare your fields as public. If a field is private or protected, the class must provide a getter and a setter for it to be treated as a POJO.

The documentation on the Flink website [1] may be helpful.

Regards,
Chiwan Park

[1] https://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#pojos
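A sketch of a base/derived POJO pair laid out according to the rules described above: public classes, a public no-argument constructor, and each field either public or accessible through a getter and setter. BasePOJO and ProtectedPOJO are hypothetical, and looksLikePojo is a simplified reflection check of those rules written for this example; it is not Flink's TypeExtractor, whose actual checks may differ.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

class PojoRules {
    // Hypothetical common base: public class, public no-arg constructor, public fields.
    public static class BasePOJO {
        public String table;
        public String time;
        public BasePOJO() { }
    }

    // A private field is acceptable as long as it has a getter and a setter.
    public static class TwitterPOJO extends BasePOJO {
        private String tweet;
        public TwitterPOJO() { }
        public String getTweet() { return tweet; }
        public void setTweet(String tweet) { this.tweet = tweet; }
    }

    // Protected field without getter/setter: breaks the rules.
    public static class ProtectedPOJO {
        protected String table;
        public ProtectedPOJO() { }
    }

    // Simplified check of the rules, walking the class and all its superclasses.
    static boolean looksLikePojo(Class<?> clazz) {
        if (!Modifier.isPublic(clazz.getModifiers())) return false;
        try {
            clazz.getConstructor(); // public no-arg constructor required
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int mod = f.getModifiers();
                if (Modifier.isStatic(mod) || Modifier.isTransient(mod) || Modifier.isPublic(mod)) continue;
                if (!hasAccessors(clazz, f)) return false;
            }
        }
        return true;
    }

    // True if public getX()/setX(type) exist for the field (bean naming).
    private static boolean hasAccessors(Class<?> clazz, Field f) {
        String suffix = Character.toUpperCase(f.getName().charAt(0)) + f.getName().substring(1);
        try {
            Method getter = clazz.getMethod("get" + suffix);
            clazz.getMethod("set" + suffix, f.getType());
            return getter.getReturnType() == f.getType();
        } catch (NoSuchMethodException e) {
            return false;
        }
    }
}
```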

> On Sep 16, 2015, at 11:25 PM, Giacomo Licari <[hidden email]> wrote:

Re: Inheritance and FlatMap with custom POJO

Giacomo Licari
Hi Chiwan,
I followed the instructions in the documentation.
I have a simple base class with some properties (all public).
Then I extend that class with a new public property (tweet in my case), for which I also provide a getter and a setter.

Now when I execute:
DataSet<TwitterPOJO> ds_twitter = env.readCsvFile("file://"+path_twitter)
                   .pojoType(TwitterPOJO.class, "table", "time", "tweet");

I receive:
There is no field called "table" in com.Flink.POJO.TwitterPOJO

table is a field of the base class, declared public and with a getter and setter.

Thank you for your help.

Giacomo
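Here the field named in the error is declared in the base class rather than in TwitterPOJO itself. Plain Java reflection shows why inherited fields need explicit handling: Class.getDeclaredField only searches the class it is called on, so a lookup has to walk up getSuperclass() to find table. Whether Flink's CSV reader resolves field names this way is an assumption here, not something the thread confirms; the classes and the declaredHere/findField helpers are hypothetical.

```java
import java.lang.reflect.Field;

class FieldLookup {
    public static class BasePOJO {
        public String table;
        public String time;
        public BasePOJO() { }
    }

    public static class TwitterPOJO extends BasePOJO {
        public String tweet;
        public TwitterPOJO() { }
    }

    // True only if the field is declared directly on this class (no inheritance).
    static boolean declaredHere(Class<?> clazz, String name) {
        try {
            clazz.getDeclaredField(name);
            return true;
        } catch (NoSuchFieldException e) {
            return false;
        }
    }

    // Walks up the class hierarchy until some class declares the field.
    static Field findField(Class<?> clazz, String name) {
        for (Class<?> c = clazz; c != null; c = c.getSuperclass()) {
            try {
                return c.getDeclaredField(name);
            } catch (NoSuchFieldException e) {
                // not declared here; try the superclass
            }
        }
        return null;
    }
}
```

Class.getField would also find an inherited field, but only if that field is public.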

On Wed, Sep 16, 2015 at 4:42 PM, Chiwan Park <[hidden email]> wrote:
Hi Giacomo,

You should set your field as public. If you are set your field as private or protected, the class must provide getter and setter to be treated as POJO.

Maybe the documentation in homepage [1] would be helpful.

Regards,
Chiwan Park

[1] https://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#pojos

> On Sep 16, 2015, at 11:25 PM, Giacomo Licari <[hidden email]> wrote:
>
> I run it only implementing java.io.Serializable without disabling the closure cleaner.
>
> Another question I have is about POJO classes.
> I would also create a base POJO class with some common proprerties, and then extend it in new classes. These classes are used to convert a CSV into a dataset of POJO objects (of derived class type).
>
> In the following example, I create a DataSet of TwitterPOJO, which extends a Base class, adding the new property "tweet".
>
> DataSet<TwitterPOJO> ds_twitter = env.readCsvFile("file://"+path_twitter)
>                    .pojoType(TwitterPOJO.class, "table", "time", "tweet");
>
> I obtain this error:
> [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class com.Flink.POJO.TwitterPOJO is not a valid POJO type
> Exception in thread "main" java.lang.ClassCastException: org.apache.flink.api.java.typeutils.GenericTypeInfo cannot be cast to org.apache.flink.api.java.typeutils.PojoTypeInfo
>
> Greetings,
> G.L.
>
> On Wed, Sep 16, 2015 at 4:05 PM, Stephan Ewen <[hidden email]> wrote:
> Could you also try the other variant (disabeling the closure cleaner)? I would be curious if this behavior is expected Java Serialization behavior, or whether our pre-processing code is causing it.
>
> Greetings,
> Stephan
>
>
> On Wed, Sep 16, 2015 at 3:38 PM, Giacomo Licari <[hidden email]> wrote:
> Thank you Martin and Stephan for your help.
> I tried directly to implement java.io.Serializable in Base class and it worked perfectly!
>
> Now I can develop more flexible and maintainable code. Thank you a lot guys.
>
> Greetings,
> Giacomo
>
> On Wed, Sep 16, 2015 at 1:46 PM, Stephan Ewen <[hidden email]> wrote:
> Hi!
>
> Interesting case. We use plain Java Serialization to distribute UDFs, and perform additional "cleaning" of scopes, which may be causing the issue.
>
> Can you try the following to see if any of those resolves the problem?
>
> 1) On the environment, disable the closure cleaner (in the execution config).
>
> 2) Let the CullTimeBase class implement java.io.Serializable.
>
> Please let us know how it turns out!
>
> Greetings,
> Stephan
>
>
> On Wed, Sep 16, 2015 at 1:29 PM, Martin Junghanns <[hidden email]> wrote:
> Hi Giacomo,
>
> I ran into the same issue. Seems to be coupled to the serialization mechanism of UDFs. I solved it by letting the base class implement the UDF interface (e.g. FlatMapFunction) and in addition make it generic (which should be necessary in your example).
>
> public [abstract] class CullTimeBase<IN, OUT> implements FlatMapFunction<IN, OUT> {
> // ...
> }
>
> public class CullTimeRainFall extends CullTimeBase<RainFallPOJO, RainFallPOJO> {
> // ...
> }
>
> This should work.
>
> Best,
> Martin
>
>
> On 16.09.2015 10:41, Giacomo Licari wrote:
>> Hi guys,
>> I'm trying to create a base class which is inherited from classes implementing FlatMap method on specific POJO types.
>>
>> It seems inheritance doesn't work, I can access this.PropertyName or super.PropertyName from flatMap method but values are always null.
>>
>> Here the derived class, using RainfallPOJO:
>>
>> public class CullTimeRainfall extends CullTimeBase implements FlatMapFunction<RainfallPOJO, RainfallPOJO> {
>>
>>      public CullTimeRainfall(int num, int den, String time_data_name, String start_time, String end_time, int interval, String time_unit){
>>              super(num, den, time_data_name, start_time, end_time, interval, time_unit);
>>      }
>>
>>      public void flatMap(RainfallPOJO obj, Collector<RainfallPOJO> coll) throws Exception {
>>              DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ss.SSS");
>>              try {
>>                  Date time = formatter.parse( obj.getTime().replaceAll( "([0-9\\-T]+:[0-9]{2}:[0-9.+]+):([0-9]{2})", "$1$2" ) );
>>              if(time.after(this.startTime) && time.before(this.endTime)){
>>                              coll.collect(obj);
>>                      }
>>              } catch(Exception e){
>>                      e.printStackTrace();
>>              }
>>      }
>>
>> }
>>
>> My Base class is:
>>
>> public class CullTimeBase {
>>
>>     protected int numerator;
>>     protected int denominator;
>>     protected String timeDataName;
>>     protected Date startTime;
>>     protected Date endTime;
>>     protected int interval;
>>     protected String timeUnit;
>>
>>      public CullTimeBase(int num, int den, String time_data_name, String start_time, String end_time, int interv, String time_unit){
>>              numerator = num;
>>              denominator = den;
>>              timeDataName = time_data_name;
>>              interval = interv;
>>              timeUnit = time_unit;
>>              DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
>>              try {
>>                      startTime = formatter.parse(start_time);
>>                      endTime = formatter.parse(end_time);
>>              } catch (ParseException e) {
>>                      e.printStackTrace();
>>              }
>>      }
>>
>> It works only if I declare all variables and methods in only one class, but so I should repeat the same properties in more classes. I would only specialize each derived class with a custom flatMap method. which uses a custom POJO type.
>>
>> Thanks a lot,
>> Giacomo
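The all-null fields described above are consistent with plain Java serialization, which Flink uses to ship UDFs to the workers. State declared in a superclass that does not implement `java.io.Serializable` is not written to the stream. On deserialization, that superclass's no-arg constructor runs instead, which leaves its fields at their defaults. If there is no accessible no-arg constructor, deserialization fails outright. The sketch below is a stand-alone illustration with hypothetical class names and no Flink dependency. It round-trips two small hierarchies through `ObjectOutputStream`/`ObjectInputStream`. It assumes, as in the question, that only the derived class is `Serializable`. Flink's `FlatMapFunction` extends `Serializable`, so `CullTimeRainfall` is serializable, while `CullTimeBase` is not.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationDemo {

    // Hypothetical base class that does NOT implement Serializable (like CullTimeBase).
    static class PlainBase {
        protected String timeUnit;

        // Java serialization calls this no-arg constructor on deserialization
        // to initialize the non-serializable base-class state.
        PlainBase() { }

        PlainBase(String timeUnit) { this.timeUnit = timeUnit; }
    }

    // Serializable subclass (in Flink, implementing FlatMapFunction makes it Serializable).
    static class PlainDerived extends PlainBase implements Serializable {
        PlainDerived(String timeUnit) { super(timeUnit); }
    }

    // Same shape, but the base class itself implements Serializable.
    static class SerialBase implements Serializable {
        protected String timeUnit;

        SerialBase(String timeUnit) { this.timeUnit = timeUnit; }
    }

    static class SerialDerived extends SerialBase {
        SerialDerived(String timeUnit) { super(timeUnit); }
    }

    // Writes the object to a byte array and reads it back.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        PlainDerived a = roundTrip(new PlainDerived("minutes"));
        SerialDerived b = roundTrip(new SerialDerived("minutes"));
        System.out.println("non-Serializable base: " + a.timeUnit); // prints null
        System.out.println("Serializable base: " + b.timeUnit);     // prints minutes
    }
}
```

Running `main` prints `null` for the first hierarchy and `minutes` for the second. Either of two changes moves the base class into the second case: making it implement `Serializable`, or letting it implement `FlatMapFunction<IN, OUT>` itself, as in the generic-base suggestion from this thread.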

Re: Inheritance and FlatMap with custom POJO

Chiwan Park-2
Hi Giacomo,

Did you create no-argument constructors in both the base class and the derived class?
If you did, it looks like a bug.

Regards,
Chiwan Park
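For reference, the POJO rules in Flink's programming guide come down to four points. The class is public and standalone (top-level or a static nested class). It has a public no-argument constructor. Every field is either public or reachable through a public getter and setter. The reflection check below is a rough, hypothetical approximation of those rules, not Flink's actual `TypeExtractor` logic. It is applied to a base/derived pair shaped like the `TwitterPOJO` example in this thread. It walks the superclass chain so that inherited fields such as `table` are checked too. As an assumption, it also skips static and transient fields.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

public class PojoRulesSketch {

    // Hypothetical base/derived POJOs shaped like the thread's example.
    public static class BasePOJO {
        public String table;   // public field: accepted as-is
        private String time;   // private field: needs a public getter and setter

        public BasePOJO() { }

        public String getTime() { return time; }
        public void setTime(String time) { this.time = time; }
    }

    public static class TwitterPOJO extends BasePOJO {
        public String tweet;

        public TwitterPOJO() { }
    }

    // No no-arg constructor, so the check below rejects it.
    public static class NoDefaultConstructor {
        public String tweet;

        public NoDefaultConstructor(String tweet) { this.tweet = tweet; }
    }

    // Rough approximation of the documented POJO rules (not Flink's TypeExtractor).
    static boolean looksLikePojo(Class<?> clazz) {
        int mods = clazz.getModifiers();
        if (!Modifier.isPublic(mods)) return false;
        // Non-static inner classes are not standalone.
        if (clazz.getEnclosingClass() != null && !Modifier.isStatic(mods)) return false;
        try {
            clazz.getConstructor(); // only finds a public no-arg constructor
        } catch (NoSuchMethodException e) {
            return false;
        }
        // Check the fields declared in the class and in every superclass.
        for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int fm = f.getModifiers();
                if (Modifier.isStatic(fm) || Modifier.isTransient(fm) || Modifier.isPublic(fm)) continue;
                String cap = Character.toUpperCase(f.getName().charAt(0)) + f.getName().substring(1);
                if (!hasPublicMethod(clazz, "get" + cap) || !hasPublicMethod(clazz, "set" + cap, f.getType())) {
                    return false;
                }
            }
        }
        return true;
    }

    static boolean hasPublicMethod(Class<?> clazz, String name, Class<?>... params) {
        try {
            clazz.getMethod(name, params); // also finds public methods inherited from superclasses
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(looksLikePojo(TwitterPOJO.class));          // true
        System.out.println(looksLikePojo(NoDefaultConstructor.class)); // false
    }
}
```

`getMethod` is used for the accessor lookup because, unlike `getDeclaredMethod`, it also finds public getters and setters inherited from the base class.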

Re: Inheritance and FlatMap with custom POJO

Giacomo Licari
Yes I did.

If anyone has a workaround, please let us know.

Regards,
Giacomo Licari

Re: Inheritance and FlatMap with custom POJO

Chiwan Park-2
It seems like a bug in CsvInputFormat. I succeeded in reproducing it on my local machine.
I will create a JIRA issue for this and submit a patch to fix it.

Which version of Flink are you using?

Regards,
Chiwan Park
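An error such as 'There is no field called "table"' for a field declared in the base class is what any lookup restricted to a class's own declarations would produce. As a plain-Java illustration (not Flink's `CsvInputFormat` code; the class and helper names are hypothetical), `Class.getDeclaredField` ignores inherited fields, whereas a lookup that walks `getSuperclass()` finds them:

```java
import java.lang.reflect.Field;

public class FieldLookupDemo {

    // Hypothetical POJOs: "table" and "time" are declared in the base class.
    public static class BasePOJO {
        public String table;
        public String time;

        public BasePOJO() { }
    }

    public static class TwitterPOJO extends BasePOJO {
        public String tweet;

        public TwitterPOJO() { }
    }

    // Looks only at fields declared directly in clazz; inherited ones are not seen.
    static boolean hasDeclaredField(Class<?> clazz, String name) {
        try {
            clazz.getDeclaredField(name);
            return true;
        } catch (NoSuchFieldException e) {
            return false;
        }
    }

    // Searches clazz and then each superclass; returns null if no class declares the field.
    static Field findField(Class<?> clazz, String name) {
        for (Class<?> c = clazz; c != null; c = c.getSuperclass()) {
            try {
                return c.getDeclaredField(name);
            } catch (NoSuchFieldException ignored) {
                // Not declared at this level; continue with the superclass.
            }
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(hasDeclaredField(TwitterPOJO.class, "table")); // false
        System.out.println(findField(TwitterPOJO.class, "table").getDeclaringClass().getSimpleName()); // BasePOJO
    }
}
```

`Class.getField` would also find `table` here, but only because it is public. It never sees protected or private fields declared in a superclass.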

Re: Inheritance and FlatMap with custom POJO

Chiwan Park-2
I created a JIRA issue [1]. After FLINK-2637 [2][3] is resolved, I’ll submit a patch to solve this.
Currently, there is no way to use a derived POJO class with CSV input.

Thank you for reporting.

[1] https://issues.apache.org/jira/browse/FLINK-2690
[2] https://issues.apache.org/jira/browse/FLINK-2637
[3] https://github.com/apache/flink/pull/1134

Regards,
Chiwan Park

Re: Inheritance and FlatMap with custom POJO

Giacomo Licari

Hi Chiwan,
I'm using Flink 0.9.1.

Cheers,
Giacomo

I created a JIRA issue [1]. After FLINK-2637 [2][3] is resolved, I’ll submit a patch to solve this.
Currently, there is no way to use a derived POJO class with CSV input.

Thank you for reporting.

[1] https://issues.apache.org/jira/browse/FLINK-2690
[2] https://issues.apache.org/jira/browse/FLINK-2637
[3] https://github.com/apache/flink/pull/1134

Regards,
Chiwan Park

> On Sep 17, 2015, at 1:33 AM, Chiwan Park <[hidden email]> wrote:
>
> It seems like a bug in CsvInputFormat. I succeeded in reproducing it on my local machine.
> I will create a JIRA issue for this and submit a patch to fix it.
>
> Which version of Flink are you using?
>
> Regards,
> Chiwan Park
>
>> On Sep 17, 2015, at 12:20 AM, Giacomo Licari <[hidden email]> wrote:
>>
>> Yes I did.
>>
>> If anyone has a workaround, please let us know.
>>
>> Regards,
>> Giacomo Licari
>>
>> On Wed, Sep 16, 2015 at 5:15 PM, Chiwan Park <[hidden email]> wrote:
>> Hi Giacomo,
>>
>> Did you create no-argument constructors in both the base class and the derived class?
>> If you did, it looks like a bug.
>>
>> Regards,
>> Chiwan Park
>>
>>> On Sep 17, 2015, at 12:04 AM, Giacomo Licari <[hidden email]> wrote:
>>>
>>> Hi Chiwan,
>>> I followed the instructions in the documentation.
>>> I have a simple base class with some properties (all public).
>>> Then I extend that class with a new public property (tweet in my case), and I also provide a getter and setter for that property.
>>>
>>> Now when I execute:
>>> DataSet<TwitterPOJO> ds_twitter = env.readCsvFile("file://"+path_twitter)
>>>                   .pojoType(TwitterPOJO.class, "table", "time", "tweet");
>>>
>>> I receive:
>>> There is no field called "table" in com.Flink.POJO.TwitterPOJO
>>>
>>> table is a field of the base class, declared public, with a getter and setter as well.
>>>
>>> Thank you for your help.
>>>
>>> Giacomo
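A plain-Java illustration of why an inherited field can be reported as missing: `Class.getDeclaredField` searches only the class itself, so a lookup that stops there never sees `table` declared in the base class, while a lookup that walks `getSuperclass()` does. This is a sketch under assumptions; it does not claim to reproduce how Flink 0.9.1's CsvInputFormat looks up fields, and `BasePOJO`, `FieldLookupDemo`, and its helper methods are hypothetical names.

```java
import java.lang.reflect.Field;

// Hypothetical base POJO with public fields, as described in the thread.
class BasePOJO {
    public String table;
    public String time;

    public BasePOJO() { }
}

// Hypothetical derived POJO that adds one more public field.
class TwitterPOJO extends BasePOJO {
    public String tweet;

    public TwitterPOJO() { }
}

class FieldLookupDemo {
    // Looks only at fields declared by the class itself; inherited fields are missed.
    static Field declaredOnly(Class<?> clazz, String name) {
        try {
            return clazz.getDeclaredField(name);
        } catch (NoSuchFieldException e) {
            return null;
        }
    }

    // Walks up the superclass chain, so fields declared in a base class are found too.
    static Field inHierarchy(Class<?> clazz, String name) {
        for (Class<?> c = clazz; c != null; c = c.getSuperclass()) {
            try {
                return c.getDeclaredField(name);
            } catch (NoSuchFieldException e) {
                // Not declared at this level; continue with the superclass.
            }
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(declaredOnly(TwitterPOJO.class, "table")); // prints null
        System.out.println(inHierarchy(TwitterPOJO.class, "table"));  // the field declared in BasePOJO
    }
}
```

The hierarchy walk uses `getDeclaredField` at each level rather than `Class.getField`, so it also finds non-public fields that have getters and setters.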
>>>
>>> On Wed, Sep 16, 2015 at 4:42 PM, Chiwan Park <[hidden email]> wrote:
>>> Hi Giacomo,
>>>
>>> You should make your fields public. If your fields are private or protected, the class must provide getters and setters for them to be treated as a POJO.
>>>
>>> The documentation on the homepage [1] may be helpful.
>>>
>>> Regards,
>>> Chiwan Park
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#pojos
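The rules Chiwan describes can be approximated with a little reflection. The sketch below is a hypothetical, simplified checker, not Flink's `TypeExtractor`: it requires a public class with a public no-argument constructor, and accepts each non-static, non-transient field of the class and its superclasses if the field is public or has a matching public `getX`/`setX` pair. It ignores details such as `isX` getters for booleans, and the example class names are made up.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

class PojoRulesCheck {
    // Hypothetical, simplified approximation of the POJO rules discussed in the thread.
    static boolean looksLikePojo(Class<?> clazz) {
        if (!Modifier.isPublic(clazz.getModifiers())) {
            return false;
        }
        try {
            clazz.getConstructor(); // only finds a *public* no-arg constructor
        } catch (NoSuchMethodException e) {
            return false;
        }
        // Check fields declared in the class and in every superclass.
        for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int mods = f.getModifiers();
                if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || Modifier.isPublic(mods)) {
                    continue;
                }
                if (!hasAccessors(clazz, f)) {
                    return false;
                }
            }
        }
        return true;
    }

    // Non-public field: require a public getX() returning the field type and a public setX(type).
    static boolean hasAccessors(Class<?> clazz, Field f) {
        String suffix = Character.toUpperCase(f.getName().charAt(0)) + f.getName().substring(1);
        try {
            boolean getterOk = clazz.getMethod("get" + suffix).getReturnType() == f.getType();
            clazz.getMethod("set" + suffix, f.getType());
            return getterOk;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }
}

class PojoExamples {
    public static class BasePOJO {
        protected String table; // non-public, so it needs a getter and setter
        public String time;     // public, accepted as is

        public BasePOJO() { }
        public String getTable() { return table; }
        public void setTable(String table) { this.table = table; }
    }

    public static class TwitterPOJO extends BasePOJO {
        public String tweet;

        public TwitterPOJO() { }
    }

    // Protected field with a getter but no setter: rejected by the simplified check.
    public static class BrokenPOJO extends BasePOJO {
        protected String tweet;

        public BrokenPOJO() { }
        public String getTweet() { return tweet; }
    }

    public static void main(String[] args) {
        System.out.println(PojoRulesCheck.looksLikePojo(TwitterPOJO.class)); // true
        System.out.println(PojoRulesCheck.looksLikePojo(BrokenPOJO.class));  // false
    }
}
```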
>>>
>>>> On Sep 16, 2015, at 11:25 PM, Giacomo Licari <[hidden email]> wrote:
>>>>
>>>> I ran it after only implementing java.io.Serializable, without disabling the closure cleaner.
>>>>
>>>> Another question I have is about POJO classes.
>>>> I would also like to create a base POJO class with some common properties and then extend it in new classes. These classes are used to convert a CSV file into a DataSet of POJO objects (of the derived class type).
>>>>
>>>> In the following example, I create a DataSet of TwitterPOJO, which extends a base class by adding the new property "tweet".
>>>>
>>>> DataSet<TwitterPOJO> ds_twitter = env.readCsvFile("file://"+path_twitter)
>>>>                   .pojoType(TwitterPOJO.class, "table", "time", "tweet");
>>>>
>>>> I obtain this error:
>>>> [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class com.Flink.POJO.TwitterPOJO is not a valid POJO type
>>>> Exception in thread "main" java.lang.ClassCastException: org.apache.flink.api.java.typeutils.GenericTypeInfo cannot be cast to org.apache.flink.api.java.typeutils.PojoTypeInfo
>>>>
>>>> Greetings,
>>>> G.L.
>>>>
>>>> On Wed, Sep 16, 2015 at 4:05 PM, Stephan Ewen <[hidden email]> wrote:
>>>> Could you also try the other variant (disabling the closure cleaner)? I am curious whether this behavior is expected Java serialization behavior, or whether our pre-processing code is causing it.
>>>>
>>>> Greetings,
>>>> Stephan
>>>>
>>>>
>>>> On Wed, Sep 16, 2015 at 3:38 PM, Giacomo Licari <[hidden email]> wrote:
>>>> Thank you Martin and Stephan for your help.
>>>> I tried implementing java.io.Serializable directly in the Base class, and it worked perfectly!
>>>>
>>>> Now I can develop more flexible and maintainable code. Thanks a lot, guys.
>>>>
>>>> Greetings,
>>>> Giacomo
>>>>
>>>> On Wed, Sep 16, 2015 at 1:46 PM, Stephan Ewen <[hidden email]> wrote:
>>>> Hi!
>>>>
>>>> Interesting case. We use plain Java Serialization to distribute UDFs, and perform additional "cleaning" of scopes, which may be causing the issue.
>>>>
>>>> Can you try the following to see if any of those resolves the problem?
>>>>
>>>> 1) On the environment, disable the closure cleaner (in the execution config).
>>>>
>>>> 2) Let the CullTimeBase class implement java.io.Serializable.
>>>>
>>>> Please let us know how it turns out!
>>>>
>>>> Greetings,
>>>> Stephan
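Stephan's point about plain Java serialization can be reproduced without Flink. When a `Serializable` subclass has a superclass that is not `Serializable`, the superclass's fields are not written; on deserialization Java runs that superclass's no-argument constructor instead, so those fields come back as `null` or `0`. (If that superclass has no accessible no-argument constructor, deserialization fails with an `InvalidClassException` instead.) Flink's function interfaces extend `Serializable`, which is how a derived function can be serializable while its base class is not. `PlainBase`, `SerialBase`, and the other names below are hypothetical stand-ins for `CullTimeBase` and `CullTimeRainfall`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for CullTimeBase: deliberately NOT Serializable.
class PlainBase {
    protected String timeUnit;

    // Java serialization calls this no-arg constructor when it rebuilds a
    // Serializable subclass whose superclass is not Serializable.
    public PlainBase() { }

    public PlainBase(String timeUnit) { this.timeUnit = timeUnit; }
}

// Serializable subclass, like a class implementing a Flink function interface.
class PlainDerived extends PlainBase implements Serializable {
    PlainDerived(String timeUnit) { super(timeUnit); }
}

// Same shape, but the base class itself implements Serializable (the fix reported in the thread).
class SerialBase implements Serializable {
    protected String timeUnit;

    SerialBase(String timeUnit) { this.timeUnit = timeUnit; }
}

class SerialDerived extends SerialBase {
    SerialDerived(String timeUnit) { super(timeUnit); }
}

class SerializationDemo {
    // Writes the object with plain Java serialization and reads it back.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip(new PlainDerived("hours")).timeUnit);  // null: base field was not serialized
        System.out.println(roundTrip(new SerialDerived("hours")).timeUnit); // hours
    }
}
```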
>>>>
>>>>
>>>> On Wed, Sep 16, 2015 at 1:29 PM, Martin Junghanns <[hidden email]> wrote:
>>>> Hi Giacomo,
>>>>
>>>> I ran into the same issue. It seems to be coupled to the serialization mechanism of UDFs. I solved it by letting the base class implement the UDF interface (e.g. FlatMapFunction) and, in addition, making it generic (which should be necessary in your example).
>>>>
>>>> public abstract class CullTimeBase<IN, OUT> implements FlatMapFunction<IN, OUT> {
>>>> // ...
>>>> }
>>>>
>>>> public class CullTimeRainfall extends CullTimeBase<RainfallPOJO, RainfallPOJO> {
>>>> // ...
>>>> }
>>>>
>>>> This should work.
>>>>
>>>> Best,
>>>> Martin
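A self-contained sketch of the pattern Martin describes. To compile without Flink, it uses two hypothetical stand-in interfaces (`SimpleCollector`, `SimpleFlatMap`) in place of Flink's `Collector` and `FlatMapFunction`; in a real job the base class would implement `FlatMapFunction<IN, OUT>` directly. It also swaps `java.util.Date`/`SimpleDateFormat` for `java.time.LocalDateTime`, and the shape of `RainfallPOJO` (a single timestamp string) is assumed. Because the base class implements the serializable function interface, the shared window bounds survive a serialization round trip.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for Flink's Collector and FlatMapFunction.
interface SimpleCollector<T> {
    void collect(T record);
}

interface SimpleFlatMap<IN, OUT> extends Serializable {
    void flatMap(IN value, SimpleCollector<OUT> out) throws Exception;
}

// Generic, abstract base class implementing the (serializable) function interface,
// so its fields are serialized together with every derived function.
abstract class CullTimeBase<IN, OUT> implements SimpleFlatMap<IN, OUT> {
    protected final LocalDateTime startTime;
    protected final LocalDateTime endTime;

    protected CullTimeBase(String startTime, String endTime) {
        this.startTime = LocalDateTime.parse(startTime);
        this.endTime = LocalDateTime.parse(endTime);
    }

    // Shared check: strictly after start and strictly before end, as in the question.
    protected boolean inWindow(LocalDateTime t) {
        return t.isAfter(startTime) && t.isBefore(endTime);
    }
}

// Assumed minimal POJO; the thread does not show RainfallPOJO's fields.
class RainfallPOJO {
    private String time;

    public RainfallPOJO() { }
    public RainfallPOJO(String time) { this.time = time; }
    public String getTime() { return time; }
    public void setTime(String time) { this.time = time; }
}

// The derived class only supplies the type-specific flatMap.
class CullTimeRainfall extends CullTimeBase<RainfallPOJO, RainfallPOJO> {
    CullTimeRainfall(String startTime, String endTime) { super(startTime, endTime); }

    @Override
    public void flatMap(RainfallPOJO obj, SimpleCollector<RainfallPOJO> out) {
        if (inWindow(LocalDateTime.parse(obj.getTime()))) {
            out.collect(obj);
        }
    }
}

class GenericBaseDemo {
    // Serialize and deserialize, roughly what happens when a UDF is shipped to workers.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    // Applies the function to each record, collecting the output into a new list.
    static List<RainfallPOJO> apply(CullTimeRainfall fn, List<RainfallPOJO> input) {
        List<RainfallPOJO> kept = new ArrayList<>();
        for (RainfallPOJO r : input) {
            fn.flatMap(r, kept::add);
        }
        return kept;
    }

    public static void main(String[] args) throws Exception {
        CullTimeRainfall fn = roundTrip(new CullTimeRainfall("2015-09-16T00:00:00.000", "2015-09-17T00:00:00.000"));
        List<RainfallPOJO> input = new ArrayList<>();
        input.add(new RainfallPOJO("2015-09-16T12:30:00.000"));
        input.add(new RainfallPOJO("2015-09-18T08:00:00.000"));
        System.out.println(apply(fn, input).size()); // 1
    }
}
```

Each new POJO type then needs only a small subclass with its own `flatMap`; the window fields and the `inWindow` helper live in one place.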
>>>>
>>>>
>>>> On 16.09.2015 10:41, Giacomo Licari wrote:
>>>>> Hi guys,
>>>>> I'm trying to create a base class that is inherited by classes implementing the flatMap method on specific POJO types.
>>>>>
>>>>> It seems inheritance doesn't work: I can access this.PropertyName or super.PropertyName from the flatMap method, but the values are always null.
>>>>>
>>>>> Here is the derived class, using RainfallPOJO:
>>>>>
>>>>> public class CullTimeRainfall extends CullTimeBase implements FlatMapFunction<RainfallPOJO, RainfallPOJO> {
>>>>>
>>>>>     public CullTimeRainfall(int num, int den, String time_data_name, String start_time, String end_time, int interval, String time_unit){
>>>>>             super(num, den, time_data_name, start_time, end_time, interval, time_unit);
>>>>>     }
>>>>>
>>>>>     public void flatMap(RainfallPOJO obj, Collector<RainfallPOJO> coll) throws Exception {
>>>>>             // HH is the 24-hour clock, matching the pattern used in the base class
>>>>>             DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
>>>>>             try {
>>>>>                     Date time = formatter.parse(obj.getTime().replaceAll("([0-9\\-T]+:[0-9]{2}:[0-9.+]+):([0-9]{2})", "$1$2"));
>>>>>                     if (time.after(this.startTime) && time.before(this.endTime)) {
>>>>>                             coll.collect(obj);
>>>>>                     }
>>>>>             } catch(Exception e){
>>>>>                     e.printStackTrace();
>>>>>             }
>>>>>     }
>>>>>
>>>>> }
>>>>>
>>>>> My Base class is:
>>>>>
>>>>> public class CullTimeBase {
>>>>>
>>>>>    protected int numerator;
>>>>>    protected int denominator;
>>>>>    protected String timeDataName;
>>>>>    protected Date startTime;
>>>>>    protected Date endTime;
>>>>>    protected int interval;
>>>>>    protected String timeUnit;
>>>>>
>>>>>     public CullTimeBase(int num, int den, String time_data_name, String start_time, String end_time, int interv, String time_unit){
>>>>>             numerator = num;
>>>>>             denominator = den;
>>>>>             timeDataName = time_data_name;
>>>>>             interval = interv;
>>>>>             timeUnit = time_unit;
>>>>>             DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
>>>>>             try {
>>>>>                     startTime = formatter.parse(start_time);
>>>>>                     endTime = formatter.parse(end_time);
>>>>>             } catch (ParseException e) {
>>>>>                     e.printStackTrace();
>>>>>             }
>>>>>     }
>>>>> }
>>>>>
>>>>> It works only if I declare all variables and methods in a single class, but then I would have to repeat the same properties in several classes. I would like each derived class to specialize only a custom flatMap method that uses a custom POJO type.
>>>>>
>>>>> Thanks a lot,
>>>>> Giacomo
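A small check of the timestamp handling in the derived class, assuming ISO-8601 input such as `2015-09-16T10:41:00.000+02:00` (the thread does not show sample data). The regex removes the colon from a trailing `+hh:mm` offset; because the `[0-9.+]` class does not include `-`, negative offsets are left unchanged. With a `SimpleDateFormat` pattern that has no `Z`, `DateFormat.parse(String)` stops before the offset and ignores it. Note also that in `SimpleDateFormat`, `hh` is the 1-12 hour within AM/PM while `HH` is the 0-23 hour of day. The `inWindow` helper is a swapped-in `java.time` alternative, not the thread's code: it parses the offset directly and then, like the original code, compares only the local wall-clock time.

```java
import java.time.LocalDateTime;
import java.time.OffsetDateTime;

class TimestampDemo {
    // The regex from the question: removes the colon of a trailing "+hh:mm" offset.
    static String stripOffsetColon(String ts) {
        return ts.replaceAll("([0-9\\-T]+:[0-9]{2}:[0-9.+]+):([0-9]{2})", "$1$2");
    }

    // Swapped-in java.time alternative (hypothetical helper): parse the full ISO-8601
    // timestamp, offset included, then compare only the local wall-clock time.
    static boolean inWindow(String ts, LocalDateTime start, LocalDateTime end) {
        LocalDateTime t = OffsetDateTime.parse(ts).toLocalDateTime();
        return t.isAfter(start) && t.isBefore(end);
    }

    public static void main(String[] args) {
        System.out.println(stripOffsetColon("2015-09-16T10:41:00.000+02:00")); // 2015-09-16T10:41:00.000+0200
        LocalDateTime start = LocalDateTime.parse("2015-09-16T12:00:00");
        LocalDateTime end = LocalDateTime.parse("2015-09-16T13:00:00");
        System.out.println(inWindow("2015-09-16T12:30:00.000+02:00", start, end)); // true
        System.out.println(inWindow("2015-09-16T12:30:00.000-05:00", start, end)); // true
    }
}
```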