Flink and factories?

Sebastian Neef
Hi,

I'm currently working with Flink for my bachelor thesis and I'm running
into some odd issues with regard to factories.

I've built a small "proof of concept" and the code can be found here:
https://gitlab.tubit.tu-berlin.de/gehaxelt/flink-factorytest

The idea is that a Config-singleton holds information or objects to use,
e.g. an AppleFactory (default) which implements a specific IDataFactory
interface. This AppleFactory is then used in a flatMap to create Apples
(objects which implement the IData interface):

>     System.out.println("Factory before processedData: " + Config.getInstance().getFactory().getClass());
>     DataSet<IData> processedData = this.getEnv().fromCollection(inputData).flatMap(new FlatMapFunction<Integer, IData>() {
>         @Override
>         public void flatMap(Integer integer, Collector<IData> collector) throws Exception {
>             if (integer % 2 == 0) {
>                 collector.collect(Config.getInstance().getFactory().newInstance());
>             }
>         }
>     });
>     System.out.println("Factory after processedData: " + Config.getInstance().getFactory().getClass());
>     try {
>         System.out.println("Class created: " + processedData.collect().get(0).getClass());
>         this.getDataHolder().setDataList(processedData.collect());
>     } catch (Exception e) {
>         e.printStackTrace();
>     }


This happens in the "Config -> initData()" function. My Flink-Job looks
like this:

> public static void main(String[] args) throws Exception {
>
>     Config c = Config.getInstance(); // Use AppleFactory by default
>
>     // BOOM: Somehow Flink ignores this?
>     c.setFactory(new PearFactory());
>
>     c.initData();
>
>     DataSet<IData> data = c.getEnv().fromCollection(c.getDataHolder().getDataList());

As you can see, before the "c.initData()" call I set the factory to a
"PearFactory()", which will produce Pear objects (also implementing the
IData interface).

Running the code will print the following text:

> Class created: class factorytest.Data.Apple

This, however, means that Flink didn't notice (or ignored?) that the
factory has changed and still creates objects of type Apple.

Instead I'd expect the processedData.collect() list to contain
Pear-objects. What is even more confusing is that the two "Factory
before/after processedData" print statements correctly return the
PearFactory class.

What's the best way to fix this? Any tips/tricks/questions?

I guess that this issue might be hard to explain in words, so I'd
really appreciate it if someone could have a look at the code and maybe
do an example run:

Job.java:
https://gitlab.tubit.tu-berlin.de/gehaxelt/flink-factorytest/blob/master/src/main/java/factorytest/Job.java

Config.java:
https://gitlab.tubit.tu-berlin.de/gehaxelt/flink-factorytest/blob/master/src/main/java/factorytest/Config.java

Example run:
https://gitlab.tubit.tu-berlin.de/gehaxelt/flink-factorytest/blob/master/EXAMPLE_RUN_OUTPUT.txt

Kind regards,
Sebastian Neef

Re: Flink and factories?

Chesnay Schepler
Hello,

Admittedly, I didn't look too deeply into this, but I would assume that
you are only modifying the factory on the client. When the operators are
deserialized on a cluster, your singleton instance is back to the
default, which is apples (I think), since the statement that changes the
factory is never executed there.

Basically, don't use static singletons. Instead, store the factory
returned by Config.getInstance().getFactory() in a field in your function.
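
A minimal sketch of the effect described above, assuming that plain Java serialization is a fair stand-in for how a function gets shipped to a worker. DemoConfig, LookupAtCallTime and SingletonDemo are hypothetical names that are not part of the linked project, and the "worker" is only simulated by resetting the singleton to its default inside the same JVM:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for the Config singleton from the thread.
class DemoConfig {
    private static final DemoConfig INSTANCE = new DemoConfig();
    private String factoryName = "AppleFactory"; // default, as in the thread

    static DemoConfig getInstance() { return INSTANCE; }
    String getFactoryName() { return factoryName; }
    void setFactoryName(String name) { factoryName = name; }
}

// Stand-in for the user function: it looks the singleton up when it is called,
// so the serialized form of this object contains no factory information at all.
class LookupAtCallTime implements Serializable {
    private static final long serialVersionUID = 1L;

    String produce() { return DemoConfig.getInstance().getFactoryName(); }
}

class SingletonDemo {
    static byte[] serialize(Object value) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(value);
            }
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    static Object deserialize(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    static String runOnSimulatedWorker() {
        // "Client" side: change the singleton, then serialize the function.
        DemoConfig.getInstance().setFactoryName("PearFactory");
        byte[] shipped = serialize(new LookupAtCallTime());

        // "Worker" side: a fresh JVM would start with the default again.
        // This is simulated here by resetting the singleton by hand.
        DemoConfig.getInstance().setFactoryName("AppleFactory");
        LookupAtCallTime function = (LookupAtCallTime) deserialize(shipped);
        return function.produce();
    }

    public static void main(String[] args) {
        System.out.println(runOnSimulatedWorker()); // prints AppleFactory
    }
}
```

The change made on the "client" is lost because static state lives in the class, not in the serialized object.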

Regards,
Chesnay

On 19.10.2016 18:07, Sebastian Neef wrote:

> [...]


Re: Flink and factories?

Sebastian Neef
Hi Chesnay,

thank you for looking into this!

Is there any way to tell Flink to (re)sync the changed classes and/or
to distribute the serialized classes at a given point (e.g. on the
first env.execute()) or so?

The thing is that I'm working on a small framework which is based on
Flink, so passing a configuration object to all functions/classes would
be overkill, I guess.

Thanks again and kind regards,
Sebastian

Re: Flink and factories?

Chesnay Schepler
The functions are serialized when env.execute() is called. As I understand it, though, your singleton is simply not part of the serialized function, so it doesn't actually matter when the function is serialized.

Storing the factory instance in the function shouldn't be too much work, actually. The following code might already do the trick (the changes are the new factory field and its use in flatMap):

DataSet<IData> processedData = this.getEnv().fromCollection(inputData).flatMap(new FlatMapFunction<Integer, IData>() {
    // Captured when the function is created on the client, so it is serialized with the function.
    private final IDataFactory factory = Config.getInstance().getFactory();

    @Override
    public void flatMap(Integer integer, Collector<IData> collector) throws Exception {
        if (integer % 2 == 0) {
            collector.collect(factory.newInstance());
        }
    }
});

Regards,
Chesnay

On 19.10.2016 23:09, Sebastian Neef wrote:
> [...]



Re: Flink and factories?

Sebastian Neef
Hi,

Wow, that's indeed a nice solution.

Your version still threw some errors:

> Caused by: org.apache.flink.api.common.InvalidProgramException: Object factorytest.Config$1@5143c662 not serializable
> Caused by: java.io.NotSerializableException: factorytest.factory.PearFactory

I fixed this by making the IDataFactory interface extend
java.io.Serializable (and did the same for all other interfaces right
away) - I hope that won't backfire in the future.
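
For reference, a rough sketch of what that combination looks like outside of Flink. The types below are simplified stand-ins that only borrow the names from this thread (the real IData, IDataFactory and PearFactory live in the linked repository and may differ), FactoryHoldingFunction and SerializableFactoryDemo are hypothetical, and a plain Java serialization round trip is assumed to approximate shipping the function:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Simplified stand-ins; an interface uses "extends" to inherit Serializable.
interface IData extends Serializable {}

interface IDataFactory extends Serializable {
    IData newInstance();
}

class Pear implements IData {
    private static final long serialVersionUID = 1L;
}

class PearFactory implements IDataFactory {
    private static final long serialVersionUID = 1L;

    @Override
    public IData newInstance() { return new Pear(); }
}

// Hypothetical function object that keeps the factory in a field,
// so the factory is part of the function's serialized form.
class FactoryHoldingFunction implements Serializable {
    private static final long serialVersionUID = 1L;
    private final IDataFactory factory;

    FactoryHoldingFunction(IDataFactory factory) { this.factory = factory; }

    IData create() { return factory.newInstance(); }
}

class SerializableFactoryDemo {
    // Serializes and deserializes the function, as a stand-in for shipping it.
    static FactoryHoldingFunction roundTrip(FactoryHoldingFunction function) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(function);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
                return (FactoryHoldingFunction) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        FactoryHoldingFunction copy = roundTrip(new FactoryHoldingFunction(new PearFactory()));
        System.out.println(copy.create().getClass().getSimpleName()); // prints Pear
    }
}
```

If IDataFactory did not extend Serializable here, writeObject would fail with a java.io.NotSerializableException for PearFactory, which matches the second "Caused by" line above.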

Anyway, the problem seems solved. Yay and thank you!

Kind regards,
Sebastian