weird client failure/timeout


abhishekrs
I am trying to construct a topology like this (shown for a parallelism of 4): basically n parallel windowed-processing sub-pipelines with a single source and a single sink:

<PastedGraphic-1.png>

I get the following failure once nParts goes beyond 28 (a limit I found empirically using binary search). There is nothing in the JobManager logs to help troubleshoot this further.

Cluster configuration: Standalone cluster with JobManager at /127.0.0.1:6123
Using address 127.0.0.1:6123 to connect to JobManager.
JobManager web interface address http://127.0.0.1:10620
Starting execution of program
Submitting job with JobID: 27ae3db2946aac3336941bdfa184e537. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@127.0.0.1:6123/user/jobmanager#2043695445]

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Communication with JobManager failed: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:410)
        at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:95)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:383)
        at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:68)
        at com.tetration.pipeline.IngestionPipelineMain.main(IngestionPipelineMain.java:116)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:483)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:510)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:404)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:321)
        at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Communication with JobManager failed: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
        at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:137)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:406)
        ... 15 more
Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
        at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:264)
        at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:88)
        at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
        at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
        at akka.dispatch.Mailbox.run(Mailbox.scala:221)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
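
The limit of 28 above was found by binary search over nParts, which keeps the number of slow submit-and-wait probes logarithmic. A minimal sketch of that search in plain Java, assuming the outcome is monotone (every value up to some limit is accepted, every value above it times out); `LimitSearch` and the `submits` predicate are hypothetical stand-ins for actually submitting the job:

```java
import java.util.function.IntPredicate;

/** Binary search for the largest value that still passes a monotone check. */
final class LimitSearch {

    private LimitSearch() {}

    /**
     * Returns the largest n in [lo, hi] for which passes.test(n) is true,
     * assuming every value up to some limit passes and every value above it
     * fails. Returns lo - 1 if no value in the range passes.
     */
    static int largestPassing(int lo, int hi, IntPredicate passes) {
        int best = lo - 1;
        while (lo <= hi) {
            int mid = lo + (hi - lo) / 2; // avoids int overflow for large ranges
            if (passes.test(mid)) {
                best = mid;   // mid still works: search above it
                lo = mid + 1;
            } else {
                hi = mid - 1; // mid fails: the limit is below it
            }
        }
        return best;
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for "submit the job with nParts = n; was it accepted?"
        IntPredicate submits = n -> n <= 28;
        System.out.println(largestPassing(1, 128, submits)); // prints 28
    }
}
```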

The code to reproduce the problem is below. Job submission itself fails; the code has been simplified to focus on the topology I am trying to build:

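// Imports used by this excerpt (Flink 1.1.x streaming API)
import java.util.ArrayList;
import java.util.Collections;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

// Body of main(): env is the StreamExecutionEnvironment and cfg is the
// application's own configuration object (neither is shown here).
int nParts = cfg.getInt("dummyPartitions", 4);

// Single, non-parallel socket source.
SingleOutputStreamOperator<String> in = env
        .socketTextStream("localhost", cfg.getInt("dummyPort", 16408))
        .setParallelism(1).name("src");

// Fan out: each input line becomes nParts records "0", "1", ..., "nParts-1".
SingleOutputStreamOperator<String> fanout = in
        .flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String input, Collector<String> out) throws Exception {
                for (int i = 0; i < nParts; i++) {
                    out.collect(Integer.toString(i));
                }
            }
        })
        .setParallelism(1).name("flatmap");

// Route every record to the output named after its own value.
SplitStream<String> afterSplit = fanout.split(value -> Collections.singletonList(value));

// One keyed, windowed sub-pipeline with parallelism 1 per partition.
// Note: event-time windows only fire once timestamps and watermarks are
// assigned, which this reduced example does not do.
ArrayList<DataStream<String>> splitUp = new ArrayList<>(nParts);
for (int i = 0; i < nParts; i++) {
    splitUp.add(afterSplit.select(Integer.toString(i))
            .map(a -> a).startNewChain().setParallelism(1)
            .keyBy(s -> s)
            .window(TumblingEventTimeWindows.of(Time.seconds(10)))
            .max(0).setParallelism(1));
}

// Fan back in: union all sub-pipelines into a single sink.
DataStream<String> combined = splitUp.get(0);
for (int i = 1; i < nParts; i++) {
    combined = combined.union(splitUp.get(i));
}

combined.print().setParallelism(1);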
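
To make the intended data flow explicit, here is a plain-Java model of the fan-out and split steps, with no Flink involved (`FanOutModel` is a hypothetical name, not part of the job). It mirrors the flatMap and the `value -> Collections.singletonList(value)` selector, so each of the nParts outputs receives exactly one record per input line:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Plain-Java model of the fan-out in the job above (no Flink involved):
 * flatMap turns every input line into "0".."nParts-1", and the split
 * selector routes each record to the output named after its own value.
 */
final class FanOutModel {

    private FanOutModel() {}

    /** The flatMap step: one input line becomes nParts records. */
    static List<String> fanOut(String line, int nParts) {
        List<String> out = new ArrayList<>(nParts);
        for (int i = 0; i < nParts; i++) {
            out.add(Integer.toString(i));
        }
        return out;
    }

    /** The split/select step: records collected per output name. */
    static Map<String, List<String>> route(List<String> lines, int nParts) {
        Map<String, List<String>> byOutput = new TreeMap<>();
        for (String line : lines) {
            for (String value : fanOut(line, nParts)) {
                // Same selector as in the job: value -> Collections.singletonList(value)
                for (String outputName : Collections.singletonList(value)) {
                    byOutput.computeIfAbsent(outputName, k -> new ArrayList<>()).add(value);
                }
            }
        }
        return byOutput;
    }

    public static void main(String[] args) {
        Map<String, List<String>> routed = route(Arrays.asList("line-a", "line-b"), 4);
        System.out.println(routed); // prints {0=[0, 0], 1=[1, 1], 2=[2, 2], 3=[3, 3]}
    }
}
```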





Re: weird client failure/timeout

abhishekrs
I am using version 1.1.4 (the latest stable release).

On Jan 23, 2017, at 12:41 AM, Abhishek R. Singh <[hidden email]> wrote:

> [...]

Re: weird client failure/timeout

Jonas Gröger
The exception says:
Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
Did you already try that?

Re: weird client failure/timeout

abhishekrs
Yes, I had increased it to 5 minutes. It just sits there and then bails out again.

> On Jan 23, 2017, at 1:47 AM, Jonas <[hidden email]> wrote:
> [...]
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/weird-client-failure-timeout-tp11201p11204.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.


Re: weird client failure/timeout

abhishekrs
I even made it 10 minutes:

akka.client.timeout: 600s

But it doesn't feel like it is taking effect. It still fails after about the same time, with the same error.
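
When a setting seems to have no effect, one cheap first check is whether the configuration file the client actually starts with contains the value you expect. A minimal sketch of such a check, assuming the flat `key: value` layout of flink-conf.yaml; `ConfLookup` is a hypothetical helper, not Flink's configuration loader, whose parsing may differ in details:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

/**
 * Looks up a key in flat "key: value" lines such as those in flink-conf.yaml.
 * Assumptions: one entry per line, '#' starts a comment, and a key is
 * separated from its value by ": ". This is not Flink's own loader.
 */
final class ConfLookup {

    private ConfLookup() {}

    /** Returns the value of the last line that sets the given key, if any. */
    static Optional<String> lookup(List<String> lines, String key) {
        String found = null;
        for (String raw : lines) {
            int hash = raw.indexOf('#');
            String line = (hash >= 0 ? raw.substring(0, hash) : raw).trim();
            int sep = line.indexOf(": ");
            if (sep <= 0) {
                continue; // blank, comment-only, or not a "key: value" line
            }
            if (line.substring(0, sep).trim().equals(key)) {
                found = line.substring(sep + 2).trim(); // in this sketch, a later entry replaces an earlier one
            }
        }
        return Optional.ofNullable(found);
    }

    public static void main(String[] args) {
        List<String> conf = Arrays.asList(
                "jobmanager.rpc.address: 127.0.0.1",
                "# akka.client.timeout: 60s",
                "akka.client.timeout: 600s");
        System.out.println(lookup(conf, "akka.client.timeout").orElse("<not set>")); // prints 600s
    }
}
```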

-Abhishek-

On Jan 23, 2017, at 6:04 AM, Abhishek R. Singh <[hidden email]> wrote:

> [...]

Re: weird client failure/timeout

abhishekrs
Is there a limit on how many DataStreams can be defined in a streaming program?

Does Flink have problems handling too many data streams? I simplified my topology further; for example, this works (parallelism of 4):

<PastedGraphic-2.png>

However, when I go beyond 51 (found empirically by parametrizing nParts), it fails again: submission fails and asks me to increase akka.client.timeout.

Here is the reduced code to reproduce it. The union at the end is not the issue; it is the parallelism of the first for loop (the number of streams it creates):
int nParts = cfg.getInt("dummyPartitions", 4);
boolean combineAtEnd = cfg.getBoolean("dummyCombineAtEnd", true);

// Create lots of independent file-monitoring streams, each with parallelism 1.
List<SingleOutputStreamOperator<String>> streams = new ArrayList<>(nParts);
for (int i = 0; i < nParts; i++) {
    streams.add(env
            .readFile(
                    new TextInputFormat(new Path("/tmp/input")),
                    "/tmp/input",
                    FileProcessingMode.PROCESS_CONTINUOUSLY,
                    1000, // interval between path scans, in milliseconds
                    FilePathFilter.createDefaultFilter())
            .setParallelism(1).name("src"));
}

if (combineAtEnd) {
    // Fan in: union every stream into a single sink.
    DataStream<String> combined = streams.get(0);
    for (int i = 1; i < nParts; i++) {
        combined = combined.union(streams.get(i));
    }
    combined.print().setParallelism(1);
} else {
    // Fully parallel: one sink per stream (start at 0 so stream 0 also gets a sink).
    for (int i = 0; i < nParts; i++) {
        streams.get(i).print();
    }
}


On Jan 23, 2017, at 6:14 AM, Abhishek R. Singh <[hidden email]> wrote:

> [...]

Re: weird client failure/timeout

abhishekrs
Actually, I take that back. It is the final union that causes the problem (the job becomes un-submittable). If I set combineAtEnd to false, I can go higher, all the way up to 63 (at least the job deploys).

Beyond that, it starts failing with "Too many open files" in RocksDB (which I can understand, and which is at least better than silently not accepting my job).

Caused by: java.lang.RuntimeException: Error while opening RocksDB instance.
        at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.initializeForJob(RocksDBStateBackend.java:306)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.createStateBackend(StreamTask.java:821)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setup(AbstractStreamOperator.java:118)
        ... 4 more
Caused by: org.rocksdb.RocksDBException: IO error: /var/folders/l1/ncffkbq11_lg6tjk_3cvc_n00000gn/T/flink-io-45a78866-a9da-40ca-be51-a894c4fac9be/3815eb68c3777ba4f504e8529db6e145/StreamSource_39_0/dummy_state/7ff48c49-b6ce-4de8-ba7e-8a240b181ae2/db/MANIFEST-000001: Too many open files
        at org.rocksdb.RocksDB.open(Native Method)
        at org.rocksdb.RocksDB.open(RocksDB.java:239)
        at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.initializeForJob(RocksDBStateBackend.java:304)
        ... 6 more


On Jan 23, 2017, at 8:20 AM, Abhishek R. Singh <[hidden email]> wrote:

> [...]

Re: weird client failure/timeout

Stephan Ewen
Hi!

I think what you are seeing is the effect of too many tasks going to the same task slot. Have a look here: https://ci.apache.org/projects/flink/flink-docs-release-1.2/concepts/runtime.html#task-slots-and-resources

By default, Flink shares task slots across all distinct pipelines of the same program, for easier "getting started" scheduling behavior.
For proper deployments (or setups where you just have very , I would make sure that the program sets different "sharing groups" (via ".slotSharingGroup()") on the different streams.
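
Stephan's point can be illustrated with a toy model of slot sharing. It assumes the simplified rule that, within one slot sharing group, subtask i of every operator may share slot i, and it ignores chaining, co-location and locality, so it counts operator instances rather than actual tasks. `SlotSharingModel`, `Op` and the group name "default" are illustrative assumptions, not Flink classes:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Toy model of slot sharing: within one slot sharing group, subtask i of every
 * operator may share slot i. Operator chaining, co-location and locality are
 * ignored, so this counts operator instances per slot, not actual tasks.
 */
final class SlotSharingModel {

    private SlotSharingModel() {}

    /** An operator with a parallelism and a slot sharing group name. */
    static final class Op {
        final String group;
        final int parallelism;

        Op(String group, int parallelism) {
            this.group = group;
            this.parallelism = parallelism;
        }
    }

    /** Number of operator instances placed in each slot, keyed by "group#subtaskIndex". */
    static Map<String, Integer> instancesPerSlot(List<Op> ops) {
        Map<String, Integer> slots = new TreeMap<>();
        for (Op op : ops) {
            for (int i = 0; i < op.parallelism; i++) {
                slots.merge(op.group + "#" + i, 1, Integer::sum);
            }
        }
        return slots;
    }

    /** nParts pipelines (source + window, parallelism 1), in one shared group or one group each. */
    static List<Op> pipelines(int nParts, boolean groupPerPipeline) {
        List<Op> ops = new ArrayList<>();
        for (int p = 0; p < nParts; p++) {
            String group = groupPerPipeline ? "pipeline-" + p : "default";
            ops.add(new Op(group, 1)); // source
            ops.add(new Op(group, 1)); // windowed operator
        }
        return ops;
    }

    public static void main(String[] args) {
        System.out.println(instancesPerSlot(pipelines(30, false)));       // prints {default#0=60}
        System.out.println(instancesPerSlot(pipelines(30, true)).size()); // prints 30
    }
}
```

With 30 single-parallelism pipelines in one group, all 60 operator instances land in a single slot; with one group per pipeline, they spread over 30 slots of two instances each.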

Also, rather than defining 100s of different sources, I would consider defining one source and making it parallel. It works better with Flink's default scheduling parameters.
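
One common way to replace many single-purpose sources with one parallel source is to let each subtask pick a disjoint subset of the pre-partitioned inputs. A minimal sketch, assuming a simple modulo assignment; the class and method names are hypothetical. In a Flink rich source function the two inputs would typically come from `getRuntimeContext().getIndexOfThisSubtask()` and `getNumberOfParallelSubtasks()`:

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Modulo assignment of pre-existing partitions to the subtasks of a single
 * parallel source: subtask i takes partitions i, i + parallelism, i + 2 * parallelism, ...
 */
final class PartitionAssignment {

    private PartitionAssignment() {}

    static List<Integer> partitionsFor(int subtaskIndex, int parallelism, int numPartitions) {
        if (parallelism <= 0 || subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException("need 0 <= subtaskIndex < parallelism");
        }
        List<Integer> mine = new ArrayList<>();
        for (int p = subtaskIndex; p < numPartitions; p += parallelism) {
            mine.add(p);
        }
        return mine;
    }

    public static void main(String[] args) {
        int parallelism = 4;
        for (int i = 0; i < parallelism; i++) {
            // Each partition is owned by exactly one subtask.
            System.out.println("subtask " + i + " -> " + partitionsFor(i, parallelism, 10));
        }
    }
}
```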

Hope that helps.

Stephan




On Mon, Jan 23, 2017 at 5:40 PM, Abhishek R. Singh <[hidden email]> wrote:
> [...]

Re: weird client failure/timeout

abhishekrs
Hi Stephan,

This did not work. For the working case I do see better utilization of the available slots, but the non-working case still fails.

Basically, I assigned a unique slot sharing group to each source in my for loop, given that I have far more slots than the parallelism I need.

I know about the parallel source. Doesn't a source occupy a slot (as in Spark)? Since my data is pre-partitioned, I was merely monitoring from the source (keeping it lightweight) and then fanning out to do the actual reads/work in the next (event-driven) operator, after splitting the stream coming from the source.

This is more like a batch use case. However, I want to use a single streaming job to do both streaming and batch. This batch job emits an application-level marker that gets fanned back in to declare success/completion of the batch.

Since my data is pre-partitioned, my windows don't need to run globally. Also, I don't know how to do a global keyBy (shuffle) and then send an application marker from the source to all the operators. That is why I keep things hand-partitioned: I can send something from the source to each of my partitions, and they get forwarded to my sink, which counts them up to indicate completion. I can control how the markers are sent forward, and my keyBy and windowing run with a parallelism of 1, so I know the marker reaches the next stage and keeps propagating. Except that the pattern doesn't scale beyond 8 partitions :(
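
The completion scheme described above (one marker per partition, counted up at the sink) reduces to a small piece of bookkeeping. A minimal sketch for a single batch, assuming partitions are numbered 0 to nParts - 1 and that a partition may resend its marker; `BatchCompletionTracker` is a hypothetical name, not part of the original job:

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Sink-side bookkeeping for one batch: each hand-partitioned sub-pipeline
 * forwards a marker, and the batch is complete once every partition has
 * reported. Repeated markers from the same partition are counted once.
 */
final class BatchCompletionTracker {

    private final int nParts;
    private final Set<Integer> reported = new HashSet<>();

    BatchCompletionTracker(int nParts) {
        if (nParts <= 0) {
            throw new IllegalArgumentException("nParts must be positive");
        }
        this.nParts = nParts;
    }

    /** Records a marker from the given partition; returns true once all partitions have reported. */
    boolean onMarker(int partition) {
        if (partition < 0 || partition >= nParts) {
            throw new IllegalArgumentException("unknown partition " + partition);
        }
        reported.add(partition);
        return isComplete();
    }

    boolean isComplete() {
        return reported.size() == nParts;
    }

    public static void main(String[] args) {
        BatchCompletionTracker tracker = new BatchCompletionTracker(3);
        System.out.println(tracker.onMarker(0)); // false
        System.out.println(tracker.onMarker(0)); // false: duplicate marker
        System.out.println(tracker.onMarker(2)); // false
        System.out.println(tracker.onMarker(1)); // true: all 3 partitions reported
    }
}
```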

-Abhishek-

On Mon, Jan 23, 2017 at 10:42 AM Stephan Ewen <[hidden email]> wrote:
> [...]