java.util.concurrent.ExecutionException

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

java.util.concurrent.ExecutionException

kant kodali
Hi All,

I am just trying to read edges which has the following format in Kafka

1,2
1,3
1,5

using the Table API and then converting to DataStream of Edge Objects and printing them. However I am getting java.util.concurrent.ExecutionException but not sure why?

Here is the sample code

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.graph.Edge;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.NullValue;
import org.apache.flink.types.Row;

import java.util.UUID;

public class Test {

public static void main(String... args) throws Exception {

EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend((StateBackend) new RocksDBStateBackend("file:///tmp/rocksdb"));

StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);

bsTableEnv.connect(
new Kafka()
.property("bootstrap.servers", "localhost:9092")
.property("zookeeper.connect", "localhost:2181")
.property("group.id", UUID.randomUUID().toString())
.startFromEarliest()
.version("universal")
.topic("edges")
)
.withFormat(new Csv().fieldDelimiter(','))
.withSchema(
new Schema()
.field("source", DataTypes.BIGINT())
.field("target", DataTypes.BIGINT())
)
.createTemporaryTable("kafka_source");

Table kafkaSourceTable = bsTableEnv.sqlQuery("select * from kafka_source");

TypeInformation<Edge<Long, NullValue>> edgeTypeInformation = TypeInformation.of(new TypeHint<Edge<Long, NullValue>>() {
@Override
public TypeInformation<Edge<Long, NullValue>> getTypeInfo() {
return super.getTypeInfo();
}
});

DataStream<Edge<Long, NullValue>> edges = bsTableEnv.toAppendStream(kafkaSourceTable, Row.class)
.map(row -> new Edge<>((Long) row.getField(0), (Long) row.getField(1), NullValue.getInstance()))
.returns(edgeTypeInformation);

edges.print();

bsTableEnv.execute("sample job");
}
}


Reply | Threaded
Open this post in threaded view
|

Re: java.util.concurrent.ExecutionException

Gary Yao-5
Hi,

Can you post the complete stacktrace?

Best,
Gary

On Tue, Mar 3, 2020 at 1:08 PM kant kodali <[hidden email]> wrote:
Hi All,

I am just trying to read edges which has the following format in Kafka

1,2
1,3
1,5

using the Table API and then converting to DataStream of Edge Objects and printing them. However I am getting java.util.concurrent.ExecutionException but not sure why?

Here is the sample code

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.graph.Edge;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.NullValue;
import org.apache.flink.types.Row;

import java.util.UUID;

public class Test {

public static void main(String... args) throws Exception {

EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend((StateBackend) new RocksDBStateBackend("file:///tmp/rocksdb"));

StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);

bsTableEnv.connect(
new Kafka()
.property("bootstrap.servers", "localhost:9092")
.property("zookeeper.connect", "localhost:2181")
.property("group.id", UUID.randomUUID().toString())
.startFromEarliest()
.version("universal")
.topic("edges")
)
.withFormat(new Csv().fieldDelimiter(','))
.withSchema(
new Schema()
.field("source", DataTypes.BIGINT())
.field("target", DataTypes.BIGINT())
)
.createTemporaryTable("kafka_source");

Table kafkaSourceTable = bsTableEnv.sqlQuery("select * from kafka_source");

TypeInformation<Edge<Long, NullValue>> edgeTypeInformation = TypeInformation.of(new TypeHint<Edge<Long, NullValue>>() {
@Override
public TypeInformation<Edge<Long, NullValue>> getTypeInfo() {
return super.getTypeInfo();
}
});

DataStream<Edge<Long, NullValue>> edges = bsTableEnv.toAppendStream(kafkaSourceTable, Row.class)
.map(row -> new Edge<>((Long) row.getField(0), (Long) row.getField(1), NullValue.getInstance()))
.returns(edgeTypeInformation);

edges.print();

bsTableEnv.execute("sample job");
}
}


Reply | Threaded
Open this post in threaded view
|

Re: java.util.concurrent.ExecutionException

kant kodali

The program finished with the following exception:


org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: f57b682f5867a8bf6ff6e1ddce93a1ab)

at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)

at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)

at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)

at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)

at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)

at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)

at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)

at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)

at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)

Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: f57b682f5867a8bf6ff6e1ddce93a1ab)

at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)

at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)

at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)

at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)

at Test.main(Test.java:71)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)

... 8 more

Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: f57b682f5867a8bf6ff6e1ddce93a1ab)

at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)

at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)

at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)

at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)

at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)

at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$21(RestClusterClient.java:565)

at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)

at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)

at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)

at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)

at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:291)

at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)

at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)

at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)

at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)

at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)

at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.

at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)

at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)

... 19 more

Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy

at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)

at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)

at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)

at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)

at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)

at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)

at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)

at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)

at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)

at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)

at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)

at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)

at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)

at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)

at akka.actor.Actor$class.aroundReceive(Actor.scala:517)

at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)

at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)

at akka.actor.ActorCell.invoke(ActorCell.scala:561)

at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)

at akka.dispatch.Mailbox.run(Mailbox.scala:225)

at akka.dispatch.Mailbox.exec(Mailbox.scala:235)

at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition edges-0 could be determined


On Tue, Mar 3, 2020 at 8:03 AM Gary Yao <[hidden email]> wrote:
Hi,

Can you post the complete stacktrace?

Best,
Gary

On Tue, Mar 3, 2020 at 1:08 PM kant kodali <[hidden email]> wrote:
Hi All,

I am just trying to read edges which has the following format in Kafka

1,2
1,3
1,5

using the Table API and then converting to DataStream of Edge Objects and printing them. However I am getting java.util.concurrent.ExecutionException but not sure why?

Here is the sample code

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.graph.Edge;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.NullValue;
import org.apache.flink.types.Row;

import java.util.UUID;

public class Test {

public static void main(String... args) throws Exception {

EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend((StateBackend) new RocksDBStateBackend("file:///tmp/rocksdb"));

StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);

bsTableEnv.connect(
new Kafka()
.property("bootstrap.servers", "localhost:9092")
.property("zookeeper.connect", "localhost:2181")
.property("group.id", UUID.randomUUID().toString())
.startFromEarliest()
.version("universal")
.topic("edges")
)
.withFormat(new Csv().fieldDelimiter(','))
.withSchema(
new Schema()
.field("source", DataTypes.BIGINT())
.field("target", DataTypes.BIGINT())
)
.createTemporaryTable("kafka_source");

Table kafkaSourceTable = bsTableEnv.sqlQuery("select * from kafka_source");

TypeInformation<Edge<Long, NullValue>> edgeTypeInformation = TypeInformation.of(new TypeHint<Edge<Long, NullValue>>() {
@Override
public TypeInformation<Edge<Long, NullValue>> getTypeInfo() {
return super.getTypeInfo();
}
});

DataStream<Edge<Long, NullValue>> edges = bsTableEnv.toAppendStream(kafkaSourceTable, Row.class)
.map(row -> new Edge<>((Long) row.getField(0), (Long) row.getField(1), NullValue.getInstance()))
.returns(edgeTypeInformation);

edges.print();

bsTableEnv.execute("sample job");
}
}


Reply | Threaded
Open this post in threaded view
|

Re: java.util.concurrent.ExecutionException

kant kodali
Hi Gary,

This has to do with my Kafka. After restarting Kafka it seems to work fine! 

Thanks!

On Tue, Mar 3, 2020 at 8:18 AM kant kodali <[hidden email]> wrote:

The program finished with the following exception:


org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: f57b682f5867a8bf6ff6e1ddce93a1ab)

at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)

at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)

at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)

at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)

at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)

at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)

at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)

at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)

at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)

Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: f57b682f5867a8bf6ff6e1ddce93a1ab)

at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)

at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)

at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)

at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)

at Test.main(Test.java:71)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)

... 8 more

Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: f57b682f5867a8bf6ff6e1ddce93a1ab)

at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)

at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)

at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)

at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)

at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)

at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$21(RestClusterClient.java:565)

at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)

at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)

at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)

at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)

at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:291)

at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)

at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)

at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)

at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)

at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)

at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.

at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)

at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)

... 19 more

Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy

at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)

at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)

at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)

at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)

at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)

at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)

at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)

at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)

at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)

at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)

at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)

at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)

at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)

at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)

at akka.actor.Actor$class.aroundReceive(Actor.scala:517)

at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)

at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)

at akka.actor.ActorCell.invoke(ActorCell.scala:561)

at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)

at akka.dispatch.Mailbox.run(Mailbox.scala:225)

at akka.dispatch.Mailbox.exec(Mailbox.scala:235)

at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition edges-0 could be determined


On Tue, Mar 3, 2020 at 8:03 AM Gary Yao <[hidden email]> wrote:
Hi,

Can you post the complete stacktrace?

Best,
Gary

On Tue, Mar 3, 2020 at 1:08 PM kant kodali <[hidden email]> wrote:
Hi All,

I am just trying to read edges which has the following format in Kafka

1,2
1,3
1,5

using the Table API and then converting to DataStream of Edge Objects and printing them. However I am getting java.util.concurrent.ExecutionException but not sure why?

Here is the sample code

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.graph.Edge;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.NullValue;
import org.apache.flink.types.Row;

import java.util.UUID;

public class Test {

public static void main(String... args) throws Exception {

EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend((StateBackend) new RocksDBStateBackend("file:///tmp/rocksdb"));

StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);

bsTableEnv.connect(
new Kafka()
.property("bootstrap.servers", "localhost:9092")
.property("zookeeper.connect", "localhost:2181")
.property("group.id", UUID.randomUUID().toString())
.startFromEarliest()
.version("universal")
.topic("edges")
)
.withFormat(new Csv().fieldDelimiter(','))
.withSchema(
new Schema()
.field("source", DataTypes.BIGINT())
.field("target", DataTypes.BIGINT())
)
.createTemporaryTable("kafka_source");

Table kafkaSourceTable = bsTableEnv.sqlQuery("select * from kafka_source");

TypeInformation<Edge<Long, NullValue>> edgeTypeInformation = TypeInformation.of(new TypeHint<Edge<Long, NullValue>>() {
@Override
public TypeInformation<Edge<Long, NullValue>> getTypeInfo() {
return super.getTypeInfo();
}
});

DataStream<Edge<Long, NullValue>> edges = bsTableEnv.toAppendStream(kafkaSourceTable, Row.class)
.map(row -> new Edge<>((Long) row.getField(0), (Long) row.getField(1), NullValue.getInstance()))
.returns(edgeTypeInformation);

edges.print();

bsTableEnv.execute("sample job");
}
}


Reply | Threaded
Open this post in threaded view
|

Re: java.util.concurrent.ExecutionException

Gary Yao-5
Hi,

Thanks for getting back, and I am glad that you were able to resolve the issue. The
root cause in the stacktrace you posted also indicates a problem related
to Kafka:

    Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition edges-0 could be determined

Best,
Gary

On Tue, Mar 3, 2020 at 5:46 PM kant kodali <[hidden email]> wrote:
Hi Gary,

This has to do with my Kafka. After restarting Kafka it seems to work fine! 

Thanks!

On Tue, Mar 3, 2020 at 8:18 AM kant kodali <[hidden email]> wrote:

The program finished with the following exception:


org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: f57b682f5867a8bf6ff6e1ddce93a1ab)

at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)

at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)

at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)

at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)

at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)

at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)

at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)

at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)

at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)

Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: f57b682f5867a8bf6ff6e1ddce93a1ab)

at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)

at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)

at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)

at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)

at Test.main(Test.java:71)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)

... 8 more

Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: f57b682f5867a8bf6ff6e1ddce93a1ab)

at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)

at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)

at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)

at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)

at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)

at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$21(RestClusterClient.java:565)

at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)

at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)

at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)

at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)

at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:291)

at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)

at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)

at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)

at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)

at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)

at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.

at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)

at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)

... 19 more

Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy

at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)

at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)

at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)

at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)

at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)

at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)

at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)

at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)

at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)

at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)

at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)

at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)

at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)

at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)

at akka.actor.Actor$class.aroundReceive(Actor.scala:517)

at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)

at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)

at akka.actor.ActorCell.invoke(ActorCell.scala:561)

at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)

at akka.dispatch.Mailbox.run(Mailbox.scala:225)

at akka.dispatch.Mailbox.exec(Mailbox.scala:235)

at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition edges-0 could be determined


On Tue, Mar 3, 2020 at 8:03 AM Gary Yao <[hidden email]> wrote:
Hi,

Can you post the complete stacktrace?

Best,
Gary

On Tue, Mar 3, 2020 at 1:08 PM kant kodali <[hidden email]> wrote:
Hi All,

I am just trying to read edges which has the following format in Kafka

1,2
1,3
1,5

using the Table API and then converting to DataStream of Edge Objects and printing them. However I am getting java.util.concurrent.ExecutionException but not sure why?

Here is the sample code

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.graph.Edge;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.NullValue;
import org.apache.flink.types.Row;

import java.util.UUID;

public class Test {

public static void main(String... args) throws Exception {

EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend((StateBackend) new RocksDBStateBackend("file:///tmp/rocksdb"));

StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);

bsTableEnv.connect(
new Kafka()
.property("bootstrap.servers", "localhost:9092")
.property("zookeeper.connect", "localhost:2181")
.property("group.id", UUID.randomUUID().toString())
.startFromEarliest()
.version("universal")
.topic("edges")
)
.withFormat(new Csv().fieldDelimiter(','))
.withSchema(
new Schema()
.field("source", DataTypes.BIGINT())
.field("target", DataTypes.BIGINT())
)
.createTemporaryTable("kafka_source");

Table kafkaSourceTable = bsTableEnv.sqlQuery("select * from kafka_source");

TypeInformation<Edge<Long, NullValue>> edgeTypeInformation = TypeInformation.of(new TypeHint<Edge<Long, NullValue>>() {
@Override
public TypeInformation<Edge<Long, NullValue>> getTypeInfo() {
return super.getTypeInfo();
}
});

DataStream<Edge<Long, NullValue>> edges = bsTableEnv.toAppendStream(kafkaSourceTable, Row.class)
.map(row -> new Edge<>((Long) row.getField(0), (Long) row.getField(1), NullValue.getInstance()))
.returns(edgeTypeInformation);

edges.print();

bsTableEnv.execute("sample job");
}
}