I’m using a query to gather results:
I’d like to iterate over the results but using this causes an error:
Has there been an update in how to traverse results? Robert Cullen |
Hi Robert,
could you send us the error/stacktrace that is printed? An example how it should work is shown here: https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/UpdatingTopCityExample.java https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/GettingStartedExample.java Regards, Timo On 15.01.21 16:46, Robert Cullen wrote: > I’m using a query to gather results: > > |Table log_counts = tEnv.from("log_counts") > .filter($("hostname").isNotNull() .and($("hostname").isNotEqual(""))) > .window(Tumble .over(lit(30).seconds()) .on($("last_updated")).as("w")) > .groupBy($("msg_id"), $("hostname"), $("w")) .select($("msg_id"), > $("hostname"), $("msg_id").count().as("cnt")); | > > I’d like to iterate over the results but using this causes an error: > > |log_counts.execute().collect(); > | > > Has there been an update in how to traverse results? > > > Robert Cullen > 240-475-4490 > |
Timo, Here it is:
On Fri, Jan 15, 2021 at 11:32 AM Timo Walther twalthr@... wrote: Hi Robert, Robert Cullen 240-475-4490 |
How are you running the Flink cluster? What is your deplyment?
The exception clearly indicates that you found a bug. Could you open an ticket in Flink's JIRA? We need details how to reproduce it. Thanks, Timo On 15.01.21 17:57, Robert Cullen wrote: > Timo, > > Here it is: > > |2021-01-15 16:52:00,628 WARN > org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - > Configuring the job submission via query parameters is deprecated. > Please migrate to submitting a JSON request instead. 2021-01-15 > 16:52:00,640 INFO org.apache.flink.client.ClientUtils [] - Starting > program (detached: true) 2021-01-15 16:52:00,678 INFO > org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] > - Job 84c9f12fe943bc7f32ee637666ed3bc1 is submitted. 2021-01-15 > 16:52:00,678 INFO > org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] > - Submitting Job with JobId=84c9f12fe943bc7f32ee637666ed3bc1. 2021-01-15 > 16:52:00,830 INFO > org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Received > JobGraph submission 84c9f12fe943bc7f32ee637666ed3bc1 (collect). > 2021-01-15 16:52:00,830 INFO > org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Submitting > job 84c9f12fe943bc7f32ee637666ed3bc1 (collect). 2021-01-15 16:52:00,831 > INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC > endpoint for org.apache.flink.runtime.jobmaster.JobMaster at > akka://flink/user/rpc/jobmanager_68 . 2021-01-15 16:52:00,831 INFO > org.apache.flink.runtime.jobmaster.JobMaster [] - Initializing job > collect (84c9f12fe943bc7f32ee637666ed3bc1). 2021-01-15 16:52:00,832 INFO > org.apache.flink.runtime.jobmaster.JobMaster [] - Using restart back off > time strategy NoRestartBackoffTimeStrategy for collect > (84c9f12fe943bc7f32ee637666ed3bc1). 2021-01-15 16:52:00,832 INFO > org.apache.flink.runtime.jobmaster.JobMaster [] - Running initialization > on master for job collect (84c9f12fe943bc7f32ee637666ed3bc1). 2021-01-15 > 16:52:00,836 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - > Successfully ran initialization on master in 3 ms. 2021-01-15 > 16:52:00,836 INFO > org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - > Built 1 pipelined regions in 0 ms 2021-01-15 16:52:00,836 INFO > org.apache.flink.runtime.jobmaster.JobMaster [] - Using job/cluster > config to configure application-defined state backend: File State > Backend (checkpoints: 's3://flink/checkpoints', savepoints: > 's3://flink/savepoints', asynchronous: TRUE, fileStateThreshold: 20480) > 2021-01-15 16:52:00,836 INFO > org.apache.flink.runtime.jobmaster.JobMaster [] - Using > application-defined state backend: File State Backend (checkpoints: > 's3://flink/checkpoints', savepoints: 's3://flink/savepoints', > asynchronous: TRUE, fileStateThreshold: 20480) 2021-01-15 16:52:06,865 > INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No > checkpoint found during restore. 2021-01-15 16:52:06,866 INFO > org.apache.flink.runtime.jobmaster.JobMaster [] - Using failover > strategy > org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@39b9ad24 > for collect (84c9f12fe943bc7f32ee637666ed3bc1). 2021-01-15 16:52:06,866 > INFO org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl [] - > JobManager runner for job collect (84c9f12fe943bc7f32ee637666ed3bc1) was > granted leadership with session id 00000000-0000-0000-0000-000000000000 > at akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_68. > 2021-01-15 16:52:06,866 INFO > org.apache.flink.runtime.jobmaster.JobMaster [] - Starting execution of > job collect (84c9f12fe943bc7f32ee637666ed3bc1) under job master id > 00000000000000000000000000000000. 2021-01-15 16:52:06,866 INFO > org.apache.flink.runtime.jobmaster.JobMaster [] - Starting scheduling > with scheduling strategy > [org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy] > 2021-01-15 16:52:06,866 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job collect > (84c9f12fe943bc7f32ee637666ed3bc1) switched from state CREATED to > RUNNING. 2021-01-15 16:52:06,866 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: > JdbcTableSource(msg_id, hostname, last_updated) -> > SourceConversion(table=[default_catalog.default_database.log_counts, > source: [JdbcTableSource(msg_id, hostname, last_updated)]], > fields=[msg_id, hostname, last_updated]) -> > WatermarkAssigner(rowtime=[last_updated], watermark=[(last_updated - > 5000:INTERVAL SECOND)]) -> Calc(select=[msg_id, hostname, last_updated], > where=[(hostname <> _UTF-16LE'')]) (1/1) > (d22b3ac56f07e182ba5b74d68fa74fb1) switched from CREATED to SCHEDULED. > 2021-01-15 16:52:06,866 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph [] - > GroupWindowAggregate(groupBy=[msg_id, hostname], > window=[TumblingGroupWindow('w, last_updated, 300000)], select=[msg_id, > hostname, COUNT(msg_id) AS EXPR$0]) -> Calc(select=[msg_id, hostname, > EXPR$0 AS cnt]) -> SinkConversionToTuple2 -> Sink: Select table sink > (1/1) (09cee06206ad355b327cb8487773cd39) switched from CREATED to > SCHEDULED. 2021-01-15 16:52:06,866 INFO > org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - Cannot > serve slot request, no ResourceManager connected. Adding as pending > request [SlotRequestId{b3f1b6b4eaab427305ec15f92a39531e}] 2021-01-15 > 16:52:06,866 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - > Connecting to ResourceManager > akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_*(00000000000000000000000000000000) > 2021-01-15 16:52:06,867 INFO > org.apache.flink.runtime.jobmaster.JobMaster [] - Resolved > ResourceManager address, beginning registration 2021-01-15 16:52:06,867 > INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager > [] - Registering job manager > [hidden email]://flink@flink-jobmanager:6123/user/rpc/jobmanager_68 > for job 84c9f12fe943bc7f32ee637666ed3bc1. 2021-01-15 16:52:06,867 INFO > org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - > Registered job manager > [hidden email]://flink@flink-jobmanager:6123/user/rpc/jobmanager_68 > for job 84c9f12fe943bc7f32ee637666ed3bc1. 2021-01-15 16:52:06,867 INFO > org.apache.flink.runtime.jobmaster.JobMaster [] - JobManager > successfully registered at ResourceManager, leader id: > 00000000000000000000000000000000. 2021-01-15 16:52:06,867 INFO > org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - Requesting > new slot [SlotRequestId{b3f1b6b4eaab427305ec15f92a39531e}] and profile > ResourceProfile{UNKNOWN} with allocation id > b0559997a428b1d31d9e57d6532e026b from resource manager. 2021-01-15 > 16:52:06,868 INFO > org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - > Request slot with profile ResourceProfile{UNKNOWN} for job > 84c9f12fe943bc7f32ee637666ed3bc1 with allocation id > b0559997a428b1d31d9e57d6532e026b. 2021-01-15 16:52:06,874 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: > JdbcTableSource(msg_id, hostname, last_updated) -> > SourceConversion(table=[default_catalog.default_database.log_counts, > source: [JdbcTableSource(msg_id, hostname, last_updated)]], > fields=[msg_id, hostname, last_updated]) -> > WatermarkAssigner(rowtime=[last_updated], watermark=[(last_updated - > 5000:INTERVAL SECOND)]) -> Calc(select=[msg_id, hostname, last_updated], > where=[(hostname <> _UTF-16LE'')]) (1/1) > (d22b3ac56f07e182ba5b74d68fa74fb1) switched from SCHEDULED to DEPLOYING. > 2021-01-15 16:52:06,882 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying > Source: JdbcTableSource(msg_id, hostname, last_updated) -> > SourceConversion(table=[default_catalog.default_database.log_counts, > source: [JdbcTableSource(msg_id, hostname, last_updated)]], > fields=[msg_id, hostname, last_updated]) -> > WatermarkAssigner(rowtime=[last_updated], watermark=[(last_updated - > 5000:INTERVAL SECOND)]) -> Calc(select=[msg_id, hostname, last_updated], > where=[(hostname <> _UTF-16LE'')]) (1/1) (attempt #0) with attempt id > d22b3ac56f07e182ba5b74d68fa74fb1 to 10.42.1.148:6122-9b9553 @ > 10.42.1.148 (dataPort=40391) with allocation id > b0559997a428b1d31d9e57d6532e026b 2021-01-15 16:52:06,883 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph [] - > GroupWindowAggregate(groupBy=[msg_id, hostname], > window=[TumblingGroupWindow('w, last_updated, 300000)], select=[msg_id, > hostname, COUNT(msg_id) AS EXPR$0]) -> Calc(select=[msg_id, hostname, > EXPR$0 AS cnt]) -> SinkConversionToTuple2 -> Sink: Select table sink > (1/1) (09cee06206ad355b327cb8487773cd39) switched from SCHEDULED to > DEPLOYING. 2021-01-15 16:52:06,883 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying > GroupWindowAggregate(groupBy=[msg_id, hostname], > window=[TumblingGroupWindow('w, last_updated, 300000)], select=[msg_id, > hostname, COUNT(msg_id) AS EXPR$0]) -> Calc(select=[msg_id, hostname, > EXPR$0 AS cnt]) -> SinkConversionToTuple2 -> Sink: Select table sink > (1/1) (attempt #0) with attempt id 09cee06206ad355b327cb8487773cd39 to > 10.42.1.148:6122-9b9553 @ 10.42.1.148 (dataPort=40391) with allocation > id b0559997a428b1d31d9e57d6532e026b 2021-01-15 16:52:07,038 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph [] - > GroupWindowAggregate(groupBy=[msg_id, hostname], > window=[TumblingGroupWindow('w, last_updated, 300000)], select=[msg_id, > hostname, COUNT(msg_id) AS EXPR$0]) -> Calc(select=[msg_id, hostname, > EXPR$0 AS cnt]) -> SinkConversionToTuple2 -> Sink: Select table sink > (1/1) (09cee06206ad355b327cb8487773cd39) switched from DEPLOYING to > RUNNING. 2021-01-15 16:52:07,038 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: > JdbcTableSource(msg_id, hostname, last_updated) -> > SourceConversion(table=[default_catalog.default_database.log_counts, > source: [JdbcTableSource(msg_id, hostname, last_updated)]], > fields=[msg_id, hostname, last_updated]) -> > WatermarkAssigner(rowtime=[last_updated], watermark=[(last_updated - > 5000:INTERVAL SECOND)]) -> Calc(select=[msg_id, hostname, last_updated], > where=[(hostname <> _UTF-16LE'')]) (1/1) > (d22b3ac56f07e182ba5b74d68fa74fb1) switched from DEPLOYING to RUNNING. > 2021-01-15 16:52:07,057 INFO > org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorCoordinator > [] - Received sink socket server address: /10.42.1.148:39303 > <http://10.42.1.148:39303> 2021-01-15 16:52:07,060 WARN > org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No > hostname could be resolved for the IP address 10.42.1.148, using IP > address as host name. Local input split assignment (such as for HDFS > files) may be impacted. 2021-01-15 16:52:07,988 WARN > org.apache.flink.client.deployment.application.DetachedApplicationRunner > [] - Could not execute application: > org.apache.flink.client.program.ProgramInvocationException: The main > method caused an error: Failed to execute sql at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:330) > ~[flink-dist_2.12-1.12.0.jar:1.12.0] at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) > ~[flink-dist_2.12-1.12.0.jar:1.12.0] at > org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) > ~[flink-dist_2.12-1.12.0.jar:1.12.0] at > org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78) > ~[flink-dist_2.12-1.12.0.jar:1.12.0] at > org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67) > ~[flink-dist_2.12-1.12.0.jar:1.12.0] at > org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:100) > ~[flink-dist_2.12-1.12.0.jar:1.12.0] at > java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) > [?:1.8.0_275] at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > [?:1.8.0_275] at > java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_275] > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > [?:1.8.0_275] at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > [?:1.8.0_275] at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > [?:1.8.0_275] at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > [?:1.8.0_275] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275] > Caused by: org.apache.flink.table.api.TableException: Failed to execute > sql at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:719) > ~[flink-table-blink_2.12-1.12.0.jar:1.12.0] at > org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:570) ~[flink-table-blink_2.12-1.12.0.jar:1.12.0] > at > io.cmdaa.streaming.java.jdbc.TableConsumer.main(TableConsumer.java:192) > ~[?:?] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > ~[?:1.8.0_275] at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > ~[?:1.8.0_275] at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > ~[?:1.8.0_275] at java.lang.reflect.Method.invoke(Method.java:498) > ~[?:1.8.0_275] at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316) > ~[flink-dist_2.12-1.12.0.jar:1.12.0] ... 13 more Caused by: > java.lang.IllegalArgumentException: Job client must be a > CoordinationRequestGateway. This is a bug. at > org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:142) ~[flink-dist_2.12-1.12.0.jar:1.12.0] > at > org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.setJobClient(CollectResultFetcher.java:95) > ~[flink-dist_2.12-1.12.0.jar:1.12.0] at > org.apache.flink.streaming.api.operators.collect.CollectResultIterator.setJobClient(CollectResultIterator.java:98) > ~[flink-dist_2.12-1.12.0.jar:1.12.0] at > org.apache.flink.table.planner.sinks.SelectTableSinkBase$1.setJobClient(SelectTableSinkBase.java:93) > ~[flink-table-blink_2.12-1.12.0.jar:1.12.0] at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:709) > ~[flink-table-blink_2.12-1.12.0.jar:1.12.0] at > org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:570) ~[flink-table-blink_2.12-1.12.0.jar:1.12.0] > at > io.cmdaa.streaming.java.jdbc.TableConsumer.main(TableConsumer.java:192) > ~[?:?] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > ~[?:1.8.0_275] at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > ~[?:1.8.0_275] at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > ~[?:1.8.0_275] at java.lang.reflect.Method.invoke(Method.java:498) > ~[?:1.8.0_275] at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316) > ~[flink-dist_2.12-1.12.0.jar:1.12.0] ... 13 more 2021-01-15 16:52:07,989 > ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - > Exception occurred in REST handler: Could not execute application. > 2021-01-15 16:52:08,462 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: > JdbcTableSource(msg_id, hostname, last_updated) -> > SourceConversion(table=[default_catalog.default_database.log_counts, > source: [JdbcTableSource(msg_id, hostname, last_updated)]], > fields=[msg_id, hostname, last_updated]) -> > WatermarkAssigner(rowtime=[last_updated], watermark=[(last_updated - > 5000:INTERVAL SECOND)]) -> Calc(select=[msg_id, hostname, last_updated], > where=[(hostname <> _UTF-16LE'')]) (1/1) > (d22b3ac56f07e182ba5b74d68fa74fb1) switched from RUNNING to FINISHED. > 2021-01-15 16:52:08,465 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph [] - > GroupWindowAggregate(groupBy=[msg_id, hostname], > window=[TumblingGroupWindow('w, last_updated, 300000)], select=[msg_id, > hostname, COUNT(msg_id) AS EXPR$0]) -> Calc(select=[msg_id, hostname, > EXPR$0 AS cnt]) -> SinkConversionToTuple2 -> Sink: Select table sink > (1/1) (09cee06206ad355b327cb8487773cd39) switched from RUNNING to > FINISHED. 2021-01-15 16:52:08,465 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job collect > (84c9f12fe943bc7f32ee637666ed3bc1) switched from state RUNNING to > FINISHED. 2021-01-15 16:52:08,466 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Stopping > checkpoint coordinator for job 84c9f12fe943bc7f32ee637666ed3bc1. > 2021-01-15 16:52:08,466 INFO > org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore > [] - Shutting down 2021-01-15 16:52:08,466 INFO > org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job > 84c9f12fe943bc7f32ee637666ed3bc1 reached globally terminal state > FINISHED. 2021-01-15 16:52:08,467 INFO > org.apache.flink.runtime.jobmaster.JobMaster [] - Stopping the JobMaster > for job collect(84c9f12fe943bc7f32ee637666ed3bc1). 2021-01-15 > 16:52:08,467 INFO > org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - Suspending > SlotPool. 2021-01-15 16:52:08,468 INFO > org.apache.flink.runtime.jobmaster.JobMaster [] - Close ResourceManager > connection a76a3c5321498f13d0552421928c6062: Stopping JobMaster for job > collect(84c9f12fe943bc7f32ee637666ed3bc1).. 2021-01-15 16:52:08,468 INFO > org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - Stopping > SlotPool. 2021-01-15 16:52:08,468 INFO > org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - > Disconnect job manager > [hidden email]://flink@flink-jobmanager:6123/user/rpc/jobmanager_68 > for job 84c9f12fe943bc7f32ee637666ed3bc1 from the resource manager. | > > On Fri, Jan 15, 2021 at 11:32 AM Timo Walther [hidden email] > <http://mailto:twalthr@...> wrote: > > Hi Robert, > > could you send us the error/stacktrace that is printed? > > An example how it should work is shown here: > > https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/UpdatingTopCityExample.java > <https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/UpdatingTopCityExample.java> > > https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/GettingStartedExample.java > <https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/GettingStartedExample.java> > > Regards, > Timo > > On 15.01.21 16:46, Robert Cullen wrote: > > I’m using a query to gather results: > > > > |Table log_counts = tEnv.from("log_counts") > > .filter($("hostname").isNotNull() > .and($("hostname").isNotEqual(""))) > > .window(Tumble .over(lit(30).seconds()) > .on($("last_updated")).as("w")) > > .groupBy($("msg_id"), $("hostname"), $("w")) .select($("msg_id"), > > $("hostname"), $("msg_id").count().as("cnt")); | > > > > I’d like to iterate over the results but using this causes an error: > > > > |log_counts.execute().collect(); > > | > > > > Has there been an update in how to traverse results? > > > > > > Robert Cullen > > 240-475-4490 > > > > -- > Robert Cullen > 240-475-4490 |
maybe Godfrey in CC knows more?
On 15.01.21 18:10, Timo Walther wrote: > How are you running the Flink cluster? What is your deplyment? > > The exception clearly indicates that you found a bug. Could you open an > ticket in Flink's JIRA? We need details how to reproduce it. > > Thanks, > Timo > > > On 15.01.21 17:57, Robert Cullen wrote: >> Timo, >> >> Here it is: >> >> |2021-01-15 16:52:00,628 WARN >> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - >> Configuring the job submission via query parameters is deprecated. >> Please migrate to submitting a JSON request instead. 2021-01-15 >> 16:52:00,640 INFO org.apache.flink.client.ClientUtils [] - Starting >> program (detached: true) 2021-01-15 16:52:00,678 INFO >> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor >> [] - Job 84c9f12fe943bc7f32ee637666ed3bc1 is submitted. 2021-01-15 >> 16:52:00,678 INFO >> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor >> [] - Submitting Job with JobId=84c9f12fe943bc7f32ee637666ed3bc1. >> 2021-01-15 16:52:00,830 INFO >> org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Received >> JobGraph submission 84c9f12fe943bc7f32ee637666ed3bc1 (collect). >> 2021-01-15 16:52:00,830 INFO >> org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - >> Submitting job 84c9f12fe943bc7f32ee637666ed3bc1 (collect). 2021-01-15 >> 16:52:00,831 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService [] >> - Starting RPC endpoint for >> org.apache.flink.runtime.jobmaster.JobMaster at >> akka://flink/user/rpc/jobmanager_68 . 2021-01-15 16:52:00,831 INFO >> org.apache.flink.runtime.jobmaster.JobMaster [] - Initializing job >> collect (84c9f12fe943bc7f32ee637666ed3bc1). 2021-01-15 16:52:00,832 >> INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using restart >> back off time strategy NoRestartBackoffTimeStrategy for collect >> (84c9f12fe943bc7f32ee637666ed3bc1). 2021-01-15 16:52:00,832 INFO >> org.apache.flink.runtime.jobmaster.JobMaster [] - Running >> initialization on master for job collect >> (84c9f12fe943bc7f32ee637666ed3bc1). 2021-01-15 16:52:00,836 INFO >> org.apache.flink.runtime.jobmaster.JobMaster [] - Successfully ran >> initialization on master in 3 ms. 2021-01-15 16:52:00,836 INFO >> org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] >> - Built 1 pipelined regions in 0 ms 2021-01-15 16:52:00,836 INFO >> org.apache.flink.runtime.jobmaster.JobMaster [] - Using job/cluster >> config to configure application-defined state backend: File State >> Backend (checkpoints: 's3://flink/checkpoints', savepoints: >> 's3://flink/savepoints', asynchronous: TRUE, fileStateThreshold: >> 20480) 2021-01-15 16:52:00,836 INFO >> org.apache.flink.runtime.jobmaster.JobMaster [] - Using >> application-defined state backend: File State Backend (checkpoints: >> 's3://flink/checkpoints', savepoints: 's3://flink/savepoints', >> asynchronous: TRUE, fileStateThreshold: 20480) 2021-01-15 16:52:06,865 >> INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No >> checkpoint found during restore. 2021-01-15 16:52:06,866 INFO >> org.apache.flink.runtime.jobmaster.JobMaster [] - Using failover >> strategy >> org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@39b9ad24 >> for collect (84c9f12fe943bc7f32ee637666ed3bc1). 2021-01-15 >> 16:52:06,866 INFO >> org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl [] - >> JobManager runner for job collect (84c9f12fe943bc7f32ee637666ed3bc1) >> was granted leadership with session id >> 00000000-0000-0000-0000-000000000000 at >> akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_68. >> 2021-01-15 16:52:06,866 INFO >> org.apache.flink.runtime.jobmaster.JobMaster [] - Starting execution >> of job collect (84c9f12fe943bc7f32ee637666ed3bc1) under job master id >> 00000000000000000000000000000000. 2021-01-15 16:52:06,866 INFO >> org.apache.flink.runtime.jobmaster.JobMaster [] - Starting scheduling >> with scheduling strategy >> [org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy] >> 2021-01-15 16:52:06,866 INFO >> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job >> collect (84c9f12fe943bc7f32ee637666ed3bc1) switched from state CREATED >> to RUNNING. 2021-01-15 16:52:06,866 INFO >> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: >> JdbcTableSource(msg_id, hostname, last_updated) -> >> SourceConversion(table=[default_catalog.default_database.log_counts, >> source: [JdbcTableSource(msg_id, hostname, last_updated)]], >> fields=[msg_id, hostname, last_updated]) -> >> WatermarkAssigner(rowtime=[last_updated], watermark=[(last_updated - >> 5000:INTERVAL SECOND)]) -> Calc(select=[msg_id, hostname, >> last_updated], where=[(hostname <> _UTF-16LE'')]) (1/1) >> (d22b3ac56f07e182ba5b74d68fa74fb1) switched from CREATED to SCHEDULED. >> 2021-01-15 16:52:06,866 INFO >> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - >> GroupWindowAggregate(groupBy=[msg_id, hostname], >> window=[TumblingGroupWindow('w, last_updated, 300000)], >> select=[msg_id, hostname, COUNT(msg_id) AS EXPR$0]) -> >> Calc(select=[msg_id, hostname, EXPR$0 AS cnt]) -> >> SinkConversionToTuple2 -> Sink: Select table sink (1/1) >> (09cee06206ad355b327cb8487773cd39) switched from CREATED to SCHEDULED. >> 2021-01-15 16:52:06,866 INFO >> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - Cannot >> serve slot request, no ResourceManager connected. Adding as pending >> request [SlotRequestId{b3f1b6b4eaab427305ec15f92a39531e}] 2021-01-15 >> 16:52:06,866 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - >> Connecting to ResourceManager >> akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_*(00000000000000000000000000000000) >> 2021-01-15 16:52:06,867 INFO >> org.apache.flink.runtime.jobmaster.JobMaster [] - Resolved >> ResourceManager address, beginning registration 2021-01-15 >> 16:52:06,867 INFO >> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] >> - Registering job manager >> [hidden email]://flink@flink-jobmanager:6123/user/rpc/jobmanager_68 >> for job 84c9f12fe943bc7f32ee637666ed3bc1. 2021-01-15 16:52:06,867 INFO >> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] >> - Registered job manager >> [hidden email]://flink@flink-jobmanager:6123/user/rpc/jobmanager_68 >> for job 84c9f12fe943bc7f32ee637666ed3bc1. 2021-01-15 16:52:06,867 INFO >> org.apache.flink.runtime.jobmaster.JobMaster [] - JobManager >> successfully registered at ResourceManager, leader id: >> 00000000000000000000000000000000. 2021-01-15 16:52:06,867 INFO >> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - >> Requesting new slot [SlotRequestId{b3f1b6b4eaab427305ec15f92a39531e}] >> and profile ResourceProfile{UNKNOWN} with allocation id >> b0559997a428b1d31d9e57d6532e026b from resource manager. 2021-01-15 >> 16:52:06,868 INFO >> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] >> - Request slot with profile ResourceProfile{UNKNOWN} for job >> 84c9f12fe943bc7f32ee637666ed3bc1 with allocation id >> b0559997a428b1d31d9e57d6532e026b. 2021-01-15 16:52:06,874 INFO >> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: >> JdbcTableSource(msg_id, hostname, last_updated) -> >> SourceConversion(table=[default_catalog.default_database.log_counts, >> source: [JdbcTableSource(msg_id, hostname, last_updated)]], >> fields=[msg_id, hostname, last_updated]) -> >> WatermarkAssigner(rowtime=[last_updated], watermark=[(last_updated - >> 5000:INTERVAL SECOND)]) -> Calc(select=[msg_id, hostname, >> last_updated], where=[(hostname <> _UTF-16LE'')]) (1/1) >> (d22b3ac56f07e182ba5b74d68fa74fb1) switched from SCHEDULED to >> DEPLOYING. 2021-01-15 16:52:06,882 INFO >> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying >> Source: JdbcTableSource(msg_id, hostname, last_updated) -> >> SourceConversion(table=[default_catalog.default_database.log_counts, >> source: [JdbcTableSource(msg_id, hostname, last_updated)]], >> fields=[msg_id, hostname, last_updated]) -> >> WatermarkAssigner(rowtime=[last_updated], watermark=[(last_updated - >> 5000:INTERVAL SECOND)]) -> Calc(select=[msg_id, hostname, >> last_updated], where=[(hostname <> _UTF-16LE'')]) (1/1) (attempt #0) >> with attempt id d22b3ac56f07e182ba5b74d68fa74fb1 to >> 10.42.1.148:6122-9b9553 @ 10.42.1.148 (dataPort=40391) with allocation >> id b0559997a428b1d31d9e57d6532e026b 2021-01-15 16:52:06,883 INFO >> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - >> GroupWindowAggregate(groupBy=[msg_id, hostname], >> window=[TumblingGroupWindow('w, last_updated, 300000)], >> select=[msg_id, hostname, COUNT(msg_id) AS EXPR$0]) -> >> Calc(select=[msg_id, hostname, EXPR$0 AS cnt]) -> >> SinkConversionToTuple2 -> Sink: Select table sink (1/1) >> (09cee06206ad355b327cb8487773cd39) switched from SCHEDULED to >> DEPLOYING. 2021-01-15 16:52:06,883 INFO >> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying >> GroupWindowAggregate(groupBy=[msg_id, hostname], >> window=[TumblingGroupWindow('w, last_updated, 300000)], >> select=[msg_id, hostname, COUNT(msg_id) AS EXPR$0]) -> >> Calc(select=[msg_id, hostname, EXPR$0 AS cnt]) -> >> SinkConversionToTuple2 -> Sink: Select table sink (1/1) (attempt #0) >> with attempt id 09cee06206ad355b327cb8487773cd39 to >> 10.42.1.148:6122-9b9553 @ 10.42.1.148 (dataPort=40391) with allocation >> id b0559997a428b1d31d9e57d6532e026b 2021-01-15 16:52:07,038 INFO >> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - >> GroupWindowAggregate(groupBy=[msg_id, hostname], >> window=[TumblingGroupWindow('w, last_updated, 300000)], >> select=[msg_id, hostname, COUNT(msg_id) AS EXPR$0]) -> >> Calc(select=[msg_id, hostname, EXPR$0 AS cnt]) -> >> SinkConversionToTuple2 -> Sink: Select table sink (1/1) >> (09cee06206ad355b327cb8487773cd39) switched from DEPLOYING to RUNNING. >> 2021-01-15 16:52:07,038 INFO >> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: >> JdbcTableSource(msg_id, hostname, last_updated) -> >> SourceConversion(table=[default_catalog.default_database.log_counts, >> source: [JdbcTableSource(msg_id, hostname, last_updated)]], >> fields=[msg_id, hostname, last_updated]) -> >> WatermarkAssigner(rowtime=[last_updated], watermark=[(last_updated - >> 5000:INTERVAL SECOND)]) -> Calc(select=[msg_id, hostname, >> last_updated], where=[(hostname <> _UTF-16LE'')]) (1/1) >> (d22b3ac56f07e182ba5b74d68fa74fb1) switched from DEPLOYING to RUNNING. >> 2021-01-15 16:52:07,057 INFO >> org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorCoordinator >> [] - Received sink socket server address: /10.42.1.148:39303 >> <http://10.42.1.148:39303> 2021-01-15 16:52:07,060 WARN >> org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No >> hostname could be resolved for the IP address 10.42.1.148, using IP >> address as host name. Local input split assignment (such as for HDFS >> files) may be impacted. 2021-01-15 16:52:07,988 WARN >> org.apache.flink.client.deployment.application.DetachedApplicationRunner >> [] - Could not execute application: >> org.apache.flink.client.program.ProgramInvocationException: The main >> method caused an error: Failed to execute sql at >> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:330) >> ~[flink-dist_2.12-1.12.0.jar:1.12.0] at >> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) >> ~[flink-dist_2.12-1.12.0.jar:1.12.0] at >> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) >> ~[flink-dist_2.12-1.12.0.jar:1.12.0] at >> org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78) >> ~[flink-dist_2.12-1.12.0.jar:1.12.0] at >> org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67) >> ~[flink-dist_2.12-1.12.0.jar:1.12.0] at >> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:100) >> ~[flink-dist_2.12-1.12.0.jar:1.12.0] at >> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) >> [?:1.8.0_275] at >> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_275] >> at java.util.concurrent.FutureTask.run(FutureTask.java:266) >> [?:1.8.0_275] at >> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) >> [?:1.8.0_275] at >> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) >> [?:1.8.0_275] at >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) >> [?:1.8.0_275] at >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) >> [?:1.8.0_275] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275] >> Caused by: org.apache.flink.table.api.TableException: Failed to >> execute sql at >> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:719) >> ~[flink-table-blink_2.12-1.12.0.jar:1.12.0] at >> org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:570) >> ~[flink-table-blink_2.12-1.12.0.jar:1.12.0] at >> io.cmdaa.streaming.java.jdbc.TableConsumer.main(TableConsumer.java:192) ~[?:?] >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> ~[?:1.8.0_275] at >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >> ~[?:1.8.0_275] at >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> ~[?:1.8.0_275] at java.lang.reflect.Method.invoke(Method.java:498) >> ~[?:1.8.0_275] at >> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316) >> ~[flink-dist_2.12-1.12.0.jar:1.12.0] ... 13 more Caused by: >> java.lang.IllegalArgumentException: Job client must be a >> CoordinationRequestGateway. This is a bug. at >> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:142) >> ~[flink-dist_2.12-1.12.0.jar:1.12.0] at >> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.setJobClient(CollectResultFetcher.java:95) >> ~[flink-dist_2.12-1.12.0.jar:1.12.0] at >> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.setJobClient(CollectResultIterator.java:98) >> ~[flink-dist_2.12-1.12.0.jar:1.12.0] at >> org.apache.flink.table.planner.sinks.SelectTableSinkBase$1.setJobClient(SelectTableSinkBase.java:93) >> ~[flink-table-blink_2.12-1.12.0.jar:1.12.0] at >> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:709) >> ~[flink-table-blink_2.12-1.12.0.jar:1.12.0] at >> org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:570) >> ~[flink-table-blink_2.12-1.12.0.jar:1.12.0] at >> io.cmdaa.streaming.java.jdbc.TableConsumer.main(TableConsumer.java:192) ~[?:?] >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> ~[?:1.8.0_275] at >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >> ~[?:1.8.0_275] at >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> ~[?:1.8.0_275] at java.lang.reflect.Method.invoke(Method.java:498) >> ~[?:1.8.0_275] at >> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316) >> ~[flink-dist_2.12-1.12.0.jar:1.12.0] ... 13 more 2021-01-15 >> 16:52:07,989 ERROR >> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - >> Exception occurred in REST handler: Could not execute application. >> 2021-01-15 16:52:08,462 INFO >> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: >> JdbcTableSource(msg_id, hostname, last_updated) -> >> SourceConversion(table=[default_catalog.default_database.log_counts, >> source: [JdbcTableSource(msg_id, hostname, last_updated)]], >> fields=[msg_id, hostname, last_updated]) -> >> WatermarkAssigner(rowtime=[last_updated], watermark=[(last_updated - >> 5000:INTERVAL SECOND)]) -> Calc(select=[msg_id, hostname, >> last_updated], where=[(hostname <> _UTF-16LE'')]) (1/1) >> (d22b3ac56f07e182ba5b74d68fa74fb1) switched from RUNNING to FINISHED. >> 2021-01-15 16:52:08,465 INFO >> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - >> GroupWindowAggregate(groupBy=[msg_id, hostname], >> window=[TumblingGroupWindow('w, last_updated, 300000)], >> select=[msg_id, hostname, COUNT(msg_id) AS EXPR$0]) -> >> Calc(select=[msg_id, hostname, EXPR$0 AS cnt]) -> >> SinkConversionToTuple2 -> Sink: Select table sink (1/1) >> (09cee06206ad355b327cb8487773cd39) switched from RUNNING to FINISHED. >> 2021-01-15 16:52:08,465 INFO >> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job >> collect (84c9f12fe943bc7f32ee637666ed3bc1) switched from state RUNNING >> to FINISHED. 2021-01-15 16:52:08,466 INFO >> org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - >> Stopping checkpoint coordinator for job >> 84c9f12fe943bc7f32ee637666ed3bc1. 2021-01-15 16:52:08,466 INFO >> org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore >> [] - Shutting down 2021-01-15 16:52:08,466 INFO >> org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job >> 84c9f12fe943bc7f32ee637666ed3bc1 reached globally terminal state >> FINISHED. 2021-01-15 16:52:08,467 INFO >> org.apache.flink.runtime.jobmaster.JobMaster [] - Stopping the >> JobMaster for job collect(84c9f12fe943bc7f32ee637666ed3bc1). >> 2021-01-15 16:52:08,467 INFO >> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - >> Suspending SlotPool. 2021-01-15 16:52:08,468 INFO >> org.apache.flink.runtime.jobmaster.JobMaster [] - Close >> ResourceManager connection a76a3c5321498f13d0552421928c6062: Stopping >> JobMaster for job collect(84c9f12fe943bc7f32ee637666ed3bc1).. >> 2021-01-15 16:52:08,468 INFO >> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - Stopping >> SlotPool. 2021-01-15 16:52:08,468 INFO >> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] >> - Disconnect job manager >> [hidden email]://flink@flink-jobmanager:6123/user/rpc/jobmanager_68 >> for job 84c9f12fe943bc7f32ee637666ed3bc1 from the resource manager. | >> >> On Fri, Jan 15, 2021 at 11:32 AM Timo Walther [hidden email] >> <http://mailto:twalthr@...> wrote: >> >> Hi Robert, >> >> could you send us the error/stacktrace that is printed? >> >> An example how it should work is shown here: >> >> >> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/UpdatingTopCityExample.java >> >> >> <https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/UpdatingTopCityExample.java> >> >> >> >> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/GettingStartedExample.java >> >> >> <https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/GettingStartedExample.java> >> >> >> Regards, >> Timo >> >> On 15.01.21 16:46, Robert Cullen wrote: >> > I’m using a query to gather results: >> > >> > |Table log_counts = tEnv.from("log_counts") >> > .filter($("hostname").isNotNull() >> .and($("hostname").isNotEqual(""))) >> > .window(Tumble .over(lit(30).seconds()) >> .on($("last_updated")).as("w")) >> > .groupBy($("msg_id"), $("hostname"), $("w")) .select($("msg_id"), >> > $("hostname"), $("msg_id").count().as("cnt")); | >> > >> > I’d like to iterate over the results but using this causes an >> error: >> > >> > |log_counts.execute().collect(); >> > | >> > >> > Has there been an update in how to traverse results? >> > >> > >> > Robert Cullen >> > 240-475-4490 >> > >> >> -- >> Robert Cullen >> 240-475-4490 > |
I'm running Flink (1.12.0) on a Kubernetes standalone deployment (Kubernetes Cluster Session Mode). What's the link to Flink's JIRA? On Fri, Jan 15, 2021 at 12:19 PM Timo Walther <[hidden email]> wrote: maybe Godfrey in CC knows more? -- Robert Cullen 240-475-4490 |
Thanks for opening an issue
https://issues.apache.org/jira/browse/FLINK-20995 Feel free to ping me again in the issue if nobody responds. Thanks, Timo On 15.01.21 18:21, Robert Cullen wrote: > I'm running Flink (1.12.0) on a Kubernetes standalone deployment > (Kubernetes Cluster Session Mode). > > What's the link to Flink's JIRA? > > On Fri, Jan 15, 2021 at 12:19 PM Timo Walther <[hidden email] > <mailto:[hidden email]>> wrote: > > maybe Godfrey in CC knows more? > > > On 15.01.21 18:10, Timo Walther wrote: > > How are you running the Flink cluster? What is your deplyment? > > > > The exception clearly indicates that you found a bug. Could you > open an > > ticket in Flink's JIRA? We need details how to reproduce it. > > > > Thanks, > > Timo > > > > > > On 15.01.21 17:57, Robert Cullen wrote: > >> Timo, > >> > >> Here it is: > >> > >> |2021-01-15 16:52:00,628 WARN > >> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - > >> Configuring the job submission via query parameters is deprecated. > >> Please migrate to submitting a JSON request instead. 2021-01-15 > >> 16:52:00,640 INFO org.apache.flink.client.ClientUtils [] - Starting > >> program (detached: true) 2021-01-15 16:52:00,678 INFO > >> > org.apache.flink.client.deployment.application.executors.EmbeddedExecutor > > >> [] - Job 84c9f12fe943bc7f32ee637666ed3bc1 is submitted. 2021-01-15 > >> 16:52:00,678 INFO > >> > org.apache.flink.client.deployment.application.executors.EmbeddedExecutor > > >> [] - Submitting Job with JobId=84c9f12fe943bc7f32ee637666ed3bc1. > >> 2021-01-15 16:52:00,830 INFO > >> org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - > Received > >> JobGraph submission 84c9f12fe943bc7f32ee637666ed3bc1 (collect). > >> 2021-01-15 16:52:00,830 INFO > >> org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - > >> Submitting job 84c9f12fe943bc7f32ee637666ed3bc1 (collect). > 2021-01-15 > >> 16:52:00,831 INFO > org.apache.flink.runtime.rpc.akka.AkkaRpcService [] > >> - Starting RPC endpoint for > >> org.apache.flink.runtime.jobmaster.JobMaster at > >> akka://flink/user/rpc/jobmanager_68 . 2021-01-15 16:52:00,831 INFO > >> org.apache.flink.runtime.jobmaster.JobMaster [] - Initializing job > >> collect (84c9f12fe943bc7f32ee637666ed3bc1). 2021-01-15 16:52:00,832 > >> INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using > restart > >> back off time strategy NoRestartBackoffTimeStrategy for collect > >> (84c9f12fe943bc7f32ee637666ed3bc1). 2021-01-15 16:52:00,832 INFO > >> org.apache.flink.runtime.jobmaster.JobMaster [] - Running > >> initialization on master for job collect > >> (84c9f12fe943bc7f32ee637666ed3bc1). 2021-01-15 16:52:00,836 INFO > >> org.apache.flink.runtime.jobmaster.JobMaster [] - Successfully ran > >> initialization on master in 3 ms. 2021-01-15 16:52:00,836 INFO > >> > org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] > >> - Built 1 pipelined regions in 0 ms 2021-01-15 16:52:00,836 INFO > >> org.apache.flink.runtime.jobmaster.JobMaster [] - Using job/cluster > >> config to configure application-defined state backend: File State > >> Backend (checkpoints: 's3://flink/checkpoints', savepoints: > >> 's3://flink/savepoints', asynchronous: TRUE, fileStateThreshold: > >> 20480) 2021-01-15 16:52:00,836 INFO > >> org.apache.flink.runtime.jobmaster.JobMaster [] - Using > >> application-defined state backend: File State Backend (checkpoints: > >> 's3://flink/checkpoints', savepoints: 's3://flink/savepoints', > >> asynchronous: TRUE, fileStateThreshold: 20480) 2021-01-15 > 16:52:06,865 > >> INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator > [] - No > >> checkpoint found during restore. 2021-01-15 16:52:06,866 INFO > >> org.apache.flink.runtime.jobmaster.JobMaster [] - Using failover > >> strategy > >> > org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@39b9ad24 > > >> for collect (84c9f12fe943bc7f32ee637666ed3bc1). 2021-01-15 > >> 16:52:06,866 INFO > >> org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl [] - > >> JobManager runner for job collect > (84c9f12fe943bc7f32ee637666ed3bc1) > >> was granted leadership with session id > >> 00000000-0000-0000-0000-000000000000 at > >> akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_68. > >> 2021-01-15 16:52:06,866 INFO > >> org.apache.flink.runtime.jobmaster.JobMaster [] - Starting > execution > >> of job collect (84c9f12fe943bc7f32ee637666ed3bc1) under job > master id > >> 00000000000000000000000000000000. 2021-01-15 16:52:06,866 INFO > >> org.apache.flink.runtime.jobmaster.JobMaster [] - Starting > scheduling > >> with scheduling strategy > >> > [org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy] > > >> 2021-01-15 16:52:06,866 INFO > >> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job > >> collect (84c9f12fe943bc7f32ee637666ed3bc1) switched from state > CREATED > >> to RUNNING. 2021-01-15 16:52:06,866 INFO > >> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: > >> JdbcTableSource(msg_id, hostname, last_updated) -> > >> > SourceConversion(table=[default_catalog.default_database.log_counts, > >> source: [JdbcTableSource(msg_id, hostname, last_updated)]], > >> fields=[msg_id, hostname, last_updated]) -> > >> WatermarkAssigner(rowtime=[last_updated], > watermark=[(last_updated - > >> 5000:INTERVAL SECOND)]) -> Calc(select=[msg_id, hostname, > >> last_updated], where=[(hostname <> _UTF-16LE'')]) (1/1) > >> (d22b3ac56f07e182ba5b74d68fa74fb1) switched from CREATED to > SCHEDULED. > >> 2021-01-15 16:52:06,866 INFO > >> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - > >> GroupWindowAggregate(groupBy=[msg_id, hostname], > >> window=[TumblingGroupWindow('w, last_updated, 300000)], > >> select=[msg_id, hostname, COUNT(msg_id) AS EXPR$0]) -> > >> Calc(select=[msg_id, hostname, EXPR$0 AS cnt]) -> > >> SinkConversionToTuple2 -> Sink: Select table sink (1/1) > >> (09cee06206ad355b327cb8487773cd39) switched from CREATED to > SCHEDULED. > >> 2021-01-15 16:52:06,866 INFO > >> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - > Cannot > >> serve slot request, no ResourceManager connected. Adding as pending > >> request [SlotRequestId{b3f1b6b4eaab427305ec15f92a39531e}] > 2021-01-15 > >> 16:52:06,866 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - > >> Connecting to ResourceManager > >> > akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_*(00000000000000000000000000000000) > > >> 2021-01-15 16:52:06,867 INFO > >> org.apache.flink.runtime.jobmaster.JobMaster [] - Resolved > >> ResourceManager address, beginning registration 2021-01-15 > >> 16:52:06,867 INFO > >> > org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] > >> - Registering job manager > >> > [hidden email]://flink@flink-jobmanager:6123/user/rpc/jobmanager_68 > > >> for job 84c9f12fe943bc7f32ee637666ed3bc1. 2021-01-15 > 16:52:06,867 INFO > >> > org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] > >> - Registered job manager > >> > [hidden email]://flink@flink-jobmanager:6123/user/rpc/jobmanager_68 > > >> for job 84c9f12fe943bc7f32ee637666ed3bc1. 2021-01-15 > 16:52:06,867 INFO > >> org.apache.flink.runtime.jobmaster.JobMaster [] - JobManager > >> successfully registered at ResourceManager, leader id: > >> 00000000000000000000000000000000. 2021-01-15 16:52:06,867 INFO > >> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - > >> Requesting new slot > [SlotRequestId{b3f1b6b4eaab427305ec15f92a39531e}] > >> and profile ResourceProfile{UNKNOWN} with allocation id > >> b0559997a428b1d31d9e57d6532e026b from resource manager. 2021-01-15 > >> 16:52:06,868 INFO > >> > org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] > >> - Request slot with profile ResourceProfile{UNKNOWN} for job > >> 84c9f12fe943bc7f32ee637666ed3bc1 with allocation id > >> b0559997a428b1d31d9e57d6532e026b. 2021-01-15 16:52:06,874 INFO > >> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: > >> JdbcTableSource(msg_id, hostname, last_updated) -> > >> > SourceConversion(table=[default_catalog.default_database.log_counts, > >> source: [JdbcTableSource(msg_id, hostname, last_updated)]], > >> fields=[msg_id, hostname, last_updated]) -> > >> WatermarkAssigner(rowtime=[last_updated], > watermark=[(last_updated - > >> 5000:INTERVAL SECOND)]) -> Calc(select=[msg_id, hostname, > >> last_updated], where=[(hostname <> _UTF-16LE'')]) (1/1) > >> (d22b3ac56f07e182ba5b74d68fa74fb1) switched from SCHEDULED to > >> DEPLOYING. 2021-01-15 16:52:06,882 INFO > >> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - > Deploying > >> Source: JdbcTableSource(msg_id, hostname, last_updated) -> > >> > SourceConversion(table=[default_catalog.default_database.log_counts, > >> source: [JdbcTableSource(msg_id, hostname, last_updated)]], > >> fields=[msg_id, hostname, last_updated]) -> > >> WatermarkAssigner(rowtime=[last_updated], > watermark=[(last_updated - > >> 5000:INTERVAL SECOND)]) -> Calc(select=[msg_id, hostname, > >> last_updated], where=[(hostname <> _UTF-16LE'')]) (1/1) (attempt > #0) > >> with attempt id d22b3ac56f07e182ba5b74d68fa74fb1 to > >> 10.42.1.148:6122-9b9553 @ 10.42.1.148 (dataPort=40391) with > allocation > >> id b0559997a428b1d31d9e57d6532e026b 2021-01-15 16:52:06,883 INFO > >> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - > >> GroupWindowAggregate(groupBy=[msg_id, hostname], > >> window=[TumblingGroupWindow('w, last_updated, 300000)], > >> select=[msg_id, hostname, COUNT(msg_id) AS EXPR$0]) -> > >> Calc(select=[msg_id, hostname, EXPR$0 AS cnt]) -> > >> SinkConversionToTuple2 -> Sink: Select table sink (1/1) > >> (09cee06206ad355b327cb8487773cd39) switched from SCHEDULED to > >> DEPLOYING. 2021-01-15 16:52:06,883 INFO > >> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - > Deploying > >> GroupWindowAggregate(groupBy=[msg_id, hostname], > >> window=[TumblingGroupWindow('w, last_updated, 300000)], > >> select=[msg_id, hostname, COUNT(msg_id) AS EXPR$0]) -> > >> Calc(select=[msg_id, hostname, EXPR$0 AS cnt]) -> > >> SinkConversionToTuple2 -> Sink: Select table sink (1/1) (attempt > #0) > >> with attempt id 09cee06206ad355b327cb8487773cd39 to > >> 10.42.1.148:6122-9b9553 @ 10.42.1.148 (dataPort=40391) with > allocation > >> id b0559997a428b1d31d9e57d6532e026b 2021-01-15 16:52:07,038 INFO > >> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - > >> GroupWindowAggregate(groupBy=[msg_id, hostname], > >> window=[TumblingGroupWindow('w, last_updated, 300000)], > >> select=[msg_id, hostname, COUNT(msg_id) AS EXPR$0]) -> > >> Calc(select=[msg_id, hostname, EXPR$0 AS cnt]) -> > >> SinkConversionToTuple2 -> Sink: Select table sink (1/1) > >> (09cee06206ad355b327cb8487773cd39) switched from DEPLOYING to > RUNNING. > >> 2021-01-15 16:52:07,038 INFO > >> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: > >> JdbcTableSource(msg_id, hostname, last_updated) -> > >> > SourceConversion(table=[default_catalog.default_database.log_counts, > >> source: [JdbcTableSource(msg_id, hostname, last_updated)]], > >> fields=[msg_id, hostname, last_updated]) -> > >> WatermarkAssigner(rowtime=[last_updated], > watermark=[(last_updated - > >> 5000:INTERVAL SECOND)]) -> Calc(select=[msg_id, hostname, > >> last_updated], where=[(hostname <> _UTF-16LE'')]) (1/1) > >> (d22b3ac56f07e182ba5b74d68fa74fb1) switched from DEPLOYING to > RUNNING. > >> 2021-01-15 16:52:07,057 INFO > >> > org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorCoordinator > > >> [] - Received sink socket server address: /10.42.1.148:39303 > <http://10.42.1.148:39303> > >> <http://10.42.1.148:39303 <http://10.42.1.148:39303>> 2021-01-15 > 16:52:07,060 WARN > >> org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No > >> hostname could be resolved for the IP address 10.42.1.148, using IP > >> address as host name. Local input split assignment (such as for > HDFS > >> files) may be impacted. 2021-01-15 16:52:07,988 WARN > >> > org.apache.flink.client.deployment.application.DetachedApplicationRunner > > >> [] - Could not execute application: > >> org.apache.flink.client.program.ProgramInvocationException: The > main > >> method caused an error: Failed to execute sql at > >> > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:330) > > >> ~[flink-dist_2.12-1.12.0.jar:1.12.0] at > >> > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) > > >> ~[flink-dist_2.12-1.12.0.jar:1.12.0] at > >> > org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) > > >> ~[flink-dist_2.12-1.12.0.jar:1.12.0] at > >> > org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78) > > >> ~[flink-dist_2.12-1.12.0.jar:1.12.0] at > >> > org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67) > > >> ~[flink-dist_2.12-1.12.0.jar:1.12.0] at > >> > org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:100) > > >> ~[flink-dist_2.12-1.12.0.jar:1.12.0] at > >> > java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) > > >> [?:1.8.0_275] at > >> > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > [?:1.8.0_275] > >> at java.util.concurrent.FutureTask.run(FutureTask.java:266) > >> [?:1.8.0_275] at > >> > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > > >> [?:1.8.0_275] at > >> > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > > >> [?:1.8.0_275] at > >> > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > > >> [?:1.8.0_275] at > >> > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > > >> [?:1.8.0_275] at java.lang.Thread.run(Thread.java:748) > [?:1.8.0_275] > >> Caused by: org.apache.flink.table.api.TableException: Failed to > >> execute sql at > >> > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:719) > > >> ~[flink-table-blink_2.12-1.12.0.jar:1.12.0] at > >> > org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:570) > > >> ~[flink-table-blink_2.12-1.12.0.jar:1.12.0] at > >> > io.cmdaa.streaming.java.jdbc.TableConsumer.main(TableConsumer.java:192) > ~[?:?] > >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > >> ~[?:1.8.0_275] at > >> > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > > >> ~[?:1.8.0_275] at > >> > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > > >> ~[?:1.8.0_275] at java.lang.reflect.Method.invoke(Method.java:498) > >> ~[?:1.8.0_275] at > >> > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316) > > >> ~[flink-dist_2.12-1.12.0.jar:1.12.0] ... 13 more Caused by: > >> java.lang.IllegalArgumentException: Job client must be a > >> CoordinationRequestGateway. This is a bug. at > >> > org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:142) > > >> ~[flink-dist_2.12-1.12.0.jar:1.12.0] at > >> > org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.setJobClient(CollectResultFetcher.java:95) > > >> ~[flink-dist_2.12-1.12.0.jar:1.12.0] at > >> > org.apache.flink.streaming.api.operators.collect.CollectResultIterator.setJobClient(CollectResultIterator.java:98) > > >> ~[flink-dist_2.12-1.12.0.jar:1.12.0] at > >> > org.apache.flink.table.planner.sinks.SelectTableSinkBase$1.setJobClient(SelectTableSinkBase.java:93) > > >> ~[flink-table-blink_2.12-1.12.0.jar:1.12.0] at > >> > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:709) > > >> ~[flink-table-blink_2.12-1.12.0.jar:1.12.0] at > >> > org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:570) > > >> ~[flink-table-blink_2.12-1.12.0.jar:1.12.0] at > >> > io.cmdaa.streaming.java.jdbc.TableConsumer.main(TableConsumer.java:192) > ~[?:?] > >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > >> ~[?:1.8.0_275] at > >> > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > > >> ~[?:1.8.0_275] at > >> > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > > >> ~[?:1.8.0_275] at java.lang.reflect.Method.invoke(Method.java:498) > >> ~[?:1.8.0_275] at > >> > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316) > > >> ~[flink-dist_2.12-1.12.0.jar:1.12.0] ... 13 more 2021-01-15 > >> 16:52:07,989 ERROR > >> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - > >> Exception occurred in REST handler: Could not execute application. > >> 2021-01-15 16:52:08,462 INFO > >> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: > >> JdbcTableSource(msg_id, hostname, last_updated) -> > >> > SourceConversion(table=[default_catalog.default_database.log_counts, > >> source: [JdbcTableSource(msg_id, hostname, last_updated)]], > >> fields=[msg_id, hostname, last_updated]) -> > >> WatermarkAssigner(rowtime=[last_updated], > watermark=[(last_updated - > >> 5000:INTERVAL SECOND)]) -> Calc(select=[msg_id, hostname, > >> last_updated], where=[(hostname <> _UTF-16LE'')]) (1/1) > >> (d22b3ac56f07e182ba5b74d68fa74fb1) switched from RUNNING to > FINISHED. > >> 2021-01-15 16:52:08,465 INFO > >> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - > >> GroupWindowAggregate(groupBy=[msg_id, hostname], > >> window=[TumblingGroupWindow('w, last_updated, 300000)], > >> select=[msg_id, hostname, COUNT(msg_id) AS EXPR$0]) -> > >> Calc(select=[msg_id, hostname, EXPR$0 AS cnt]) -> > >> SinkConversionToTuple2 -> Sink: Select table sink (1/1) > >> (09cee06206ad355b327cb8487773cd39) switched from RUNNING to > FINISHED. > >> 2021-01-15 16:52:08,465 INFO > >> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job > >> collect (84c9f12fe943bc7f32ee637666ed3bc1) switched from state > RUNNING > >> to FINISHED. 2021-01-15 16:52:08,466 INFO > >> org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - > >> Stopping checkpoint coordinator for job > >> 84c9f12fe943bc7f32ee637666ed3bc1. 2021-01-15 16:52:08,466 INFO > >> > org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore > >> [] - Shutting down 2021-01-15 16:52:08,466 INFO > >> org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job > >> 84c9f12fe943bc7f32ee637666ed3bc1 reached globally terminal state > >> FINISHED. 2021-01-15 16:52:08,467 INFO > >> org.apache.flink.runtime.jobmaster.JobMaster [] - Stopping the > >> JobMaster for job collect(84c9f12fe943bc7f32ee637666ed3bc1). > >> 2021-01-15 16:52:08,467 INFO > >> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - > >> Suspending SlotPool. 2021-01-15 16:52:08,468 INFO > >> org.apache.flink.runtime.jobmaster.JobMaster [] - Close > >> ResourceManager connection a76a3c5321498f13d0552421928c6062: > Stopping > >> JobMaster for job collect(84c9f12fe943bc7f32ee637666ed3bc1).. > >> 2021-01-15 16:52:08,468 INFO > >> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - > Stopping > >> SlotPool. 2021-01-15 16:52:08,468 INFO > >> > org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] > >> - Disconnect job manager > >> > [hidden email]://flink@flink-jobmanager:6123/user/rpc/jobmanager_68 > > >> for job 84c9f12fe943bc7f32ee637666ed3bc1 from the resource > manager. | > >> > >> On Fri, Jan 15, 2021 at 11:32 AM Timo Walther [hidden email] > <mailto:[hidden email]> > >> <http://mailto:twalthr@... > <http://mailto:twalthr@...>> wrote: > >> > >> Hi Robert, > >> > >> could you send us the error/stacktrace that is printed? > >> > >> An example how it should work is shown here: > >> > >> > >> > https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/UpdatingTopCityExample.java > <https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/UpdatingTopCityExample.java> > > >> > >> > >> > <https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/UpdatingTopCityExample.java > <https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/UpdatingTopCityExample.java>> > > >> > >> > >> > >> > https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/GettingStartedExample.java > <https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/GettingStartedExample.java> > > >> > >> > >> > <https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/GettingStartedExample.java > <https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/GettingStartedExample.java>> > > >> > >> > >> Regards, > >> Timo > >> > >> On 15.01.21 16:46, Robert Cullen wrote: > >> > I’m using a query to gather results: > >> > > >> > |Table log_counts = tEnv.from("log_counts") > >> > .filter($("hostname").isNotNull() > >> .and($("hostname").isNotEqual(""))) > >> > .window(Tumble .over(lit(30).seconds()) > >> .on($("last_updated")).as("w")) > >> > .groupBy($("msg_id"), $("hostname"), $("w")) > .select($("msg_id"), > >> > $("hostname"), $("msg_id").count().as("cnt")); | > >> > > >> > I’d like to iterate over the results but using this > causes an > >> error: > >> > > >> > |log_counts.execute().collect(); > >> > | > >> > > >> > Has there been an update in how to traverse results? > >> > > >> > > >> > Robert Cullen > >> > 240-475-4490 > >> > > >> > >> -- > >> Robert Cullen > >> 240-475-4490 > > > > > > -- > Robert Cullen > 240-475-4490 |
Free forum by Nabble | Edit this page |