When I run this program on big data it fails with the error below, but when I run it on small data there is no error. Why?
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Customer> customer = getCustomerDataSet(env, mask, l, map);
DataSet<Orders> order = getOrdersDataSet(env, maskorder, l1, maporder);

customer.filter(new RichFilterFunction<Customer>() {
    private Collection<Orders> order1;

    @Override
    public void open(Configuration parameters) throws Exception {
        order1 = getRuntimeContext().getBroadcastVariable("order");
    }

    @Override
    public boolean filter(Customer c) throws Exception {
        for (Orders o : order1) {
            // System.out.println("c.f0=" + c.f0 + " o.f0=" + o.f0 + " " + c.f0.equals(o.f0));
            if (c.f0.equals(o.f1) && c.f1.equals("AUTOMOBILE")
                    && (o.f2.equals("O") || o.f0 == 7)) {
                return true;
            }
        }
        return false;
    }
})
.withBroadcastSet(order, "order")
.writeAsCsv("/home/hadoop/Desktop/Dataset/complex_query_optimization.csv", "\n", "|",
        WriteMode.OVERWRITE);

env.execute();

Error:

08/19/2015 07:49:23 Job execution switched to status RUNNING.
08/19/2015 07:49:23 DataSource (at getOrdersDataSet(TPCHQuery3.java:319) (org.apache.flink.api.java.io.CsvInputFormat))(1/1) switched to SCHEDULED
08/19/2015 07:49:23 DataSource (at getOrdersDataSet(TPCHQuery3.java:319) (org.apache.flink.api.java.io.CsvInputFormat))(1/1) switched to DEPLOYING
08/19/2015 07:49:23 DataSource (at getCustomerDataSet(TPCHQuery3.java:282) (org.apache.flink.api.java.io.CsvInputFormat))(1/1) switched to SCHEDULED
08/19/2015 07:49:23 DataSource (at getCustomerDataSet(TPCHQuery3.java:282) (org.apache.flink.api.java.io.CsvInputFormat))(1/1) switched to DEPLOYING
08/19/2015 07:49:23 DataSource (at getCustomerDataSet(TPCHQuery3.java:282) (org.apache.flink.api.java.io.CsvInputFormat))(1/1) switched to RUNNING
08/19/2015 07:49:23 DataSource (at getOrdersDataSet(TPCHQuery3.java:319) (org.apache.flink.api.java.io.CsvInputFormat))(1/1) switched to RUNNING
08/19/2015 07:49:23 Filter (Filter at main(TPCHQuery3.java:240))(1/1) switched to SCHEDULED
08/19/2015 07:49:23 Filter (Filter at main(TPCHQuery3.java:240))(1/1) switched to DEPLOYING
08/19/2015 07:49:23 Filter (Filter at main(TPCHQuery3.java:240))(1/1) switched to RUNNING
08/19/2015 07:50:04 Filter (Filter at main(TPCHQuery3.java:240))(1/1) switched to FAILED

java.io.IOException: Materialization of the broadcast variable failed.
    at org.apache.flink.runtime.broadcast.BroadcastVariableMaterialization.materializeVariable(BroadcastVariableMaterialization.java:154)
    at org.apache.flink.runtime.broadcast.BroadcastVariableManager.materializeBroadcastVariable(BroadcastVariableManager.java:50)
    at org.apache.flink.runtime.operators.RegularPactTask.readAndSetBroadcastInput(RegularPactTask.java:439)
    at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:358)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:724)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.lang.Long.valueOf(Long.java:577)
    at org.apache.flink.api.common.typeutils.base.LongSerializer.deserialize(LongSerializer.java:68)
    at org.apache.flink.api.common.typeutils.base.LongSerializer.deserialize(LongSerializer.java:27)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:127)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:110)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:64)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
    at org.apache.flink.runtime.broadcast.BroadcastVariableMaterialization.materializeVariable(BroadcastVariableMaterialization.java:115)
    ... 5 more

08/19/2015 07:50:04 Job execution switched to status FAILING.
08/19/2015 07:50:04 DataSource (at getCustomerDataSet(TPCHQuery3.java:282) (org.apache.flink.api.java.io.CsvInputFormat))(1/1) switched to CANCELING
08/19/2015 07:50:04 DataSource (at getOrdersDataSet(TPCHQuery3.java:319) (org.apache.flink.api.java.io.CsvInputFormat))(1/1) switched to CANCELING
08/19/2015 07:50:04 DataSink (CsvOutputFormat (path: /home/hadoop/Desktop/Dataset/complex_query_optimization.csv, delimiter: |))(1/1) switched to CANCELED
08/19/2015 07:50:04 DataSource (at getOrdersDataSet(TPCHQuery3.java:319) (org.apache.flink.api.java.io.CsvInputFormat))(1/1) switched to CANCELED
08/19/2015 07:50:04 DataSource (at getCustomerDataSet(TPCHQuery3.java:282) (org.apache.flink.api.java.io.CsvInputFormat))(1/1) switched to CANCELED

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
    at org.apache.flink.client.program.Client.run(Client.java:413)
    at org.apache.flink.client.program.Client.run(Client.java:356)
    at org.apache.flink.client.program.Client.run(Client.java:349)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
    at org.apache.flink.examples.java.relational.TPCHQuery3.main(TPCHQuery3.java:261)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
    at org.apache.flink.client.program.Client.run(Client.java:315)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: Materialization of the broadcast variable failed.
    [same stack trace as the IOException above]
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
    [same stack trace as the OutOfMemoryError above]
    ... 5 more
please help
As you can see from the exception, your broadcast variable is too large to fit into main memory.
I think storing that amount of data in a broadcast variable is not the best approach. I would suggest using a DataSet for this instead.
Why is it not good to use a broadcast variable with big data?
Because the broadcast variable is stored completely in memory at every operator instance.
If you use a hash join instead, both inputs can be hash-partitioned. This reduces the amount of memory needed at each operator.
When should a broadcast variable be used?
Distribute data with a broadcast variable when:
- the data is large
- the data has been produced by some form of computation and is already a DataSet (distributed result)

Typical use case: redistributing intermediate results, such as trained models.

From https://cwiki.apache.org/confluence/display/FLINK/Variables+Closures+vs.+Broadcast+Variables
Note: As the content of broadcast variables is kept in-memory on each node, it should not become too large. For simpler things like scalar values you can simply make parameters part of the closure of a function, or use the withParameters(...) method to pass in a configuration.
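For the scalar case, passing a value through `withParameters(...)` might look like this (a minimal sketch based on the DataSet API; the "AUTOMOBILE" segment from the code earlier in the thread is reused here as the example parameter):

```java
// Pass a small scalar value via withParameters(...) instead of a broadcast set.
Configuration config = new Configuration();
config.setString("segment", "AUTOMOBILE");

customer.filter(new RichFilterFunction<Customer>() {
    private String segment;

    @Override
    public void open(Configuration parameters) {
        // Read the value that was handed in via withParameters(...).
        segment = parameters.getString("segment", "AUTOMOBILE");
    }

    @Override
    public boolean filter(Customer c) {
        return c.f1.equals(segment);
    }
}).withParameters(config);
```

This keeps small constants out of broadcast sets entirely; only genuinely distributed intermediate results need the broadcast mechanism.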
Are there any ways to use a broadcast variable with big data?
Hi hagersaleh,
Sorry for the late reply. I think using an external system could be a solution for large-scale data. To use an external system, you have to implement rich functions such as RichFilterFunction, RichMapFunction, etc. Regards, Chiwan Park
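A rough sketch of that idea follows. Note that `KeyValueClient` is a hypothetical stand-in for whatever Redis/HBase client you actually use; the host, port, and method names are illustrative, not a real API:

```java
customer.filter(new RichFilterFunction<Customer>() {
    // Hypothetical external-store client, marked transient because
    // it is created per task instance, not serialized with the function.
    private transient KeyValueClient client;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Open one connection per parallel task instance, not per record.
        client = new KeyValueClient("storage-host", 6379);
    }

    @Override
    public boolean filter(Customer c) throws Exception {
        // Look up the order data in the external store instead of
        // holding all of it in memory as a broadcast variable.
        return client.exists(c.f0);
    }

    @Override
    public void close() throws Exception {
        client.close();
    }
});
```

Each node then only keeps a connection open rather than a full copy of the data, at the cost of a remote lookup per record.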
Chiwan has a good point. Once the data that needs to be available on all machines is too large for a single machine, there is no good solution any more. The best approach is an external store to which all nodes have access. It is not going to be terribly fast, though. If you are in the situation that you need to broadcast that much data, you may want to rethink your approach to the problem in the first place. Is there no solution that can work with partitioned data, even at the cost of re-partitioning twice or so?
Hi Chiwan Park,
I do not understand this solution. Please explain more.
Hi hagersaleh,
You should understand why the error occurs with large-scale data: broadcast variables can only hold data whose size fits on a single machine. By an external system I meant something such as Redis, HBase, etc. The connection to the external system can be initialized in the `open` method of a rich function such as `RichFilterFunction`, `RichFlatMapFunction`, etc. You can also choose the other solution, which Stephan suggested: rethink your approach. I think rethinking your algorithm would be better than my suggestion. From your code, I don't understand why you want to use a broadcast variable at all; you can do the same thing with filter and join operations. Here is my implementation [1]. Regards, Chiwan Park

[1] https://gist.github.com/chiwanpark/a0b0269c9a9b058d15d3
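A join-based version of the original program might look roughly like this (a sketch in the spirit of [1], assuming the same Customer and Orders tuple fields as the code at the top of the thread):

```java
// Filter each side first, then join on the customer key.
// Flink can hash-partition both inputs across the cluster, so no
// single task has to hold the complete orders data set in memory.
DataSet<Customer> automobileCustomers = customer
    .filter(new FilterFunction<Customer>() {
        @Override
        public boolean filter(Customer c) {
            return c.f1.equals("AUTOMOBILE");
        }
    });

DataSet<Orders> openOrders = order
    .filter(new FilterFunction<Orders>() {
        @Override
        public boolean filter(Orders o) {
            return o.f2.equals("O") || o.f0 == 7;
        }
    });

DataSet<Customer> result = automobileCustomers
    .join(openOrders)
    .where(0)    // c.f0, the customer key
    .equalTo(1)  // o.f1, the customer key in Orders
    .with(new JoinFunction<Customer, Orders, Customer>() {
        @Override
        public Customer join(Customer c, Orders o) {
            return c; // keep only the customer side
        }
    });
```

Note that a customer with several matching orders appears in the result once per match, so add a distinct() if each customer should be written only once.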
Thank you very much, Chiwan Park.