When using a broadcast variable and running on big data, this error appears. Please help


hagersaleh
When I run this program on big data it fails with the error below, but on small data there is no error. Why?


ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Customer> customer = getCustomerDataSet(env, mask, l, map);
DataSet<Orders> order = getOrdersDataSet(env, maskorder, l1, maporder);

customer.filter(new RichFilterFunction<Customer>() {
    private Collection<Orders> order1;

    @Override
    public void open(Configuration parameters) throws Exception {
        // The complete "order" dataset is materialized in memory in every task.
        order1 = getRuntimeContext().getBroadcastVariable("order");
    }

    @Override
    public boolean filter(Customer c) throws Exception {
        for (Orders o : order1) {
            if (c.f0.equals(o.f1) && c.f1.equals("AUTOMOBILE")
                    && (o.f2.equals("O") || o.f0 == 7)) {
                return true;
            }
        }
        return false;
    }
}).withBroadcastSet(order, "order")
  .writeAsCsv("/home/hadoop/Desktop/Dataset/complex_query_optimization.csv",
      "\n", "|", WriteMode.OVERWRITE);

env.execute();

Error
08/19/2015 07:49:23    Job execution switched to status RUNNING.
08/19/2015 07:49:23    DataSource (at getOrdersDataSet(TPCHQuery3.java:319) (org.apache.flink.api.java.io.CsvInputFormat))(1/1) switched to SCHEDULED
08/19/2015 07:49:23    DataSource (at getOrdersDataSet(TPCHQuery3.java:319) (org.apache.flink.api.java.io.CsvInputFormat))(1/1) switched to DEPLOYING
08/19/2015 07:49:23    DataSource (at getCustomerDataSet(TPCHQuery3.java:282) (org.apache.flink.api.java.io.CsvInputFormat))(1/1) switched to SCHEDULED
08/19/2015 07:49:23    DataSource (at getCustomerDataSet(TPCHQuery3.java:282) (org.apache.flink.api.java.io.CsvInputFormat))(1/1) switched to DEPLOYING
08/19/2015 07:49:23    DataSource (at getCustomerDataSet(TPCHQuery3.java:282) (org.apache.flink.api.java.io.CsvInputFormat))(1/1) switched to RUNNING
08/19/2015 07:49:23    DataSource (at getOrdersDataSet(TPCHQuery3.java:319) (org.apache.flink.api.java.io.CsvInputFormat))(1/1) switched to RUNNING
08/19/2015 07:49:23    Filter (Filter at main(TPCHQuery3.java:240))(1/1) switched to SCHEDULED
08/19/2015 07:49:23    Filter (Filter at main(TPCHQuery3.java:240))(1/1) switched to DEPLOYING
08/19/2015 07:49:23    Filter (Filter at main(TPCHQuery3.java:240))(1/1) switched to RUNNING
08/19/2015 07:50:04    Filter (Filter at main(TPCHQuery3.java:240))(1/1) switched to FAILED
java.io.IOException: Materialization of the broadcast variable failed.
    at org.apache.flink.runtime.broadcast.BroadcastVariableMaterialization.materializeVariable(BroadcastVariableMaterialization.java:154)
    at org.apache.flink.runtime.broadcast.BroadcastVariableManager.materializeBroadcastVariable(BroadcastVariableManager.java:50)
    at org.apache.flink.runtime.operators.RegularPactTask.readAndSetBroadcastInput(RegularPactTask.java:439)
    at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:358)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:724)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.lang.Long.valueOf(Long.java:577)
    at org.apache.flink.api.common.typeutils.base.LongSerializer.deserialize(LongSerializer.java:68)
    at org.apache.flink.api.common.typeutils.base.LongSerializer.deserialize(LongSerializer.java:27)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:127)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:110)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:64)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
    at org.apache.flink.runtime.broadcast.BroadcastVariableMaterialization.materializeVariable(BroadcastVariableMaterialization.java:115)
    ... 5 more

08/19/2015 07:50:04    Job execution switched to status FAILING.
08/19/2015 07:50:04    DataSource (at getCustomerDataSet(TPCHQuery3.java:282) (org.apache.flink.api.java.io.CsvInputFormat))(1/1) switched to CANCELING
08/19/2015 07:50:04    DataSource (at getOrdersDataSet(TPCHQuery3.java:319) (org.apache.flink.api.java.io.CsvInputFormat))(1/1) switched to CANCELING
08/19/2015 07:50:04    DataSink (CsvOutputFormat (path: /home/hadoop/Desktop/Dataset/complex_query_optimization.csv, delimiter: |))(1/1) switched to CANCELED
08/19/2015 07:50:04    DataSource (at getOrdersDataSet(TPCHQuery3.java:319) (org.apache.flink.api.java.io.CsvInputFormat))(1/1) switched to CANCELED
08/19/2015 07:50:04    DataSource (at getCustomerDataSet(TPCHQuery3.java:282) (org.apache.flink.api.java.io.CsvInputFormat))(1/1) switched to CANCELED
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
    at org.apache.flink.client.program.Client.run(Client.java:413)
    at org.apache.flink.client.program.Client.run(Client.java:356)
    at org.apache.flink.client.program.Client.run(Client.java:349)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
    at org.apache.flink.examples.java.relational.TPCHQuery3.main(TPCHQuery3.java:261)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
    at org.apache.flink.client.program.Client.run(Client.java:315)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: Materialization of the broadcast variable failed.
    at org.apache.flink.runtime.broadcast.BroadcastVariableMaterialization.materializeVariable(BroadcastVariableMaterialization.java:154)
    at org.apache.flink.runtime.broadcast.BroadcastVariableManager.materializeBroadcastVariable(BroadcastVariableManager.java:50)
    at org.apache.flink.runtime.operators.RegularPactTask.readAndSetBroadcastInput(RegularPactTask.java:439)
    at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:358)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:724)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.lang.Long.valueOf(Long.java:577)
    at org.apache.flink.api.common.typeutils.base.LongSerializer.deserialize(LongSerializer.java:68)
    at org.apache.flink.api.common.typeutils.base.LongSerializer.deserialize(LongSerializer.java:27)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:127)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:110)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:64)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
    at org.apache.flink.runtime.broadcast.BroadcastVariableMaterialization.materializeVariable(BroadcastVariableMaterialization.java:115)
    ... 5 more
Re: When using a broadcast variable and running on big data, this error appears. Please help

hagersaleh
please help
Re: When using a broadcast variable and running on big data, this error appears. Please help

hagersaleh
please help
Re: When using a broadcast variable and running on big data, this error appears. Please help

Rico Bergmann
As you can see from the exceptions, your broadcast variable is too large to fit into main memory.

I think storing that amount of data in a broadcast variable is not the best approach. I would suggest using a regular DataSet for this instead.



Re: When using a broadcast variable and running on big data, this error appears. Please help

hagersaleh
Why is it not good to use a broadcast variable with big data?
Re: When using a broadcast variable and running on big data, this error appears. Please help

Rico Bergmann
Because the broadcast variable is stored completely at each operator.

If you use a hash join, then both inputs can be hash-partitioned. This reduces the amount of memory needed by each operator, I think.
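For the program in the first message, a join-based version could look roughly like the sketch below. This is only an illustration: the tuple positions (c.f0, c.f1, o.f0, o.f1, o.f2) are taken from the original snippet, and everything else is an assumption, not the poster's actual schema.

```java
// Filter each input first, then join on the customer key.
// Both inputs stay partitioned, so no task has to hold the whole
// orders dataset in memory (unlike the broadcast-variable version).
DataSet<Customer> automobileCustomers = customer
    .filter(new FilterFunction<Customer>() {
        @Override
        public boolean filter(Customer c) {
            return c.f1.equals("AUTOMOBILE");
        }
    });

DataSet<Orders> relevantOrders = order
    .filter(new FilterFunction<Orders>() {
        @Override
        public boolean filter(Orders o) {
            return o.f2.equals("O") || o.f0 == 7;
        }
    });

automobileCustomers
    .join(relevantOrders)
    .where(0)            // customer key in Customer (c.f0)
    .equalTo(1)          // customer key in Orders (o.f1)
    .projectFirst(0, 1)  // keep only the customer fields
    .writeAsCsv("/home/hadoop/Desktop/Dataset/complex_query_optimization.csv",
        "\n", "|", WriteMode.OVERWRITE);
```

Note one semantic difference: a join emits one record per matching pair, so a customer with several matching orders appears several times; appending distinct() after the projection would mimic the original filter's semi-join behavior.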





Re: When using a broadcast variable and running on big data, this error appears. Please help

hagersaleh
When to use a broadcast variable?

Distribute data with a broadcast variable when:

- The data is large
- The data has been produced by some form of computation and is already a DataSet (distributed result)
- Typical use case: redistribute intermediate results, such as trained models

From: https://cwiki.apache.org/confluence/display/FLINK/Variables+Closures+vs.+Broadcast+Variables
Re: When using a broadcast variable and running on big data, this error appears. Please help

hagersaleh
Note: As the content of broadcast variables is kept in-memory on each node, it should not become too large. For simpler things like scalar values you can simply make parameters part of the closure of a function, or use the withParameters(...) method to pass in a configuration.
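For the scalar case this note mentions, a withParameters(...) version might look like the following sketch. The parameter key and the default value are made up for illustration.

```java
// Pass a small constant through the function configuration instead of
// broadcasting a dataset. The key "market.segment" is an arbitrary name.
Configuration config = new Configuration();
config.setString("market.segment", "AUTOMOBILE");

customer.filter(new RichFilterFunction<Customer>() {
    private String segment;

    @Override
    public void open(Configuration parameters) {
        // Read the parameter when the task starts; the second argument is a default.
        segment = parameters.getString("market.segment", "");
    }

    @Override
    public boolean filter(Customer c) {
        return c.f1.equals(segment);
    }
}).withParameters(config);
```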
Re: When using a broadcast variable and running on big data, this error appears. Please help

hagersaleh
Are there any ways to use a broadcast variable with big data?
Re: When using a broadcast variable and running on big data, this error appears. Please help

Chiwan Park-2
Hi hagersaleh,

Sorry for the late reply.

I think using an external system could be a solution for large-scale data. To use an external system, you have to implement rich functions such as RichFilterFunction, RichMapFunction, etc.


Regards,
Chiwan Park






Re: When using a broadcast variable and running on big data, this error appears. Please help

Stephan Ewen
Chiwan has a good point. Once the data that needs to be available to all machines is too large for one machine, there is no good solution any more. The best approach is an external store to which all nodes have access. It is not going to be terribly fast, though.

If you are in the situation that you need to broadcast so much data, you may want to rethink your approach to the problem in the first place. Is there no solution that can work with partitioned data? Even at the cost of re-partitioning twice or so?






Re: When using a broadcast variable and running on big data, this error appears. Please help

hagersaleh
Hi Chiwan Park
I don't understand this solution. Please explain more.
Re: When using a broadcast variable and running on big data, this error appears. Please help

Chiwan Park-2
Hi hagersaleh,

You should understand why the error occurred with large-scale data: broadcast variables can only handle data whose size fits on a single machine.

I meant using an external system such as Redis, HBase, etc. The connection to the external system can be initialized in the `open` method of rich functions such as `RichFilterFunction`, `RichFlatMapFunction`, etc.
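A minimal sketch of this pattern, assuming a Redis store accessed through the Jedis client; the host, port, and "order:<custKey>" key scheme are invented for illustration and the orders would have to be loaded into Redis beforehand:

```java
// Probe an external key-value store per record instead of holding the
// whole orders dataset in memory as a broadcast variable.
customer.filter(new RichFilterFunction<Customer>() {
    private transient Jedis redis;  // not serializable, so created per task

    @Override
    public void open(Configuration parameters) {
        // One connection per parallel task instance, opened when the task starts.
        redis = new Jedis("redis-host", 6379);
    }

    @Override
    public boolean filter(Customer c) {
        // Assumes orders were loaded into Redis under keys like "order:<custKey>".
        return redis.exists("order:" + c.f0);
    }

    @Override
    public void close() {
        redis.close();
    }
});
```

A per-record network round trip is slow, so in practice you would batch the lookups or cache hot keys.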

You can also choose the other solution, which Stephan suggested: rethinking your approach. I think rethinking your algorithm would be better than my suggestion.

From your code, I don't understand why you want to use a broadcast variable. You can do the same thing with filter and join operations. Here is my implementation [1].


Regards,
Chiwan Park

[1] https://gist.github.com/chiwanpark/a0b0269c9a9b058d15d3






Re: When using a broadcast variable and running on big data, this error appears. Please help

hagersaleh
Many thanks, Chiwan Park.