Different result on running Flink in local mode and Yarn cluster

Soheil Pourbafrani
I have code written with the Flink Java API that reads bytes from Kafka, parses them, and then inserts the results into a Cassandra database using a static method from another library (the library does both the parsing and the inserting). When I run the code locally in the IDE, I get the desired result, but on a YARN cluster the parse method doesn't work as expected!

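import java.nio.ByteBuffer;
import java.util.HashMap;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

public class Test {
    // Static field: initialized separately in every JVM that loads Test;
    // its contents are not serialized together with the job
    static HashMap<Integer, Object> ConfigHashMap = new HashMap<>();

    public static void main(String[] args) throws Exception {
        // env and stream (the Kafka source) are defined elsewhere; not shown in the post

        // These calls run once, in the JVM that executes main()
        // (on YARN this is typically the client, not the TaskManagers)
        CassandraConnection.connect();
        Parser.setInsert(true);

        stream.flatMap(new FlatMapFunction<byte[], Void>() {
            @Override
            public void flatMap(byte[] value, Collector<Void> out) throws Exception {
                // Executed inside the TaskManagers for every Kafka record
                Parser.parse(ByteBuffer.wrap(value), ConfigHashMap);
                // Parser.parse(ByteBuffer.wrap(value));
            }
        });
        env.execute();
    }
}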

The Parser class has a static HashMap field that holds the configuration the parsing depends on, and entries are inserted into it during execution. The problem on YARN is that this data is not available to the TaskManagers, and they just print "config is not available"!

So I redefined that HashMap as a parameter of the parse method, but the results are no different!

How can I fix the problem?



Re: Different result on running Flink in local mode and Yarn cluster

Jörn Franke
The problem may be that it is still static. How does the parser use this HashMap?

On 26. Apr 2018, at 06:42, Soheil Pourbafrani wrote:
[quoted original post omitted]

Re: Different result on running Flink in local mode and Yarn cluster

Michael Latta
In reply to this post by Soheil Pourbafrani
Only the anonymous FlatMapFunction instance is sent to the TaskManager. Move the static field to that class. 
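
To illustrate the point, here is a minimal plain-Java sketch (no Flink dependency; StaticFieldDemo, ParseFunction, staticConfig and instanceConfig are hypothetical names). Flink ships user functions to the TaskManagers by serializing them, and Java serialization writes only instance fields; static fields are initialized anew in every JVM that loads the class. Clearing the static map before deserializing simulates arriving in a fresh TaskManager JVM.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;

public class StaticFieldDemo {
    // Hypothetical stand-in for a user function such as the anonymous FlatMapFunction
    static class ParseFunction implements Serializable {
        private static final long serialVersionUID = 1L;
        // Static: belongs to the class in each JVM, never written by serialization
        static HashMap<Integer, Object> staticConfig = new HashMap<>();
        // Instance: part of the object's serialized state
        final HashMap<Integer, Object> instanceConfig = new HashMap<>();
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        ParseFunction fn = new ParseFunction();
        ParseFunction.staticConfig.put(1, "static value");  // set on the "client" side
        fn.instanceConfig.put(1, "instance value");

        byte[] shipped = serialize(fn);

        // Simulate a fresh TaskManager JVM: static state starts out empty there
        ParseFunction.staticConfig.clear();

        ParseFunction received = (ParseFunction) deserialize(shipped);
        System.out.println("instance: " + received.instanceConfig.get(1));  // prints "instance: instance value"
        System.out.println("static:   " + ParseFunction.staticConfig.get(1)); // prints "static:   null"
    }
}
```

In the Flink job, the analogous change would be to keep the config in an instance field of the function (for example, passed in through its constructor) rather than in a static field, assuming the config is already known before the job is submitted. Note that the map passed to parse in the original code is still the static Test.ConfigHashMap, so passing it as an argument alone does not change anything.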

Michael

Sent from my iPad

On Apr 25, 2018, at 10:42 PM, Soheil Pourbafrani wrote:
[quoted original post omitted]

Re: Different result on running Flink in local mode and Yarn cluster

Michael Latta
Without knowing the library or what the config needs, I don't have a suggestion. If the config is accumulated from the inputs and needs to see all of them, I would suggest setting the parallelism to 1 as an experiment, but running it in parallel would need a redesign.
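
Why parallelism matters can be shown with a simplified, hypothetical model in plain Java (not Flink code; ParallelConfigDemo and distribute are made-up names). Each parallel instance of the parse function holds its own copy of the map and only sees the records routed to it. Round-robin routing is an assumption made for illustration; a Kafka source actually assigns whole partitions to subtasks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ParallelConfigDemo {
    // Hypothetical model: each parallel subtask holds its own config map
    static List<Map<Integer, String>> distribute(List<Integer> configKeys, int parallelism) {
        List<Map<Integer, String>> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new HashMap<>());
        }
        // Round-robin routing of config records (an assumption, see above)
        for (int i = 0; i < configKeys.size(); i++) {
            int key = configKeys.get(i);
            subtasks.get(i % parallelism).put(key, "config-" + key);
        }
        return subtasks;
    }

    public static void main(String[] args) {
        List<Integer> keys = Arrays.asList(1, 2, 3, 4);
        for (int parallelism : new int[] {1, 2}) {
            List<Map<Integer, String>> subtasks = distribute(keys, parallelism);
            for (int s = 0; s < parallelism; s++) {
                System.out.println("parallelism " + parallelism + ", subtask " + s
                        + " sees config keys " + subtasks.get(s).keySet());
            }
        }
    }
}
```

With parallelism 1 the single subtask ends up with all four config entries; with parallelism 2 each subtask holds only half of them, so a record that needs an entry held by the other subtask would hit "config is not available".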

Michael


On Apr 26, 2018, at 12:50 AM, Soheil Pourbafrani wrote:

Thanks. So what is your suggestion for solving the problem? Is it possible to use Broadcast Variables for this scenario?
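
For context: in Flink, broadcast variables belong to the DataSet (batch) API, set with withBroadcastSet(...) on an operator and read with getRuntimeContext().getBroadcastVariable(...). For a streaming job like this one, the corresponding idea is to send the config records to every parallel instance, for example with DataStream#broadcast() or the broadcast state pattern added in Flink 1.5. Whether that fits depends on how the Parser library builds its config, which the thread does not say. The sketch below is a plain-Java model of the idea only, not Flink API; BroadcastConfigDemo, Subtask, onConfig, onData and broadcastConfig are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BroadcastConfigDemo {
    // One hypothetical parallel instance of the parse function
    static class Subtask {
        final Map<Integer, String> config = new HashMap<>();

        // Config records are broadcast, so every subtask receives them
        void onConfig(int key, String value) {
            config.put(key, value);
        }

        // A data record reaches only one subtask, which needs the config locally
        String onData(int configKey) {
            String cfg = config.get(configKey);
            return cfg == null ? "config is not available" : "parsed with " + cfg;
        }
    }

    static List<Subtask> createSubtasks(int parallelism) {
        List<Subtask> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new Subtask());
        }
        return subtasks;
    }

    // Broadcasting: replicate each config record to every parallel subtask
    static void broadcastConfig(List<Subtask> subtasks, int key, String value) {
        for (Subtask s : subtasks) {
            s.onConfig(key, value);
        }
    }

    public static void main(String[] args) {
        List<Subtask> subtasks = createSubtasks(3);
        broadcastConfig(subtasks, 42, "layout-A");
        // Whichever subtask a data record lands on, the config is there
        for (Subtask s : subtasks) {
            System.out.println(s.onData(42)); // prints "parsed with layout-A"
        }
        // A subtask that never received the broadcast has nothing
        System.out.println(new Subtask().onData(42)); // prints "config is not available"
    }
}
```

The design trade-off is that every parallel instance keeps a full copy of the config, which is fine as long as the config is small compared with the data stream.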

On Thu, Apr 26, 2018 at 10:57 AM, TechnoMage wrote:
What parallelism are you using? If it is greater than 1, you cannot rely on the config value being passed to each of the parsing functions, as they run in separate threads or even on separate machines.

Michael


On Apr 26, 2018, at 12:24 AM, Soheil Pourbafrani wrote:

As I said, in the first version of the code I didn't pass any argument to the parse function, and the HashMap was a static field of the Parser class, but I didn't get the desired answer. Then I tried passing the HashMap as an argument to the parse method, but I still didn't get the desired answers. The code is something like the following:

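import java.nio.ByteBuffer;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

public class Test {

    public static void main(String[] args) throws Exception {
        // env and stream (the Kafka source) are defined elsewhere; not shown in the post

        CassandraConnection.connect();
        Parser.setInsert(true);

        stream.flatMap(new FlatMapFunction<byte[], Void>() {
            @Override
            public void flatMap(byte[] value, Collector<Void> out) throws Exception {
                // First version: Parser keeps its config HashMap in a static field
                Parser.parse(ByteBuffer.wrap(value));
            }
        });
        env.execute();
    }
}
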
In summary, I want the HashMap to be shared among the TaskManagers.