How to Iterate over results in Table API version 1.12.0


How to Iterate over results in Table API version 1.12.0

Robert Cullen

I’m using a query to gather results:

Table log_counts = tEnv.from("log_counts")
        .filter($("hostname").isNotNull()
                .and($("hostname").isNotEqual("")))
        .window(Tumble
                .over(lit(30).seconds())
                .on($("last_updated")).as("w"))
        .groupBy($("msg_id"), $("hostname"), $("w"))
        .select($("msg_id"),
                $("hostname"),
                $("msg_id").count().as("cnt"));

I’d like to iterate over the results but using this causes an error:

log_counts.execute().collect();

Has there been an update in how to traverse results?


Robert Cullen
240-475-4490

Reply | Threaded
Open this post in threaded view
|

Re: How to Iterate over results in Table API version 1.12.0

Timo Walther
Hi Robert,

could you send us the error/stacktrace that is printed?

An example of how it should work is shown here:

https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/UpdatingTopCityExample.java

https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/GettingStartedExample.java
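
The pattern used in those examples can be sketched as follows (a minimal, self-contained sketch; the environment setup and the `fromValues` placeholder table are illustrative, not taken from your program):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

public class CollectSketch {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Placeholder source; in your job this would be the "log_counts" query result.
        Table table = tEnv.fromValues("host-a", "host-b");

        // execute() submits the job; collect() returns a CloseableIterator
        // that must be closed, hence the try-with-resources block.
        TableResult result = table.execute();
        try (CloseableIterator<Row> it = result.collect()) {
            while (it.hasNext()) {
                Row row = it.next();
                System.out.println(row);
            }
        }
    }
}
```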

Regards,
Timo



Re: How to Iterate over results in Table API version 1.12.0

Robert Cullen

Timo,

Here it is:

2021-01-15 16:52:00,628 WARN  org.apache.flink.runtime.webmonitor.handlers.JarRunHandler   [] - Configuring the job submission via query parameters is deprecated. Please migrate to submitting a JSON request instead.
2021-01-15 16:52:00,640 INFO  org.apache.flink.client.ClientUtils                          [] - Starting program (detached: true)
2021-01-15 16:52:00,678 INFO  org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Job 84c9f12fe943bc7f32ee637666ed3bc1 is submitted.
2021-01-15 16:52:00,678 INFO  org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Submitting Job with JobId=84c9f12fe943bc7f32ee637666ed3bc1.
2021-01-15 16:52:00,830 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Received JobGraph submission 84c9f12fe943bc7f32ee637666ed3bc1 (collect).
2021-01-15 16:52:00,830 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Submitting job 84c9f12fe943bc7f32ee637666ed3bc1 (collect).
2021-01-15 16:52:00,831 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService             [] - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/rpc/jobmanager_68 .
2021-01-15 16:52:00,831 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Initializing job collect (84c9f12fe943bc7f32ee637666ed3bc1).
2021-01-15 16:52:00,832 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Using restart back off time strategy NoRestartBackoffTimeStrategy for collect (84c9f12fe943bc7f32ee637666ed3bc1).
2021-01-15 16:52:00,832 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Running initialization on master for job collect (84c9f12fe943bc7f32ee637666ed3bc1).
2021-01-15 16:52:00,836 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Successfully ran initialization on master in 3 ms.
2021-01-15 16:52:00,836 INFO  org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built 1 pipelined regions in 0 ms
2021-01-15 16:52:00,836 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Using job/cluster config to configure application-defined state backend: File State Backend (checkpoints: 's3://flink/checkpoints', savepoints: 's3://flink/savepoints', asynchronous: TRUE, fileStateThreshold: 20480)
2021-01-15 16:52:00,836 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Using application-defined state backend: File State Backend (checkpoints: 's3://flink/checkpoints', savepoints: 's3://flink/savepoints', asynchronous: TRUE, fileStateThreshold: 20480)
2021-01-15 16:52:06,865 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - No checkpoint found during restore.
2021-01-15 16:52:06,866 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Using failover strategy org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@39b9ad24 for collect (84c9f12fe943bc7f32ee637666ed3bc1).
2021-01-15 16:52:06,866 INFO  org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl      [] - JobManager runner for job collect (84c9f12fe943bc7f32ee637666ed3bc1) was granted leadership with session id 00000000-0000-0000-0000-000000000000 at akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_68.
2021-01-15 16:52:06,866 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Starting execution of job collect (84c9f12fe943bc7f32ee637666ed3bc1) under job master id 00000000000000000000000000000000.
2021-01-15 16:52:06,866 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Starting scheduling with scheduling strategy [org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
2021-01-15 16:52:06,866 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job collect (84c9f12fe943bc7f32ee637666ed3bc1) switched from state CREATED to RUNNING.
2021-01-15 16:52:06,866 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: JdbcTableSource(msg_id, hostname, last_updated) -> SourceConversion(table=[default_catalog.default_database.log_counts, source: [JdbcTableSource(msg_id, hostname, last_updated)]], fields=[msg_id, hostname, last_updated]) -> WatermarkAssigner(rowtime=[last_updated], watermark=[(last_updated - 5000:INTERVAL SECOND)]) -> Calc(select=[msg_id, hostname, last_updated], where=[(hostname <> _UTF-16LE'')]) (1/1) (d22b3ac56f07e182ba5b74d68fa74fb1) switched from CREATED to SCHEDULED.
2021-01-15 16:52:06,866 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - GroupWindowAggregate(groupBy=[msg_id, hostname], window=[TumblingGroupWindow('w, last_updated, 300000)], select=[msg_id, hostname, COUNT(msg_id) AS EXPR$0]) -> Calc(select=[msg_id, hostname, EXPR$0 AS cnt]) -> SinkConversionToTuple2 -> Sink: Select table sink (1/1) (09cee06206ad355b327cb8487773cd39) switched from CREATED to SCHEDULED.
2021-01-15 16:52:06,866 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl     [] - Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{b3f1b6b4eaab427305ec15f92a39531e}]
2021-01-15 16:52:06,866 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Connecting to ResourceManager akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_*(00000000000000000000000000000000)
2021-01-15 16:52:06,867 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Resolved ResourceManager address, beginning registration
2021-01-15 16:52:06,867 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registering job manager akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_68 for job 84c9f12fe943bc7f32ee637666ed3bc1.
2021-01-15 16:52:06,867 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registered job manager akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_68 for job 84c9f12fe943bc7f32ee637666ed3bc1.
2021-01-15 16:52:06,867 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - JobManager successfully registered at ResourceManager, leader id: 00000000000000000000000000000000.
2021-01-15 16:52:06,867 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl     [] - Requesting new slot [SlotRequestId{b3f1b6b4eaab427305ec15f92a39531e}] and profile ResourceProfile{UNKNOWN} with allocation id b0559997a428b1d31d9e57d6532e026b from resource manager.
2021-01-15 16:52:06,868 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Request slot with profile ResourceProfile{UNKNOWN} for job 84c9f12fe943bc7f32ee637666ed3bc1 with allocation id b0559997a428b1d31d9e57d6532e026b.
2021-01-15 16:52:06,874 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: JdbcTableSource(msg_id, hostname, last_updated) -> SourceConversion(table=[default_catalog.default_database.log_counts, source: [JdbcTableSource(msg_id, hostname, last_updated)]], fields=[msg_id, hostname, last_updated]) -> WatermarkAssigner(rowtime=[last_updated], watermark=[(last_updated - 5000:INTERVAL SECOND)]) -> Calc(select=[msg_id, hostname, last_updated], where=[(hostname <> _UTF-16LE'')]) (1/1) (d22b3ac56f07e182ba5b74d68fa74fb1) switched from SCHEDULED to DEPLOYING.
2021-01-15 16:52:06,882 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Deploying Source: JdbcTableSource(msg_id, hostname, last_updated) -> SourceConversion(table=[default_catalog.default_database.log_counts, source: [JdbcTableSource(msg_id, hostname, last_updated)]], fields=[msg_id, hostname, last_updated]) -> WatermarkAssigner(rowtime=[last_updated], watermark=[(last_updated - 5000:INTERVAL SECOND)]) -> Calc(select=[msg_id, hostname, last_updated], where=[(hostname <> _UTF-16LE'')]) (1/1) (attempt #0) with attempt id d22b3ac56f07e182ba5b74d68fa74fb1 to 10.42.1.148:6122-9b9553 @ 10.42.1.148 (dataPort=40391) with allocation id b0559997a428b1d31d9e57d6532e026b
2021-01-15 16:52:06,883 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - GroupWindowAggregate(groupBy=[msg_id, hostname], window=[TumblingGroupWindow('w, last_updated, 300000)], select=[msg_id, hostname, COUNT(msg_id) AS EXPR$0]) -> Calc(select=[msg_id, hostname, EXPR$0 AS cnt]) -> SinkConversionToTuple2 -> Sink: Select table sink (1/1) (09cee06206ad355b327cb8487773cd39) switched from SCHEDULED to DEPLOYING.
2021-01-15 16:52:06,883 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Deploying GroupWindowAggregate(groupBy=[msg_id, hostname], window=[TumblingGroupWindow('w, last_updated, 300000)], select=[msg_id, hostname, COUNT(msg_id) AS EXPR$0]) -> Calc(select=[msg_id, hostname, EXPR$0 AS cnt]) -> SinkConversionToTuple2 -> Sink: Select table sink (1/1) (attempt #0) with attempt id 09cee06206ad355b327cb8487773cd39 to 10.42.1.148:6122-9b9553 @ 10.42.1.148 (dataPort=40391) with allocation id b0559997a428b1d31d9e57d6532e026b
2021-01-15 16:52:07,038 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - GroupWindowAggregate(groupBy=[msg_id, hostname], window=[TumblingGroupWindow('w, last_updated, 300000)], select=[msg_id, hostname, COUNT(msg_id) AS EXPR$0]) -> Calc(select=[msg_id, hostname, EXPR$0 AS cnt]) -> SinkConversionToTuple2 -> Sink: Select table sink (1/1) (09cee06206ad355b327cb8487773cd39) switched from DEPLOYING to RUNNING.
2021-01-15 16:52:07,038 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: JdbcTableSource(msg_id, hostname, last_updated) -> SourceConversion(table=[default_catalog.default_database.log_counts, source: [JdbcTableSource(msg_id, hostname, last_updated)]], fields=[msg_id, hostname, last_updated]) -> WatermarkAssigner(rowtime=[last_updated], watermark=[(last_updated - 5000:INTERVAL SECOND)]) -> Calc(select=[msg_id, hostname, last_updated], where=[(hostname <> _UTF-16LE'')]) (1/1) (d22b3ac56f07e182ba5b74d68fa74fb1) switched from DEPLOYING to RUNNING.
2021-01-15 16:52:07,057 INFO  org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorCoordinator [] - Received sink socket server address: /10.42.1.148:39303
2021-01-15 16:52:07,060 WARN  org.apache.flink.runtime.taskmanager.TaskManagerLocation     [] - No hostname could be resolved for the IP address 10.42.1.148, using IP address as host name. Local input split assignment (such as for HDFS files) may be impacted.
2021-01-15 16:52:07,988 WARN  org.apache.flink.client.deployment.application.DetachedApplicationRunner [] - Could not execute application: 
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute sql
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:330) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:100) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) [?:1.8.0_275]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_275]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_275]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_275]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_275]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_275]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_275]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:719) ~[flink-table-blink_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:570) ~[flink-table-blink_2.12-1.12.0.jar:1.12.0]
    at io.cmdaa.streaming.java.jdbc.TableConsumer.main(TableConsumer.java:192) ~[?:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_275]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_275]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_275]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_275]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    ... 13 more
Caused by: java.lang.IllegalArgumentException: Job client must be a CoordinationRequestGateway. This is a bug.
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:142) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.setJobClient(CollectResultFetcher.java:95) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.setJobClient(CollectResultIterator.java:98) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.table.planner.sinks.SelectTableSinkBase$1.setJobClient(SelectTableSinkBase.java:93) ~[flink-table-blink_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:709) ~[flink-table-blink_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:570) ~[flink-table-blink_2.12-1.12.0.jar:1.12.0]
    at io.cmdaa.streaming.java.jdbc.TableConsumer.main(TableConsumer.java:192) ~[?:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_275]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_275]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_275]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_275]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    ... 13 more
2021-01-15 16:52:07,989 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler   [] - Exception occurred in REST handler: Could not execute application.
2021-01-15 16:52:08,462 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: JdbcTableSource(msg_id, hostname, last_updated) -> SourceConversion(table=[default_catalog.default_database.log_counts, source: [JdbcTableSource(msg_id, hostname, last_updated)]], fields=[msg_id, hostname, last_updated]) -> WatermarkAssigner(rowtime=[last_updated], watermark=[(last_updated - 5000:INTERVAL SECOND)]) -> Calc(select=[msg_id, hostname, last_updated], where=[(hostname <> _UTF-16LE'')]) (1/1) (d22b3ac56f07e182ba5b74d68fa74fb1) switched from RUNNING to FINISHED.
2021-01-15 16:52:08,465 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - GroupWindowAggregate(groupBy=[msg_id, hostname], window=[TumblingGroupWindow('w, last_updated, 300000)], select=[msg_id, hostname, COUNT(msg_id) AS EXPR$0]) -> Calc(select=[msg_id, hostname, EXPR$0 AS cnt]) -> SinkConversionToTuple2 -> Sink: Select table sink (1/1) (09cee06206ad355b327cb8487773cd39) switched from RUNNING to FINISHED.
2021-01-15 16:52:08,465 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job collect (84c9f12fe943bc7f32ee637666ed3bc1) switched from state RUNNING to FINISHED.
2021-01-15 16:52:08,466 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Stopping checkpoint coordinator for job 84c9f12fe943bc7f32ee637666ed3bc1.
2021-01-15 16:52:08,466 INFO  org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore [] - Shutting down
2021-01-15 16:52:08,466 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Job 84c9f12fe943bc7f32ee637666ed3bc1 reached globally terminal state FINISHED.
2021-01-15 16:52:08,467 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Stopping the JobMaster for job collect(84c9f12fe943bc7f32ee637666ed3bc1).
2021-01-15 16:52:08,467 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl     [] - Suspending SlotPool.
2021-01-15 16:52:08,468 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Close ResourceManager connection a76a3c5321498f13d0552421928c6062: Stopping JobMaster for job collect(84c9f12fe943bc7f32ee637666ed3bc1)..
2021-01-15 16:52:08,468 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl     [] - Stopping SlotPool.
2021-01-15 16:52:08,468 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Disconnect job manager akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_68 for job 84c9f12fe943bc7f32ee637666ed3bc1 from the resource manager.

--
Robert Cullen
240-475-4490

Re: How to Iterate over results in Table API version 1.12.0

Timo Walther
How are you running the Flink cluster? What is your deployment?

The exception clearly indicates that you found a bug. Could you open a
ticket in Flink's JIRA? We need details on how to reproduce it.
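
In the meantime, one possible workaround (a sketch, under the assumption that collect() fails because the job is submitted detached through the REST/web UI, where no client can attach): write the results to a sink table instead of collecting them on the client. The sink name `log_counts_out` and the `fromValues` placeholder are illustrative only:

```java
import static org.apache.flink.table.api.Expressions.row;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class SinkWorkaroundSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Hypothetical sink; the 'print' connector logs each row on the
        // task managers, which is visible even in detached deployments.
        tEnv.executeSql(
                "CREATE TABLE log_counts_out ("
                        + "  msg_id STRING,"
                        + "  hostname STRING,"
                        + "  cnt BIGINT"
                        + ") WITH ('connector' = 'print')");

        // Placeholder for the aggregated "log_counts" table from the thread.
        Table logCounts = tEnv.fromValues(
                row("msg-1", "host-a", 3L),
                row("msg-2", "host-b", 1L));

        // executeInsert() submits the job and writes rows to the sink,
        // bypassing the client-side collect() machinery that fails here.
        logCounts.executeInsert("log_counts_out");
    }
}
```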

Thanks,
Timo


> ~[flink-table-blink_2.12-1.12.0.jar:1.12.0] at
> org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:570) ~[flink-table-blink_2.12-1.12.0.jar:1.12.0]
> at
> io.cmdaa.streaming.java.jdbc.TableConsumer.main(TableConsumer.java:192)
> ~[?:?] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> ~[?:1.8.0_275] at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> ~[?:1.8.0_275] at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> ~[?:1.8.0_275] at java.lang.reflect.Method.invoke(Method.java:498)
> ~[?:1.8.0_275] at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0] ... 13 more Caused by:
> java.lang.IllegalArgumentException: Job client must be a
> CoordinationRequestGateway. This is a bug. at
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:142) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.setJobClient(CollectResultFetcher.java:95)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0] at
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.setJobClient(CollectResultIterator.java:98)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0] at
> org.apache.flink.table.planner.sinks.SelectTableSinkBase$1.setJobClient(SelectTableSinkBase.java:93)
> ~[flink-table-blink_2.12-1.12.0.jar:1.12.0] at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:709)
> ~[flink-table-blink_2.12-1.12.0.jar:1.12.0] at
> org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:570) ~[flink-table-blink_2.12-1.12.0.jar:1.12.0]
> at
> io.cmdaa.streaming.java.jdbc.TableConsumer.main(TableConsumer.java:192)
> ~[?:?] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> ~[?:1.8.0_275] at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> ~[?:1.8.0_275] at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> ~[?:1.8.0_275] at java.lang.reflect.Method.invoke(Method.java:498)
> ~[?:1.8.0_275] at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0] ... 13 more 2021-01-15 16:52:07,989
> ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] -
> Exception occurred in REST handler: Could not execute application.
> 2021-01-15 16:52:08,462 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source:
> JdbcTableSource(msg_id, hostname, last_updated) ->
> SourceConversion(table=[default_catalog.default_database.log_counts,
> source: [JdbcTableSource(msg_id, hostname, last_updated)]],
> fields=[msg_id, hostname, last_updated]) ->
> WatermarkAssigner(rowtime=[last_updated], watermark=[(last_updated -
> 5000:INTERVAL SECOND)]) -> Calc(select=[msg_id, hostname, last_updated],
> where=[(hostname <> _UTF-16LE'')]) (1/1)
> (d22b3ac56f07e182ba5b74d68fa74fb1) switched from RUNNING to FINISHED.
> 2021-01-15 16:52:08,465 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] -
> GroupWindowAggregate(groupBy=[msg_id, hostname],
> window=[TumblingGroupWindow('w, last_updated, 300000)], select=[msg_id,
> hostname, COUNT(msg_id) AS EXPR$0]) -> Calc(select=[msg_id, hostname,
> EXPR$0 AS cnt]) -> SinkConversionToTuple2 -> Sink: Select table sink
> (1/1) (09cee06206ad355b327cb8487773cd39) switched from RUNNING to
> FINISHED. 2021-01-15 16:52:08,465 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job collect
> (84c9f12fe943bc7f32ee637666ed3bc1) switched from state RUNNING to
> FINISHED. 2021-01-15 16:52:08,466 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Stopping
> checkpoint coordinator for job 84c9f12fe943bc7f32ee637666ed3bc1.
> 2021-01-15 16:52:08,466 INFO
> org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore
> [] - Shutting down 2021-01-15 16:52:08,466 INFO
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job
> 84c9f12fe943bc7f32ee637666ed3bc1 reached globally terminal state
> FINISHED. 2021-01-15 16:52:08,467 INFO
> org.apache.flink.runtime.jobmaster.JobMaster [] - Stopping the JobMaster
> for job collect(84c9f12fe943bc7f32ee637666ed3bc1). 2021-01-15
> 16:52:08,467 INFO
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - Suspending
> SlotPool. 2021-01-15 16:52:08,468 INFO
> org.apache.flink.runtime.jobmaster.JobMaster [] - Close ResourceManager
> connection a76a3c5321498f13d0552421928c6062: Stopping JobMaster for job
> collect(84c9f12fe943bc7f32ee637666ed3bc1).. 2021-01-15 16:52:08,468 INFO
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - Stopping
> SlotPool. 2021-01-15 16:52:08,468 INFO
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
> Disconnect job manager
> [hidden email]://flink@flink-jobmanager:6123/user/rpc/jobmanager_68
> for job 84c9f12fe943bc7f32ee637666ed3bc1 from the resource manager. |
>
> On Fri, Jan 15, 2021 at 11:32 AM Timo Walther [hidden email]
> <http://mailto:twalthr@...> wrote:
>
>     Hi Robert,
>
>     could you send us the error/stacktrace that is printed?
>
>     An example how it should work is shown here:
>
>     https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/UpdatingTopCityExample.java
>     <https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/UpdatingTopCityExample.java>
>
>     https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/GettingStartedExample.java
>     <https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/GettingStartedExample.java>
>
>     Regards,
>     Timo
>


Re: How to Iterate over results in Table API version 1.12.0

Timo Walther
maybe Godfrey in CC knows more?


On 15.01.21 18:10, Timo Walther wrote:

> How are you running the Flink cluster? What is your deployment?
>
> The exception clearly indicates that you found a bug. Could you open a
> ticket in Flink's JIRA? We need details on how to reproduce it.
>
> Thanks,
> Timo
>
>
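For context on the exception: the `DetachedApplicationRunner` in the log indicates the job was submitted through the web UI's jar-run endpoint, which executes `main()` detached, while `collect()` needs an attached job client that implements `CoordinationRequestGateway` — which matches the error message. One workaround sketch (continuing the `tEnv`/`logCounts` snippet from the original post) is to write the result to a sink table instead of collecting it, for example with the built-in `print` connector. The sink name and schema here are made up for illustration:

```java
// Declare a print sink whose schema matches the query result
// (msg_id, hostname, cnt). "print_sink" is a hypothetical name.
tEnv.executeSql(
        "CREATE TABLE print_sink (" +
        "  msg_id STRING," +
        "  hostname STRING," +
        "  cnt BIGINT" +
        ") WITH ('connector' = 'print')");

// executeInsert() submits the job; rows appear in the TaskManager logs,
// so no result round-trip to the client is required.
logCounts.executeInsert("print_sink");
```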


Re: How to Iterate over results in Table API version 1.12.0

Robert Cullen
I'm running Flink (1.12.0) on a Kubernetes standalone deployment (Kubernetes Cluster Session Mode).

What's the link to Flink's JIRA?

On Fri, Jan 15, 2021 at 12:19 PM Timo Walther <[hidden email]> wrote:
maybe Godfrey in CC knows more?


On 15.01.21 18:10, Timo Walther wrote:
> How are you running the Flink cluster? What is your deployment?
>
> The exception clearly indicates that you found a bug. Could you open a
> ticket in Flink's JIRA? We need details on how to reproduce it.
>
> Thanks,
> Timo
>
>
> On 15.01.21 17:57, Robert Cullen wrote:
>> Timo,
>>
>> Here it is:
>>
>> |2021-01-15 16:52:00,628 WARN
>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] -
>> Configuring the job submission via query parameters is deprecated.
>> Please migrate to submitting a JSON request instead.
>> 2021-01-15 16:52:00,640 INFO org.apache.flink.client.ClientUtils [] - Starting program (detached: true)
>> 2021-01-15 16:52:00,678 INFO org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Job 84c9f12fe943bc7f32ee637666ed3bc1 is submitted.
>> 2021-01-15 16:52:00,678 INFO org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Submitting Job with JobId=84c9f12fe943bc7f32ee637666ed3bc1.
>> 2021-01-15 16:52:00,830 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Received JobGraph submission 84c9f12fe943bc7f32ee637666ed3bc1 (collect).
>> 2021-01-15 16:52:00,830 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Submitting job 84c9f12fe943bc7f32ee637666ed3bc1 (collect).
>> 2021-01-15 16:52:00,831 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/rpc/jobmanager_68 .
>> 2021-01-15 16:52:00,831 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Initializing job collect (84c9f12fe943bc7f32ee637666ed3bc1).
>> 2021-01-15 16:52:00,832 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using restart back off time strategy NoRestartBackoffTimeStrategy for collect (84c9f12fe943bc7f32ee637666ed3bc1).
>> 2021-01-15 16:52:00,832 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Running initialization on master for job collect (84c9f12fe943bc7f32ee637666ed3bc1).
>> 2021-01-15 16:52:00,836 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Successfully ran initialization on master in 3 ms.
>> 2021-01-15 16:52:00,836 INFO org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built 1 pipelined regions in 0 ms
>> 2021-01-15 16:52:00,836 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using job/cluster config to configure application-defined state backend: File State Backend (checkpoints: 's3://flink/checkpoints', savepoints: 's3://flink/savepoints', asynchronous: TRUE, fileStateThreshold: 20480)
>> 2021-01-15 16:52:00,836 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using application-defined state backend: File State Backend (checkpoints: 's3://flink/checkpoints', savepoints: 's3://flink/savepoints', asynchronous: TRUE, fileStateThreshold: 20480)
>> 2021-01-15 16:52:06,865 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No checkpoint found during restore.
>> 2021-01-15 16:52:06,866 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using failover strategy org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@39b9ad24 for collect (84c9f12fe943bc7f32ee637666ed3bc1).
>> 2021-01-15 16:52:06,866 INFO org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl [] - JobManager runner for job collect (84c9f12fe943bc7f32ee637666ed3bc1) was granted leadership with session id 00000000-0000-0000-0000-000000000000 at akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_68.
>> 2021-01-15 16:52:06,866 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Starting execution of job collect (84c9f12fe943bc7f32ee637666ed3bc1) under job master id 00000000000000000000000000000000.
>> 2021-01-15 16:52:06,866 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Starting scheduling with scheduling strategy [org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
>> 2021-01-15 16:52:06,866 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job collect (84c9f12fe943bc7f32ee637666ed3bc1) switched from state CREATED to RUNNING.
>> 2021-01-15 16:52:06,866 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: JdbcTableSource(msg_id, hostname, last_updated) -> SourceConversion(table=[default_catalog.default_database.log_counts, source: [JdbcTableSource(msg_id, hostname, last_updated)]], fields=[msg_id, hostname, last_updated]) -> WatermarkAssigner(rowtime=[last_updated], watermark=[(last_updated - 5000:INTERVAL SECOND)]) -> Calc(select=[msg_id, hostname, last_updated], where=[(hostname <> _UTF-16LE'')]) (1/1) (d22b3ac56f07e182ba5b74d68fa74fb1) switched from CREATED to SCHEDULED.
>> 2021-01-15 16:52:06,866 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - GroupWindowAggregate(groupBy=[msg_id, hostname], window=[TumblingGroupWindow('w, last_updated, 300000)], select=[msg_id, hostname, COUNT(msg_id) AS EXPR$0]) -> Calc(select=[msg_id, hostname, EXPR$0 AS cnt]) -> SinkConversionToTuple2 -> Sink: Select table sink (1/1) (09cee06206ad355b327cb8487773cd39) switched from CREATED to SCHEDULED.
>> 2021-01-15 16:52:06,866 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{b3f1b6b4eaab427305ec15f92a39531e}]
>> 2021-01-15 16:52:06,866 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Connecting to ResourceManager akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_*(00000000000000000000000000000000)
>> 2021-01-15 16:52:06,867 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Resolved ResourceManager address, beginning registration
>> 2021-01-15 16:52:06,867 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registering job manager [hidden email]://flink@flink-jobmanager:6123/user/rpc/jobmanager_68 for job 84c9f12fe943bc7f32ee637666ed3bc1.
>> 2021-01-15 16:52:06,867 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registered job manager [hidden email]://flink@flink-jobmanager:6123/user/rpc/jobmanager_68 for job 84c9f12fe943bc7f32ee637666ed3bc1.
>> 2021-01-15 16:52:06,867 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - JobManager successfully registered at ResourceManager, leader id: 00000000000000000000000000000000.
>> 2021-01-15 16:52:06,867 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - Requesting new slot [SlotRequestId{b3f1b6b4eaab427305ec15f92a39531e}] and profile ResourceProfile{UNKNOWN} with allocation id b0559997a428b1d31d9e57d6532e026b from resource manager.
>> 2021-01-15 16:52:06,868 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Request slot with profile ResourceProfile{UNKNOWN} for job 84c9f12fe943bc7f32ee637666ed3bc1 with allocation id b0559997a428b1d31d9e57d6532e026b.
>> 2021-01-15 16:52:06,874 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: JdbcTableSource(msg_id, hostname, last_updated) -> SourceConversion(table=[default_catalog.default_database.log_counts, source: [JdbcTableSource(msg_id, hostname, last_updated)]], fields=[msg_id, hostname, last_updated]) -> WatermarkAssigner(rowtime=[last_updated], watermark=[(last_updated - 5000:INTERVAL SECOND)]) -> Calc(select=[msg_id, hostname, last_updated], where=[(hostname <> _UTF-16LE'')]) (1/1) (d22b3ac56f07e182ba5b74d68fa74fb1) switched from SCHEDULED to DEPLOYING.
>> 2021-01-15 16:52:06,882 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Source: JdbcTableSource(msg_id, hostname, last_updated) -> SourceConversion(table=[default_catalog.default_database.log_counts, source: [JdbcTableSource(msg_id, hostname, last_updated)]], fields=[msg_id, hostname, last_updated]) -> WatermarkAssigner(rowtime=[last_updated], watermark=[(last_updated - 5000:INTERVAL SECOND)]) -> Calc(select=[msg_id, hostname, last_updated], where=[(hostname <> _UTF-16LE'')]) (1/1) (attempt #0) with attempt id d22b3ac56f07e182ba5b74d68fa74fb1 to 10.42.1.148:6122-9b9553 @ 10.42.1.148 (dataPort=40391) with allocation id b0559997a428b1d31d9e57d6532e026b
>> 2021-01-15 16:52:06,883 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - GroupWindowAggregate(groupBy=[msg_id, hostname], window=[TumblingGroupWindow('w, last_updated, 300000)], select=[msg_id, hostname, COUNT(msg_id) AS EXPR$0]) -> Calc(select=[msg_id, hostname, EXPR$0 AS cnt]) -> SinkConversionToTuple2 -> Sink: Select table sink (1/1) (09cee06206ad355b327cb8487773cd39) switched from SCHEDULED to DEPLOYING.
>> 2021-01-15 16:52:06,883 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying GroupWindowAggregate(groupBy=[msg_id, hostname], window=[TumblingGroupWindow('w, last_updated, 300000)], select=[msg_id, hostname, COUNT(msg_id) AS EXPR$0]) -> Calc(select=[msg_id, hostname, EXPR$0 AS cnt]) -> SinkConversionToTuple2 -> Sink: Select table sink (1/1) (attempt #0) with attempt id 09cee06206ad355b327cb8487773cd39 to 10.42.1.148:6122-9b9553 @ 10.42.1.148 (dataPort=40391) with allocation id b0559997a428b1d31d9e57d6532e026b
>> 2021-01-15 16:52:07,038 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - GroupWindowAggregate(groupBy=[msg_id, hostname], window=[TumblingGroupWindow('w, last_updated, 300000)], select=[msg_id, hostname, COUNT(msg_id) AS EXPR$0]) -> Calc(select=[msg_id, hostname, EXPR$0 AS cnt]) -> SinkConversionToTuple2 -> Sink: Select table sink (1/1) (09cee06206ad355b327cb8487773cd39) switched from DEPLOYING to RUNNING.
>> 2021-01-15 16:52:07,038 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: JdbcTableSource(msg_id, hostname, last_updated) -> SourceConversion(table=[default_catalog.default_database.log_counts, source: [JdbcTableSource(msg_id, hostname, last_updated)]], fields=[msg_id, hostname, last_updated]) -> WatermarkAssigner(rowtime=[last_updated], watermark=[(last_updated - 5000:INTERVAL SECOND)]) -> Calc(select=[msg_id, hostname, last_updated], where=[(hostname <> _UTF-16LE'')]) (1/1) (d22b3ac56f07e182ba5b74d68fa74fb1) switched from DEPLOYING to RUNNING.
>> 2021-01-15 16:52:07,057 INFO org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorCoordinator [] - Received sink socket server address: /10.42.1.148:39303
>> 2021-01-15 16:52:07,060 WARN org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No hostname could be resolved for the IP address 10.42.1.148, using IP address as host name. Local input split assignment (such as for HDFS files) may be impacted.
>> 2021-01-15 16:52:07,988 WARN org.apache.flink.client.deployment.application.DetachedApplicationRunner [] - Could not execute application:
>> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute sql
>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:330) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>>     at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>>     at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>>     at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:100) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) [?:1.8.0_275]
>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_275]
>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_275]
>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_275]
>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_275]
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_275]
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_275]
>>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
>> Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
>>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:719) ~[flink-table-blink_2.12-1.12.0.jar:1.12.0]
>>     at org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:570) ~[flink-table-blink_2.12-1.12.0.jar:1.12.0]
>>     at io.cmdaa.streaming.java.jdbc.TableConsumer.main(TableConsumer.java:192) ~[?:?]
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_275]
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_275]
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_275]
>>     at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_275]
>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>>     ... 13 more
>> Caused by: java.lang.IllegalArgumentException: Job client must be a CoordinationRequestGateway. This is a bug.
>>     at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:142) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>>     at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.setJobClient(CollectResultFetcher.java:95) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>>     at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.setJobClient(CollectResultIterator.java:98) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>>     at org.apache.flink.table.planner.sinks.SelectTableSinkBase$1.setJobClient(SelectTableSinkBase.java:93) ~[flink-table-blink_2.12-1.12.0.jar:1.12.0]
>>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:709) ~[flink-table-blink_2.12-1.12.0.jar:1.12.0]
>>     at org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:570) ~[flink-table-blink_2.12-1.12.0.jar:1.12.0]
>>     at io.cmdaa.streaming.java.jdbc.TableConsumer.main(TableConsumer.java:192) ~[?:?]
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_275]
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_275]
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_275]
>>     at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_275]
>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>>     ... 13 more
>> 2021-01-15 16:52:07,989 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - Exception occurred in REST handler: Could not execute application.
>> 2021-01-15 16:52:08,462 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: JdbcTableSource(msg_id, hostname, last_updated) -> SourceConversion(table=[default_catalog.default_database.log_counts, source: [JdbcTableSource(msg_id, hostname, last_updated)]], fields=[msg_id, hostname, last_updated]) -> WatermarkAssigner(rowtime=[last_updated], watermark=[(last_updated - 5000:INTERVAL SECOND)]) -> Calc(select=[msg_id, hostname, last_updated], where=[(hostname <> _UTF-16LE'')]) (1/1) (d22b3ac56f07e182ba5b74d68fa74fb1) switched from RUNNING to FINISHED.
>> 2021-01-15 16:52:08,465 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - GroupWindowAggregate(groupBy=[msg_id, hostname], window=[TumblingGroupWindow('w, last_updated, 300000)], select=[msg_id, hostname, COUNT(msg_id) AS EXPR$0]) -> Calc(select=[msg_id, hostname, EXPR$0 AS cnt]) -> SinkConversionToTuple2 -> Sink: Select table sink (1/1) (09cee06206ad355b327cb8487773cd39) switched from RUNNING to FINISHED.
>> 2021-01-15 16:52:08,465 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job collect (84c9f12fe943bc7f32ee637666ed3bc1) switched from state RUNNING to FINISHED.
>> 2021-01-15 16:52:08,466 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Stopping checkpoint coordinator for job 84c9f12fe943bc7f32ee637666ed3bc1.
>> 2021-01-15 16:52:08,466 INFO org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore [] - Shutting down
>> 2021-01-15 16:52:08,466 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job 84c9f12fe943bc7f32ee637666ed3bc1 reached globally terminal state FINISHED.
>> 2021-01-15 16:52:08,467 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Stopping the JobMaster for job collect(84c9f12fe943bc7f32ee637666ed3bc1).
>> 2021-01-15 16:52:08,467 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - Suspending SlotPool.
>> 2021-01-15 16:52:08,468 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Close ResourceManager connection a76a3c5321498f13d0552421928c6062: Stopping JobMaster for job collect(84c9f12fe943bc7f32ee637666ed3bc1)..
>> 2021-01-15 16:52:08,468 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - Stopping SlotPool.
>> 2021-01-15 16:52:08,468 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Disconnect job manager [hidden email]://flink@flink-jobmanager:6123/user/rpc/jobmanager_68 for job 84c9f12fe943bc7f32ee637666ed3bc1 from the resource manager.
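
The decisive part of the trace is the innermost cause: `CollectResultFetcher.setJobClient` rejects the job client with "Job client must be a CoordinationRequestGateway". The first log line shows why: the jar was run through the web UI in detached mode (`Starting program (detached: true)` via `DetachedApplicationRunner`), and that submission path hands the program a job client with no coordination channel back to the collect sink, so `execute().collect()` cannot fetch results. The failing guard can be sketched as follows; note that `JobClient`, `CoordinationRequestGateway`, and `DetachedJobClient` here are self-contained stand-ins mirroring the names in the trace, not Flink's real classes:

```java
public class CollectGuard {

    // Stand-ins for the Flink interfaces named in the trace (assumption:
    // the real ones live in flink-core / flink-runtime).
    public interface JobClient {}
    public interface CoordinationRequestGateway {}

    // What a detached web-UI submission effectively provides: a client that
    // can submit the job but exposes no coordination channel for results.
    public static class DetachedJobClient implements JobClient {}

    // Mirrors the precondition in CollectResultFetcher.setJobClient:
    // collect() can only stream results through a coordination gateway.
    public static void setJobClient(JobClient client) {
        if (!(client instanceof CoordinationRequestGateway)) {
            throw new IllegalArgumentException(
                    "Job client must be a CoordinationRequestGateway. This is a bug.");
        }
    }

    public static void main(String[] args) {
        try {
            setJobClient(new DetachedJobClient());
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // same message as in the log
        }
    }
}
```

In other words, `collect()` needs the submitting process to keep an attached job client (e.g. `flink run` without detached mode, or a local MiniCluster); with detached web-UI submission the results typically have to go to a real table sink instead.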



--
Robert Cullen
240-475-4490

Re: How to Iterate over results in Table API version 1.12.0

Timo Walther
Thanks for opening an issue:

https://issues.apache.org/jira/browse/FLINK-20995

Feel free to ping me again in the issue if nobody responds.

Thanks,
Timo
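
For reference, the examples linked earlier in the thread consume `table.execute().collect()` inside try-with-resources, because in 1.12 the returned `CloseableIterator<Row>` holds cluster resources until it is closed. The shape of that pattern is sketched below with a stand-in `CloseableIterator` (the real one is `org.apache.flink.util.CloseableIterator`), so the snippet compiles without Flink on the classpath; the stand-in and the sample rows are assumptions for illustration only:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class CollectPattern {

    // Stand-in for org.apache.flink.util.CloseableIterator: an Iterator
    // that must be closed so the collect job releases its resources.
    public interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
        @Override
        void close(); // narrow AutoCloseable's `throws Exception` away

        static <T> CloseableIterator<T> over(Iterator<T> inner) {
            return new CloseableIterator<T>() {
                public boolean hasNext() { return inner.hasNext(); }
                public T next() { return inner.next(); }
                public void close() { /* real impl would release the job's socket */ }
            };
        }
    }

    // Mirrors: try (CloseableIterator<Row> it = table.execute().collect()) { ... }
    public static List<String> drain(CloseableIterator<String> rows) {
        List<String> collected = new ArrayList<>();
        try (CloseableIterator<String> it = rows) { // closes even on error
            while (it.hasNext()) {
                collected.add(it.next());
            }
        }
        return collected;
    }

    public static void main(String[] args) {
        CloseableIterator<String> fake =
                CloseableIterator.over(List.of("host-a,3", "host-b,7").iterator());
        System.out.println(drain(fake)); // [host-a,3, host-b,7]
    }
}
```

With a real `Table`, the only change is the resource expression: `try (CloseableIterator<Row> it = log_counts.execute().collect()) { ... }` on an attached job client.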


On 15.01.21 18:21, Robert Cullen wrote:

> I'm running Flink (1.12.0) on a Kubernetes standalone deployment
> (Kubernetes Cluster Session Mode).
>
> What's the link to Flink's JIRA?
>
> On Fri, Jan 15, 2021 at 12:19 PM Timo Walther <[hidden email]> wrote:
>
>     maybe Godfrey in CC knows more?
>
>
>     On 15.01.21 18:10, Timo Walther wrote:
>      > How are you running the Flink cluster? What is your deployment?
>      >
>      > The exception clearly indicates that you found a bug. Could you
>      > open a ticket in Flink's JIRA? We need details on how to reproduce it.
>      >
>      > Thanks,
>      > Timo
>
>      >> ~[?:1.8.0_275] at
>      >>
>     sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
>      >> ~[?:1.8.0_275] at java.lang.reflect.Method.invoke(Method.java:498)
>      >> ~[?:1.8.0_275] at
>      >>
>     org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316)
>
>      >> ~[flink-dist_2.12-1.12.0.jar:1.12.0] ... 13 more Caused by:
>      >> java.lang.IllegalArgumentException: Job client must be a
>      >> CoordinationRequestGateway. This is a bug. at
>      >>
>     org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:142)
>
>      >> ~[flink-dist_2.12-1.12.0.jar:1.12.0] at
>      >>
>     org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.setJobClient(CollectResultFetcher.java:95)
>
>      >> ~[flink-dist_2.12-1.12.0.jar:1.12.0] at
>      >>
>     org.apache.flink.streaming.api.operators.collect.CollectResultIterator.setJobClient(CollectResultIterator.java:98)
>
>      >> ~[flink-dist_2.12-1.12.0.jar:1.12.0] at
>      >>
>     org.apache.flink.table.planner.sinks.SelectTableSinkBase$1.setJobClient(SelectTableSinkBase.java:93)
>
>      >> ~[flink-table-blink_2.12-1.12.0.jar:1.12.0] at
>      >>
>     org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:709)
>
>      >> ~[flink-table-blink_2.12-1.12.0.jar:1.12.0] at
>      >>
>     org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:570)
>
>      >> ~[flink-table-blink_2.12-1.12.0.jar:1.12.0] at
>      >>
>     io.cmdaa.streaming.java.jdbc.TableConsumer.main(TableConsumer.java:192)
>     ~[?:?]
>      >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>      >> ~[?:1.8.0_275] at
>      >>
>     sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
>      >> ~[?:1.8.0_275] at
>      >>
>     sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
>      >> ~[?:1.8.0_275] at java.lang.reflect.Method.invoke(Method.java:498)
>      >> ~[?:1.8.0_275] at
>      >>
>     org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316)
>
>      >> ~[flink-dist_2.12-1.12.0.jar:1.12.0] ... 13 more 2021-01-15
>      >> 16:52:07,989 ERROR
>      >> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] -
>      >> Exception occurred in REST handler: Could not execute application.
>      >> 2021-01-15 16:52:08,462 INFO
>      >> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source:
>      >> JdbcTableSource(msg_id, hostname, last_updated) ->
>      >>
>     SourceConversion(table=[default_catalog.default_database.log_counts,
>      >> source: [JdbcTableSource(msg_id, hostname, last_updated)]],
>      >> fields=[msg_id, hostname, last_updated]) ->
>      >> WatermarkAssigner(rowtime=[last_updated],
>     watermark=[(last_updated -
>      >> 5000:INTERVAL SECOND)]) -> Calc(select=[msg_id, hostname,
>      >> last_updated], where=[(hostname <> _UTF-16LE'')]) (1/1)
>      >> (d22b3ac56f07e182ba5b74d68fa74fb1) switched from RUNNING to
>     FINISHED.
>      >> 2021-01-15 16:52:08,465 INFO
>      >> org.apache.flink.runtime.executiongraph.ExecutionGraph [] -
>      >> GroupWindowAggregate(groupBy=[msg_id, hostname],
>      >> window=[TumblingGroupWindow('w, last_updated, 300000)],
>      >> select=[msg_id, hostname, COUNT(msg_id) AS EXPR$0]) ->
>      >> Calc(select=[msg_id, hostname, EXPR$0 AS cnt]) ->
>      >> SinkConversionToTuple2 -> Sink: Select table sink (1/1)
>      >> (09cee06206ad355b327cb8487773cd39) switched from RUNNING to
>     FINISHED.
>      >> 2021-01-15 16:52:08,465 INFO
>      >> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job
>      >> collect (84c9f12fe943bc7f32ee637666ed3bc1) switched from state
>     RUNNING
>      >> to FINISHED. 2021-01-15 16:52:08,466 INFO
>      >> org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] -
>      >> Stopping checkpoint coordinator for job
>      >> 84c9f12fe943bc7f32ee637666ed3bc1. 2021-01-15 16:52:08,466 INFO
>      >>
>     org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore
>      >> [] - Shutting down 2021-01-15 16:52:08,466 INFO
>      >> org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job
>      >> 84c9f12fe943bc7f32ee637666ed3bc1 reached globally terminal state
>      >> FINISHED. 2021-01-15 16:52:08,467 INFO
>      >> org.apache.flink.runtime.jobmaster.JobMaster [] - Stopping the
>      >> JobMaster for job collect(84c9f12fe943bc7f32ee637666ed3bc1).
>      >> 2021-01-15 16:52:08,467 INFO
>      >> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] -
>      >> Suspending SlotPool. 2021-01-15 16:52:08,468 INFO
>      >> org.apache.flink.runtime.jobmaster.JobMaster [] - Close
>      >> ResourceManager connection a76a3c5321498f13d0552421928c6062:
>     Stopping
>      >> JobMaster for job collect(84c9f12fe943bc7f32ee637666ed3bc1)..
>      >> 2021-01-15 16:52:08,468 INFO
>      >> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] -
>     Stopping
>      >> SlotPool. 2021-01-15 16:52:08,468 INFO
>      >>
>     org.apache.flink.runtime.resourcemanager.StandaloneResourceManager []
>      >> - Disconnect job manager
>      >>
>     [hidden email]://flink@flink-jobmanager:6123/user/rpc/jobmanager_68
>
>      >> for job 84c9f12fe943bc7f32ee637666ed3bc1 from the resource
>     manager. |
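> A note on the root cause above: "Job client must be a CoordinationRequestGateway" is raised by the collect sink because the job was submitted through the web UI's jar-run endpoint (note `DetachedApplicationRunner` in the trace). `collect()` needs a job client that can fetch results back from the running cluster, which a detached submission does not provide; submitting attached (e.g. `flink run` from the CLI, or a local environment) avoids this. For reference, a minimal sketch of the intended iteration pattern, following Timo's linked examples — the `fromValues` data here is a stand-in for the JDBC-backed `log_counts` table and is purely illustrative:
>
> |import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.TableEnvironment;
> import org.apache.flink.types.Row;
> import org.apache.flink.util.CloseableIterator;
>
> public class CollectSketch {
>     public static void main(String[] args) throws Exception {
>         TableEnvironment tEnv = TableEnvironment.create(
>                 EnvironmentSettings.newInstance().inStreamingMode().build());
>
>         // Illustrative stand-in for the JDBC-backed "log_counts" source.
>         Table logCounts = tEnv.fromValues("host-a", "host-b", "host-a");
>
>         // execute() submits the job; collect() streams rows back to this
>         // client. try-with-resources closes the iterator so the backing
>         // job is cleaned up when iteration stops.
>         try (CloseableIterator<Row> it = logCounts.execute().collect()) {
>             while (it.hasNext()) {
>                 Row row = it.next();
>                 System.out.println(row);
>             }
>         }
>     }
> }|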
>
> On Fri, Jan 15, 2021 at 11:32 AM Timo Walther <[hidden email]> wrote:
>
>>     Hi Robert,
>>
>>     could you send us the error/stacktrace that is printed?
>>
>>     An example how it should work is shown here:
>>
>>     https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/UpdatingTopCityExample.java
>>
>>     https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/GettingStartedExample.java
>>
>>     Regards,
>>     Timo
> --
> Robert Cullen
> 240-475-4490