Hi guys,
I'm trying to create a base class that is extended by classes implementing the flatMap method for specific POJO types. Inheritance doesn't seem to work: I can access this.PropertyName or super.PropertyName from the flatMap method, but the values are always null.

Here is the derived class, using RainfallPOJO:

    public class CullTimeRainfall extends CullTimeBase implements FlatMapFunction<RainfallPOJO, RainfallPOJO> {

        public CullTimeRainfall(int num, int den, String time_data_name, String start_time, String end_time, int interval, String time_unit){
            super(num, den, time_data_name, start_time, end_time, interval, time_unit);
        }

        public void flatMap(RainfallPOJO obj, Collector<RainfallPOJO> coll) throws Exception {
            DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ss.SSS");
            try {
                Date time = formatter.parse( obj.getTime().replaceAll( "([0-9\\-T]+:[0-9]{2}:[0-9.+]+):([0-9]{2})", "$1$2" ) );
                if(time.after(this.startTime) && time.before(this.endTime)){
                    coll.collect(obj);
                }
            } catch(Exception e){
                e.printStackTrace();
            }
        }
    }

My base class is:

    public class CullTimeBase {

        protected int numerator;
        protected int denominator;
        protected String timeDataName;
        protected Date startTime;
        protected Date endTime;
        protected int interval;
        protected String timeUnit;

        public CullTimeBase(int num, int den, String time_data_name, String start_time, String end_time, int interv, String time_unit){
            numerator = num;
            denominator = den;
            timeDataName = time_data_name;
            interval = interv;
            timeUnit = time_unit;
            DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
            try {
                startTime = formatter.parse(start_time);
                endTime = formatter.parse(end_time);
            } catch (ParseException e) {
                e.printStackTrace();
            }
        }
    }

It works only if I declare all variables and methods in a single class, but then I would have to repeat the same properties in several classes. I would like each derived class to specialize only the flatMap method, which uses a custom POJO type.

Thanks a lot,
Giacomo
|
Hi Giacomo,
I ran into the same issue. It seems to be coupled to the serialization mechanism of UDFs. I solved it by letting the base class implement the UDF interface (e.g. FlatMapFunction) and, in addition, making it generic (which should be necessary in your example).

    public [abstract] class CullTimeBase<IN, OUT> implements FlatMapFunction<IN, OUT> {
        // ...
    }

    public class CullTimeRainfall extends CullTimeBase<RainfallPOJO, RainfallPOJO> {
        // ...
    }

This should work.

Best,
Martin

On 16.09.2015 10:41, Giacomo Licari wrote:
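
Martin's suggestion can be sketched without a Flink dependency. The snippet below is only an illustration under stated assumptions: FlatMap is a hypothetical stand-in for Flink's FlatMapFunction (it extends java.io.Serializable, as Flink's function interfaces do), java.util.function.Consumer stands in for Collector, and the Rainfall type, the millisecond timestamps and the run helper are invented for the example.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class GenericBaseSketch {

    /** Hypothetical stand-in for Flink's FlatMapFunction; Serializable by inheritance. */
    public interface FlatMap<IN, OUT> extends Serializable {
        void flatMap(IN value, Consumer<OUT> out) throws Exception;
    }

    /** Generic base class: holds the shared state and implements the function interface. */
    public abstract static class CullTimeBase<IN, OUT> implements FlatMap<IN, OUT> {
        private static final long serialVersionUID = 1L;
        protected final long startMillis;
        protected final long endMillis;

        protected CullTimeBase(long startMillis, long endMillis) {
            this.startMillis = startMillis;
            this.endMillis = endMillis;
        }

        /** True if the timestamp lies strictly between start and end. */
        protected boolean inWindow(long timeMillis) {
            return timeMillis > startMillis && timeMillis < endMillis;
        }
    }

    /** Hypothetical record type standing in for RainfallPOJO. */
    public static class Rainfall {
        public long timeMillis;
        public Rainfall(long timeMillis) { this.timeMillis = timeMillis; }
    }

    /** Derived class: only the type-specific flatMap is written here. */
    public static class CullTimeRainfall extends CullTimeBase<Rainfall, Rainfall> {
        private static final long serialVersionUID = 1L;

        public CullTimeRainfall(long startMillis, long endMillis) {
            super(startMillis, endMillis);
        }

        @Override
        public void flatMap(Rainfall value, Consumer<Rainfall> out) {
            if (inWindow(value.timeMillis)) {
                out.accept(value);
            }
        }
    }

    /** Hypothetical driver that applies the function to every input element. */
    public static List<Rainfall> run(FlatMap<Rainfall, Rainfall> fn, List<Rainfall> input) throws Exception {
        List<Rainfall> result = new ArrayList<>();
        for (Rainfall r : input) {
            fn.flatMap(r, result::add);
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        List<Rainfall> input = Arrays.asList(new Rainfall(5L), new Rainfall(50L), new Rainfall(500L));
        // Only the element with timestamp 50 lies inside the (10, 100) window.
        System.out.println(run(new CullTimeRainfall(10L, 100L), input).size());
    }
}
```

Because the base class implements the function interface, it is Serializable itself, so its fields travel with the function object when it is serialized.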
|
Hi!
Interesting case. We use plain Java Serialization to distribute UDFs, and perform additional "cleaning" of scopes, which may be causing the issue.

Can you try the following to see if either of these resolves the problem?

1) On the environment, disable the closure cleaner (in the execution config).

2) Let the CullTimeBase class implement java.io.Serializable.

Please let us know how it turns out!

Greetings,
Stephan

On Wed, Sep 16, 2015 at 1:29 PM, Martin Junghanns <[hidden email]> wrote:
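
Stephan's second suggestion relies on standard Java serialization behavior: the fields of a superclass that is not Serializable are not written, and on deserialization that superclass is re-initialized through its no-argument constructor. Below is a minimal sketch with plain java.io and no Flink involved. The class names are hypothetical, and it assumes the non-serializable base has an accessible no-argument constructor (without one, deserialization fails with an InvalidClassException instead of yielding null fields).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Date;

public class SerializableBaseDemo {

    /** Base class that is NOT Serializable: its fields are skipped when a subclass is written. */
    public static class PlainBase {
        protected Date startTime;
        public PlainBase() { }  // invoked during deserialization of a serializable subclass
        public PlainBase(Date startTime) { this.startTime = startTime; }
        public Date getStartTime() { return startTime; }
    }

    /** Only the subclass is Serializable, mirroring the situation in the original post. */
    public static class PlainChild extends PlainBase implements Serializable {
        private static final long serialVersionUID = 1L;
        public PlainChild(Date startTime) { super(startTime); }
    }

    /** Base class that implements Serializable: its fields are written and restored. */
    public static class SerializableBase implements Serializable {
        private static final long serialVersionUID = 1L;
        protected Date startTime;
        public SerializableBase(Date startTime) { this.startTime = startTime; }
        public Date getStartTime() { return startTime; }
    }

    public static class FixedChild extends SerializableBase {
        private static final long serialVersionUID = 1L;
        public FixedChild(Date startTime) { super(startTime); }
    }

    /** Serializes the object to a byte array and reads it back. */
    @SuppressWarnings("unchecked")
    public static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Date start = new Date(0L);
        // Base not Serializable: the inherited field is lost.
        System.out.println(roundTrip(new PlainChild(start)).getStartTime() == null);        // true
        // Base Serializable: the inherited field survives the round trip.
        System.out.println(start.equals(roundTrip(new FixedChild(start)).getStartTime()));  // true
    }
}
```

For the first suggestion, Flink's ExecutionConfig offers disableClosureCleaner(), reachable as env.getConfig().disableClosureCleaner(); that call is not exercised in this sketch.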
|
Thank you Martin and Stephan for your help. I implemented java.io.Serializable directly in the base class and it worked perfectly!

Now I can develop more flexible and maintainable code. Thanks a lot, guys.

Greetings,
Giacomo

On Wed, Sep 16, 2015 at 1:46 PM, Stephan Ewen <[hidden email]> wrote:
|
Could you also try the other variant (disabling the closure cleaner)? I would be curious whether this is expected Java Serialization behavior, or whether our pre-processing code is causing it.

Greetings,
Stephan

On Wed, Sep 16, 2015 at 3:38 PM, Giacomo Licari <[hidden email]> wrote:
|
I ran it with only java.io.Serializable implemented, without disabling the closure cleaner.

Another question I have is about POJO classes. I would also like to create a base POJO class with some common properties, and then extend it in new classes. These classes are used to convert a CSV file into a DataSet of POJO objects (of the derived class type).

In the following example, I create a DataSet of TwitterPOJO, which extends a base class, adding the new property "tweet".

    DataSet<TwitterPOJO> ds_twitter = env.readCsvFile("file://"+path_twitter)
        .pojoType(TwitterPOJO.class, "table", "time", "tweet");

I get this error:

    [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class com.Flink.POJO.TwitterPOJO is not a valid POJO type
    Exception in thread "main" java.lang.ClassCastException: org.apache.flink.api.java.typeutils.GenericTypeInfo cannot be cast to org.apache.flink.api.java.typeutils.PojoTypeInfo

Greetings,
G.L.

On Wed, Sep 16, 2015 at 4:05 PM, Stephan Ewen <[hidden email]> wrote:
|
Hi Giacomo,
You should declare your fields as public. If a field is private or protected, the class must provide a getter and a setter for it to be treated as a POJO.

The documentation on the homepage [1] may be helpful.

Regards,
Chiwan Park

[1] https://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#pojos

On Sep 16, 2015, at 11:25 PM, Giacomo Licari <[hidden email]> wrote:
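
The field rule Chiwan describes can be approximated with a few lines of reflection. This is a simplified sketch, not Flink's actual TypeExtractor logic: it only checks that every non-static, non-transient field in the class hierarchy is either public or has a public getX/setX pair. BasePOJO and HiddenFieldPOJO are hypothetical names, and the layout of TwitterPOJO is assumed for illustration.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

public class PojoFieldCheck {

    /** Hypothetical base POJO with public fields. */
    public static class BasePOJO {
        public String table;
        public String time;
        public BasePOJO() { }
    }

    /** Derived POJO: private field exposed through a getter and a setter. */
    public static class TwitterPOJO extends BasePOJO {
        private String tweet;
        public TwitterPOJO() { }
        public String getTweet() { return tweet; }
        public void setTweet(String tweet) { this.tweet = tweet; }
    }

    /** Counter-example: private field without accessors. */
    public static class HiddenFieldPOJO {
        private String secret;
        public HiddenFieldPOJO() { }
    }

    private static boolean hasPublicMethod(Class<?> clazz, String name, Class<?>... params) {
        try {
            clazz.getMethod(name, params);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    /** Simplified check: every instance field is public or has a public getter and setter. */
    public static boolean fieldsLookLikePojo(Class<?> clazz) {
        for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int mod = f.getModifiers();
                if (Modifier.isStatic(mod) || Modifier.isTransient(mod) || f.isSynthetic()) {
                    continue;  // not part of the record's data
                }
                if (Modifier.isPublic(mod)) {
                    continue;  // public fields are accepted as they are
                }
                String suffix = Character.toUpperCase(f.getName().charAt(0)) + f.getName().substring(1);
                boolean getter = hasPublicMethod(clazz, "get" + suffix);
                boolean setter = hasPublicMethod(clazz, "set" + suffix, f.getType());
                if (!getter || !setter) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(fieldsLookLikePojo(TwitterPOJO.class));      // true
        System.out.println(fieldsLookLikePojo(HiddenFieldPOJO.class));  // false
    }
}
```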
|
Hi Chiwan,
I followed the instructions in the documentation. I have a simple base class with some properties (all public). I then extend that class with a new public property (tweet in my case), and I also provide a getter and a setter for that property.

Now when I execute:

    DataSet<TwitterPOJO> ds_twitter = env.readCsvFile("file://"+path_twitter)
        .pojoType(TwitterPOJO.class, "table", "time", "tweet");

I receive:

    There is no field called "table" in com.Flink.POJO.TwitterPOJO

table is a field of the base class, declared as public and also provided with a getter and a setter.

Thank you for your help.

Giacomo

On Wed, Sep 16, 2015 at 4:42 PM, Chiwan Park <[hidden email]> wrote:
|
Hi Giacomo,
Did you create constructors without arguments in both the base class and the derived class? If you did, it seems like a bug.

Regards,
Chiwan Park

On Sep 17, 2015, at 12:04 AM, Giacomo Licari <[hidden email]> wrote:
|
Yes I did.

If anyone has a workaround, let us know.

Regards,
Giacomo Licari

On Wed, Sep 16, 2015 at 5:15 PM, Chiwan Park <[hidden email]> wrote:
|
It seems like a bug in CsvInputFormat. I succeeded in reproducing it on my local machine.
I will create a JIRA issue for this and submit a patch to fix it.

Which version of Flink are you using?

Regards,
Chiwan Park

On Sep 17, 2015, at 12:20 AM, Giacomo Licari <[hidden email]> wrote:
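
The thread does not say what goes wrong inside CsvInputFormat. One plausible cause, and this is an assumption rather than something confirmed here, is a field lookup that inspects only the class itself: in plain Java reflection, Class.getDeclaredField does not see fields inherited from a superclass. The sketch below shows that behavior and a lookup that walks the hierarchy; all class and method names are hypothetical.

```java
import java.lang.reflect.Field;

public class InheritedFieldLookup {

    /** Hypothetical base POJO holding the shared fields. */
    public static class BasePOJO {
        public String table;
        public String time;
    }

    /** Hypothetical derived POJO adding one field. */
    public static class TwitterPOJO extends BasePOJO {
        public String tweet;
    }

    /** True only if the field is declared by this exact class, not by a superclass. */
    public static boolean declaredDirectly(Class<?> clazz, String name) {
        try {
            clazz.getDeclaredField(name);
            return true;
        } catch (NoSuchFieldException e) {
            return false;
        }
    }

    /** Walks up the superclass chain until the field is found; returns null if absent. */
    public static Field findInHierarchy(Class<?> clazz, String name) {
        for (Class<?> c = clazz; c != null; c = c.getSuperclass()) {
            try {
                return c.getDeclaredField(name);
            } catch (NoSuchFieldException e) {
                // not declared at this level, keep walking up
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // The inherited field is invisible to a lookup on the derived class alone.
        System.out.println(declaredDirectly(TwitterPOJO.class, "table"));  // false
        // Walking the hierarchy finds it in the base class.
        System.out.println(findInHierarchy(TwitterPOJO.class, "table")
                .getDeclaringClass().getSimpleName());                     // BasePOJO
    }
}
```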
|
I created a JIRA issue [1]. After FLINK-2637 [2][3] is resolved, I’ll submit a patch to solve this.
Currently, there is no way to use a derived class with CSV input. Thank you for reporting.

[1] https://issues.apache.org/jira/browse/FLINK-2690
[2] https://issues.apache.org/jira/browse/FLINK-2637
[3] https://github.com/apache/flink/pull/1134

Regards,
Chiwan Park

On Sep 17, 2015, at 1:33 AM, Chiwan Park <[hidden email]> wrote:
>>> >>> Maybe the documentation in homepage [1] would be helpful. >>> >>> Regards, >>> Chiwan Park >>> >>> [1] https://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#pojos >>> >>>> On Sep 16, 2015, at 11:25 PM, Giacomo Licari <[hidden email]> wrote: >>>> >>>> I run it only implementing java.io.Serializable without disabling the closure cleaner. >>>> >>>> Another question I have is about POJO classes. >>>> I would also create a base POJO class with some common proprerties, and then extend it in new classes. These classes are used to convert a CSV into a dataset of POJO objects (of derived class type). >>>> >>>> In the following example, I create a DataSet of TwitterPOJO, which extends a Base class, adding the new property "tweet". >>>> >>>> DataSet<TwitterPOJO> ds_twitter = env.readCsvFile("file://"+path_twitter) >>>> .pojoType(TwitterPOJO.class, "table", "time", "tweet"); >>>> >>>> I obtain this error: >>>> [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class com.Flink.POJO.TwitterPOJO is not a valid POJO type >>>> Exception in thread "main" java.lang.ClassCastException: org.apache.flink.api.java.typeutils.GenericTypeInfo cannot be cast to org.apache.flink.api.java.typeutils.PojoTypeInfo >>>> >>>> Greetings, >>>> G.L. >>>> >>>> On Wed, Sep 16, 2015 at 4:05 PM, Stephan Ewen <[hidden email]> wrote: >>>> Could you also try the other variant (disabeling the closure cleaner)? I would be curious if this behavior is expected Java Serialization behavior, or whether our pre-processing code is causing it. >>>> >>>> Greetings, >>>> Stephan >>>> >>>> >>>> On Wed, Sep 16, 2015 at 3:38 PM, Giacomo Licari <[hidden email]> wrote: >>>> Thank you Martin and Stephan for your help. >>>> I tried directly to implement java.io.Serializable in Base class and it worked perfectly! >>>> >>>> Now I can develop more flexible and maintainable code. Thank you a lot guys. 
>>>> >>>> Greetings, >>>> Giacomo >>>> >>>> On Wed, Sep 16, 2015 at 1:46 PM, Stephan Ewen <[hidden email]> wrote: >>>> Hi! >>>> >>>> Interesting case. We use plain Java Serialization to distribute UDFs, and perform additional "cleaning" of scopes, which may be causing the issue. >>>> >>>> Can you try the following to see if any of those resolves the problem? >>>> >>>> 1) On the environment, disable the closure cleaner (in the execution config). >>>> >>>> 2) Let the CullTimeBase class implement java.io.Serializable. >>>> >>>> Please let us know how it turns out! >>>> >>>> Greetings, >>>> Stephan >>>> >>>> >>>> On Wed, Sep 16, 2015 at 1:29 PM, Martin Junghanns <[hidden email]> wrote: >>>> Hi Giacomo, >>>> >>>> I ran into the same issue. Seems to be coupled to the serialization mechanism of UDFs. I solved it by letting the base class implement the UDF interface (e.g. FlatMapFunction) and in addition make it generic (which should be necessary in your example). >>>> >>>> public [abstract] class CullTimeBase<IN, OUT> implements FlatMapFunction<IN, OUT> { >>>> // ... >>>> } >>>> >>>> public class CullTimeRainFall extends CullTimeBase<RainFallPOJO, RainFallPOJO> { >>>> // ... >>>> } >>>> >>>> This should work. >>>> >>>> Best, >>>> Martin >>>> >>>> >>>> On 16.09.2015 10:41, Giacomo Licari wrote: >>>>> Hi guys, >>>>> I'm trying to create a base class which is inherited from classes implementing FlatMap method on specific POJO types. >>>>> >>>>> It seems inheritance doesn't work, I can access this.PropertyName or super.PropertyName from flatMap method but values are always null. 
>>>>> >>>>> Here the derived class, using RainfallPOJO: >>>>> >>>>> public class CullTimeRainfall extends CullTimeBase implements FlatMapFunction<RainfallPOJO, RainfallPOJO> { >>>>> >>>>> public CullTimeRainfall(int num, int den, String time_data_name, String start_time, String end_time, int interval, String time_unit){ >>>>> super(num, den, time_data_name, start_time, end_time, interval, time_unit); >>>>> } >>>>> >>>>> public void flatMap(RainfallPOJO obj, Collector<RainfallPOJO> coll) throws Exception { >>>>> DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ss.SSS"); >>>>> try { >>>>> Date time = formatter.parse( obj.getTime().replaceAll( "([0-9\\-T]+:[0-9]{2}:[0-9.+]+):([0-9]{2})", "$1$2" ) ); >>>>> if(time.after(this.startTime) && time.before(this.endTime)){ >>>>> coll.collect(obj); >>>>> } >>>>> } catch(Exception e){ >>>>> e.printStackTrace(); >>>>> } >>>>> } >>>>> >>>>> } >>>>> >>>>> My Base class is: >>>>> >>>>> public class CullTimeBase { >>>>> >>>>> protected int numerator; >>>>> protected int denominator; >>>>> protected String timeDataName; >>>>> protected Date startTime; >>>>> protected Date endTime; >>>>> protected int interval; >>>>> protected String timeUnit; >>>>> >>>>> public CullTimeBase(int num, int den, String time_data_name, String start_time, String end_time, int interv, String time_unit){ >>>>> numerator = num; >>>>> denominator = den; >>>>> timeDataName = time_data_name; >>>>> interval = interv; >>>>> timeUnit = time_unit; >>>>> DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS"); >>>>> try { >>>>> startTime = formatter.parse(start_time); >>>>> endTime = formatter.parse(end_time); >>>>> } catch (ParseException e) { >>>>> e.printStackTrace(); >>>>> } >>>>> } >>>>> >>>>> It works only if I declare all variables and methods in only one class, but so I should repeat the same properties in more classes. I would only specialize each derived class with a custom flatMap method. which uses a custom POJO type. 
>>>>> >>>>> Thanks a lot, >>>>> Giacomo >>>> >>>> >>>> >>>> >>>> >>> >>> >>> >>> >> >> >> >> >> > > > |
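A note on Stephan's second suggestion in the quoted thread (letting the base class implement java.io.Serializable): the effect can be reproduced with plain Java serialization, without Flink. The sketch below is only an illustration under assumptions. PlainBase, PlainChild, SerializableBase, FixedChild and SerializationDemo are hypothetical names, not classes from the thread, and the base class is assumed to have an accessible no-argument constructor (without one, deserialization fails with an InvalidClassException instead of yielding nulls). It does not cover the closure cleaner.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a base class that is NOT Serializable.
class PlainBase {
    protected String startTime;

    // Java serialization re-runs this no-arg constructor when restoring a
    // subclass, so startTime is reset instead of being read from the stream.
    protected PlainBase() { }

    protected PlainBase(String startTime) { this.startTime = startTime; }
}

// Only the subclass is Serializable (for example via a function interface).
class PlainChild extends PlainBase implements Serializable {
    private static final long serialVersionUID = 1L;

    PlainChild(String startTime) { super(startTime); }

    String getStartTime() { return startTime; }
}

// Variant following the suggestion: the base class itself is Serializable.
class SerializableBase implements Serializable {
    private static final long serialVersionUID = 1L;
    protected String startTime;

    protected SerializableBase(String startTime) { this.startTime = startTime; }
}

class FixedChild extends SerializableBase {
    private static final long serialVersionUID = 1L;

    FixedChild(String startTime) { super(startTime); }

    String getStartTime() { return startTime; }
}

class SerializationDemo {
    // Writes the object to a byte array and reads it back, which is roughly
    // what happens when a function object is shipped to another JVM.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        PlainChild lost = roundTrip(new PlainChild("2015-09-16T00:00:00.000"));
        FixedChild kept = roundTrip(new FixedChild("2015-09-16T00:00:00.000"));
        System.out.println("non-serializable base: " + lost.getStartTime());
        System.out.println("serializable base:     " + kept.getStartTime());
    }
}
```

In this sketch the field inherited from the non-serializable base comes back as null after the round trip, while the field of the serializable base keeps its value. That matches the symptom described in the original post, although the thread itself does not confirm that this is the whole explanation.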
Hi Chiwan, Cheers, I created a JIRA issue [1]. After FLINK-2637 [2][3] is resolved, I’ll submit a patch to solve this.
Currently, there is no way to use a derived class with CSV input. Thank you for reporting. [1] https://issues.apache.org/jira/browse/FLINK-2690 [2] https://issues.apache.org/jira/browse/FLINK-2637 [3] https://github.com/apache/flink/pull/1134 Regards, Chiwan Park > On Sep 17, 2015, at 1:33 AM, Chiwan Park <[hidden email]> wrote: > > It seems like a bug in CsvInputFormat. I succeeded in reproducing it on my local machine. > I will create a JIRA issue for this and submit a patch to fix it. > > Which version of Flink are you using? > > Regards, > Chiwan Park > >> On Sep 17, 2015, at 12:20 AM, Giacomo Licari <[hidden email]> wrote: >> >> Yes I did. >> >> If anyone has a workaround, let us know. >> >> Regards, >> Giacomo Licari >> >> On Wed, Sep 16, 2015 at 5:15 PM, Chiwan Park <[hidden email]> wrote: >> Hi Giacomo, >> >> Did you create constructors without arguments in both the base class and the derived class? >> If you did, it seems like a bug. >> >> Regards, >> Chiwan Park >> >>> On Sep 17, 2015, at 12:04 AM, Giacomo Licari <[hidden email]> wrote: >>> >>> Hi Chiwan, >>> I followed the instructions in the documentation. >>> I have a simple base class with some properties (all public). >>> Then I extend that class with a new public property (tweet in my case); I also provide a getter and setter for that property. >>> >>> Now when I execute: >>> DataSet<TwitterPOJO> ds_twitter = env.readCsvFile("file://"+path_twitter) >>> .pojoType(TwitterPOJO.class, "table", "time", "tweet"); >>> >>> I receive: >>> There is no field called "table" in com.Flink.POJO.TwitterPOJO >>> >>> table is a field of the Base class, declared as public, also with a getter and setter. >>> >>> Thank you for your help. >>> >>> Giacomo >>> >>> On Wed, Sep 16, 2015 at 4:42 PM, Chiwan Park <[hidden email]> wrote: >>> Hi Giacomo, >>> >>> You should declare your field as public. If you declare your field as private or protected, the class must provide a getter and setter to be treated as a POJO.
>>> >>> Maybe the documentation on the homepage [1] would be helpful. >>> >>> Regards, >>> Chiwan Park >>> >>> [1] https://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#pojos >>> >>>> On Sep 16, 2015, at 11:25 PM, Giacomo Licari <[hidden email]> wrote: >>>> >>>> I ran it only implementing java.io.Serializable, without disabling the closure cleaner. >>>> >>>> Another question I have is about POJO classes. >>>> I would also like to create a base POJO class with some common properties, and then extend it in new classes. These classes are used to convert a CSV into a dataset of POJO objects (of derived class type). >>>> >>>> In the following example, I create a DataSet of TwitterPOJO, which extends a Base class, adding the new property "tweet". >>>> >>>> DataSet<TwitterPOJO> ds_twitter = env.readCsvFile("file://"+path_twitter) >>>> .pojoType(TwitterPOJO.class, "table", "time", "tweet"); >>>> >>>> I obtain this error: >>>> [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class com.Flink.POJO.TwitterPOJO is not a valid POJO type >>>> Exception in thread "main" java.lang.ClassCastException: org.apache.flink.api.java.typeutils.GenericTypeInfo cannot be cast to org.apache.flink.api.java.typeutils.PojoTypeInfo >>>> >>>> Greetings, >>>> G.L. >>>> >>>> On Wed, Sep 16, 2015 at 4:05 PM, Stephan Ewen <[hidden email]> wrote: >>>> Could you also try the other variant (disabling the closure cleaner)? I would be curious whether this is expected Java Serialization behavior, or whether our pre-processing code is causing it. >>>> >>>> Greetings, >>>> Stephan >>>> >>>> >>>> On Wed, Sep 16, 2015 at 3:38 PM, Giacomo Licari <[hidden email]> wrote: >>>> Thank you Martin and Stephan for your help. >>>> I tried implementing java.io.Serializable directly in the Base class and it worked perfectly! >>>> >>>> Now I can develop more flexible and maintainable code. Thanks a lot, guys.
>>>> >>>> Greetings, >>>> Giacomo |
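On the "There is no field called "table"" error reported for the derived POJO: the thread does not say how CsvInputFormat resolves field names, so the following is only a guess at one plausible mechanism, shown with plain Java reflection. BasePojo, TweetPojo and PojoFieldLookup are hypothetical names (the real TwitterPOJO and its base class are not shown in the thread). The sketch compares a lookup that inspects only the derived class with one that walks up the class hierarchy.

```java
import java.lang.reflect.Field;

// Hypothetical base POJO: public fields and a public no-arg constructor.
class BasePojo {
    public String table;
    public String time;

    public BasePojo() { }
}

// Hypothetical derived POJO adding one field, as in the TwitterPOJO example.
class TweetPojo extends BasePojo {
    public String tweet;

    public TweetPojo() { }
}

class PojoFieldLookup {
    // Looks only at fields declared directly on the given class.
    // Inherited fields are invisible to getDeclaredField.
    static boolean declaredOnClass(Class<?> clazz, String name) {
        try {
            clazz.getDeclaredField(name);
            return true;
        } catch (NoSuchFieldException e) {
            return false;
        }
    }

    // Walks from the class up through its superclasses and returns the first
    // matching field, or null if no class in the hierarchy declares it.
    static Field findInHierarchy(Class<?> clazz, String name) {
        for (Class<?> c = clazz; c != null; c = c.getSuperclass()) {
            try {
                return c.getDeclaredField(name);
            } catch (NoSuchFieldException e) {
                // Not declared here; keep climbing.
            }
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println("declared on TweetPojo, tweet: "
                + declaredOnClass(TweetPojo.class, "tweet"));
        System.out.println("declared on TweetPojo, table: "
                + declaredOnClass(TweetPojo.class, "table"));
        System.out.println("found in hierarchy, table: "
                + (findInHierarchy(TweetPojo.class, "table") != null));
    }
}
```

A lookup restricted to the derived class does not see "table" even though it is a public field of the base class, whereas the hierarchy walk finds it. Whether this is what the JIRA issue addresses is an assumption here, not something stated in the thread.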