How can I use the Table [Print SQL connector][1]? I tried the following (batch mode), but it does not give any output:

    // Imports used by both snippets below
    import java.time.LocalDateTime;
    import java.util.List;
    import java.util.Spliterator;
    import java.util.Spliterators;
    import java.util.stream.Collectors;
    import java.util.stream.StreamSupport;

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.TableResult;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.CloseableIterator;

    EnvironmentSettings settings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
    TableEnvironment tEnv = TableEnvironment.create(settings);

    final LocalDateTime DATE_TIME = LocalDateTime.of(2020, 1, 1, 0, 0);

    Table transactions =
        tEnv.fromValues(
            DataTypes.ROW(
                DataTypes.FIELD("account_id", DataTypes.BIGINT()),
                DataTypes.FIELD("amount", DataTypes.BIGINT()),
                DataTypes.FIELD("transaction_time", DataTypes.TIMESTAMP(3))),
            Row.of(1, 188, DATE_TIME.plusMinutes(12)),
            Row.of(2, 374, DATE_TIME.plusMinutes(47)),
            Row.of(3, 112, DATE_TIME.plusMinutes(36)),
            Row.of(4, 478, DATE_TIME.plusMinutes(3)),
            Row.of(5, 208, DATE_TIME.plusMinutes(8)),
            Row.of(1, 379, DATE_TIME.plusMinutes(53)),
            Row.of(2, 351, DATE_TIME.plusMinutes(32)),
            Row.of(3, 320, DATE_TIME.plusMinutes(31)),
            Row.of(4, 259, DATE_TIME.plusMinutes(19)),
            Row.of(5, 273, DATE_TIME.plusMinutes(42)));

    tEnv.executeSql(
        "CREATE TABLE print_table(account_id BIGINT, amount BIGINT, transaction_time TIMESTAMP) "
            + "WITH ('connector' = 'print')");

    transactions.executeInsert("print_table");

I can "materialize" the results manually and print them out with:

    // In the test method:
    for (Row row : materialize(transactions.execute())) {
        System.out.println(row);
    }

    // Helper method in the same class:
    private static List<Row> materialize(TableResult results) {
        try (CloseableIterator<Row> resultIterator = results.collect()) {
            return StreamSupport
                .stream(Spliterators.spliteratorUnknownSize(resultIterator, Spliterator.ORDERED), false)
                .collect(Collectors.toList());
        } catch (Exception e) {
            throw new RuntimeException("Failed to materialize results", e);
        }
    }

But I would like to know why I can't just use the print sink.

I've tried with `.inBatchMode()` and with `.inStreamingMode()`, so I don't think that's the problem.

Does anybody know of a working example involving the print connector?

[1]: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/print.html

--
/Rubén
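The `materialize` helper in the question can be written without the `Spliterator`/`StreamSupport` plumbing by draining the iterator with `Iterator#forEachRemaining`. Below is a minimal sketch; the class `Materialize` and its `toList` method are hypothetical names, and the only assumption is that the iterator is both an `Iterator` and `AutoCloseable`, which is true of Flink's `CloseableIterator` returned by `TableResult#collect()`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical helper; not part of Flink's API. */
final class Materialize {
    private Materialize() {}

    /**
     * Drains an iterator that is also AutoCloseable (such as the one returned
     * by TableResult#collect()) into a list, closing it afterwards.
     */
    static <T, I extends Iterator<T> & AutoCloseable> List<T> toList(I iterator) {
        List<T> rows = new ArrayList<>();
        try (I it = iterator) {
            // Pull every remaining element into the list.
            it.forEachRemaining(rows::add);
        } catch (Exception e) {
            // AutoCloseable#close() may throw a checked Exception; wrap it the
            // same way the original helper does.
            throw new RuntimeException("Failed to materialize results", e);
        }
        return rows;
    }
}
```

With Flink on the classpath it would presumably be called as `Materialize.toList(transactions.execute().collect())`. Like the original helper, it pulls the results back into the client, which is a different path from the print sink.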
You should be able to use the "print" sink. Remember, though, that the "print" sink prints to the stdout/stderr of the TaskManagers, not of the client where you submit the query. This is different from the TableResult, which collects the results in the client.

BTW, for printing you can use TableResult#print, which will nicely format your results.

Best,

Dawid

On 29/10/2020 16:13, Ruben Laguna wrote:
> How can I use the Table [Print SQL connector][1]? I tried the
> following (batch mode) but it does not give any output:
> [...]
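In a purely local run, such as a JUnit test, Flink starts a MiniCluster inside the test JVM, so the TaskManager's stdout is the test process's own `System.out`. If a test needs to assert on what the print sink writes, one option is to swap `System.out` while the job runs. The sketch below is a hypothetical JDK-only helper (`StdoutCapture` is not a Flink class, and it needs Java 10+ for the `Charset` overloads). It assumes the sink looks up `System.out` when its task starts, i.e. after the swap, and that the action passed in blocks until the job has finished, for example by calling `await()` on the `TableResult` returned by `executeInsert` (its checked exceptions would need wrapping inside a `Runnable`).

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical test helper; not part of Flink's API. */
final class StdoutCapture {
    private StdoutCapture() {}

    /**
     * Runs the action with System.out redirected to an in-memory buffer and
     * returns everything written to it. The original System.out is always restored.
     */
    static String capture(Runnable action) {
        PrintStream original = System.out;
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        // autoFlush = true: println calls reach the buffer immediately.
        PrintStream replacement = new PrintStream(buffer, true, StandardCharsets.UTF_8);
        System.setOut(replacement);
        try {
            // The action must block until everything has been printed,
            // e.g. by waiting for the job to finish before returning.
            action.run();
        } finally {
            System.setOut(original);
            replacement.flush();
        }
        return buffer.toString(StandardCharsets.UTF_8);
    }
}
```

Because `System.out` is global to the JVM, this only gives reliable results when tests that print are not run in parallel.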
Hi,
Using `mytable.execute().print()` is exactly what I wanted, thanks.

But I'm still curious. I'm just running this locally in a JUnit test case (not on a Flink cluster), just like [flink-playgrounds SpendReportTest][1]. In this scenario, where does the TaskManager output go (if there is a TaskManager at all)?

I just added src/test/resources/log4j.properties with:

    # Root logger option
    log4j.rootLogger=INFO, stdout

    # Direct log messages to stdout
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.Target=System.out
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

and I still don't see anything from the print sink. I even ran it in the debugger: although PrintSink#getSinkRuntimeProvider is called, RowDataPrintFunction#invoke is never called.

    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/Users/ecerulm/.m2/repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/Users/ecerulm/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.12.1/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
    2020-10-29 18:18:32 INFO TaskExecutorResourceUtils:188 - The configuration option taskmanager.cpu.cores required for local execution is not set, setting it to the maximal possible value.
    2020-10-29 18:18:32 INFO TaskExecutorResourceUtils:188 - The configuration option taskmanager.memory.task.heap.size required for local execution is not set, setting it to the maximal possible value.
    2020-10-29 18:18:32 INFO TaskExecutorResourceUtils:188 - The configuration option taskmanager.memory.task.off-heap.size required for local execution is not set, setting it to the maximal possible value.
    2020-10-29 18:18:32 INFO TaskExecutorResourceUtils:188 - The configuration option taskmanager.memory.network.min required for local execution is not set, setting it to its default value 64 mb.
    2020-10-29 18:18:32 INFO TaskExecutorResourceUtils:188 - The configuration option taskmanager.memory.network.max required for local execution is not set, setting it to its default value 64 mb.
    2020-10-29 18:18:32 INFO TaskExecutorResourceUtils:188 - The configuration option taskmanager.memory.managed.size required for local execution is not set, setting it to its default value 128 mb.
    2020-10-29 18:18:32 INFO MiniCluster:253 - Starting Flink Mini Cluster
    2020-10-29 18:18:32 INFO MiniCluster:262 - Starting Metrics Registry
    2020-10-29 18:18:32 INFO MetricRegistryImpl:122 - No metrics reporter configured, no metrics will be exposed/reported.
    2020-10-29 18:18:32 INFO MiniCluster:266 - Starting RPC Service(s)
    2020-10-29 18:18:32 INFO AkkaRpcServiceUtils:247 - Trying to start local actor system
    2020-10-29 18:18:32 INFO Slf4jLogger:92 - Slf4jLogger started
    2020-10-29 18:18:32 INFO AkkaRpcServiceUtils:278 - Actor system started at akka://flink
    2020-10-29 18:18:32 INFO AkkaRpcServiceUtils:247 - Trying to start local actor system
    2020-10-29 18:18:32 INFO Slf4jLogger:92 - Slf4jLogger started
    2020-10-29 18:18:32 INFO AkkaRpcServiceUtils:278 - Actor system started at akka://flink-metrics
    2020-10-29 18:18:32 INFO AkkaRpcService:225 - Starting RPC endpoint for org.apache.flink.runtime.metrics.dump.MetricQueryService at akka://flink-metrics/user/rpc/MetricQueryService .
    2020-10-29 18:18:32 INFO MiniCluster:432 - Starting high-availability services
    2020-10-29 18:18:32 INFO BlobServer:143 - Created BLOB server storage directory /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/blobStore-30bb2435-c664-4d1e-8c74-80cb54157860
    2020-10-29 18:18:32 INFO BlobServer:207 - Started BLOB server at 0.0.0.0:50965 - max concurrent requests: 50 - max backlog: 1000
    2020-10-29 18:18:32 INFO PermanentBlobCache:107 - Created BLOB cache storage directory /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/blobStore-7a0bd0fc-2864-4a66-afd6-5f117d515b07
    2020-10-29 18:18:32 INFO TransientBlobCache:107 - Created BLOB cache storage directory /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/blobStore-fd96dacc-4ce4-447a-ba3f-d2ce453a1d30
    2020-10-29 18:18:32 INFO MiniCluster:519 - Starting 1 TaskManger(s)
    2020-10-29 18:18:32 INFO TaskManagerRunner:412 - Starting TaskManager with ResourceID: 2358fbac-908d-4aa2-b643-c32d44b40193
    2020-10-29 18:18:32 INFO TaskManagerServices:411 - Temporary file directory '/var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T': total 233 GB, usable 25 GB (10.73% usable)
    2020-10-29 18:18:32 INFO FileChannelManagerImpl:97 - FileChannelManager uses directory /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-io-9aa7adae-549f-461c-95d8-6fe47f31b695 for spill files.
    2020-10-29 18:18:32 INFO FileChannelManagerImpl:97 - FileChannelManager uses directory /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-netty-shuffle-c8e6cc04-6f93-4f55-a2c5-88891c9e9e49 for spill files.
    2020-10-29 18:18:32 INFO NetworkBufferPool:139 - Allocated 64 MB for network buffer pool (number of memory segments: 2048, bytes per segment: 32768).
    2020-10-29 18:18:32 INFO NettyShuffleEnvironment:293 - Starting the network environment and its components.
    2020-10-29 18:18:32 INFO KvStateService:89 - Starting the kvState service and its components.
    2020-10-29 18:18:32 INFO AkkaRpcService:225 - Starting RPC endpoint for org.apache.flink.runtime.taskexecutor.TaskExecutor at akka://flink/user/rpc/taskmanager_0 .
    2020-10-29 18:18:32 INFO DefaultJobLeaderService:116 - Start job leader service.
    2020-10-29 18:18:32 INFO FileCache:107 - User file cache uses directory /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-dist-cache-4ca7b653-8aa5-4967-b23e-574c43ab7b52
    2020-10-29 18:18:32 INFO DispatcherRestEndpoint:140 - Starting rest endpoint.
    2020-10-29 18:18:32 INFO DispatcherRestEndpoint:126 - Failed to load web based job submission extension. Probable reason: flink-runtime-web is not in the classpath.
    2020-10-29 18:18:32 WARN WebMonitorUtils:85 - Log file environment variable 'log.file' is not set.
    2020-10-29 18:18:32 WARN WebMonitorUtils:91 - JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'web.log.path'.
    2020-10-29 18:18:33 INFO DispatcherRestEndpoint:236 - Rest endpoint listening at localhost:50966
    2020-10-29 18:18:33 INFO EmbeddedLeaderService:302 - Proposing leadership to contender http://localhost:50966
    2020-10-29 18:18:33 INFO DispatcherRestEndpoint:821 - http://localhost:50966 was granted leadership with leaderSessionID=5d1c56ab-6894-42d9-bc76-716ea59bd473
    2020-10-29 18:18:33 INFO EmbeddedLeaderService:252 - Received confirmation of leadership for leader http://localhost:50966 , session=5d1c56ab-6894-42d9-bc76-716ea59bd473
    2020-10-29 18:18:33 INFO AkkaRpcService:225 - Starting RPC endpoint for org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at akka://flink/user/rpc/resourcemanager_1 .
    2020-10-29 18:18:33 INFO EmbeddedLeaderService:302 - Proposing leadership to contender LeaderContender: DefaultDispatcherRunner
    2020-10-29 18:18:33 INFO EmbeddedLeaderService:302 - Proposing leadership to contender LeaderContender: StandaloneResourceManager
    2020-10-29 18:18:33 INFO StandaloneResourceManager:1026 - ResourceManager akka://flink/user/rpc/resourcemanager_1 was granted leadership with fencing token a8e96f157326e3f60a2df92bb1364f97
    2020-10-29 18:18:33 INFO MiniCluster:372 - Flink Mini Cluster started successfully
    2020-10-29 18:18:33 INFO SlotManagerImpl:284 - Starting the SlotManager.
    2020-10-29 18:18:33 INFO SessionDispatcherLeaderProcess:102 - Start SessionDispatcherLeaderProcess.
    2020-10-29 18:18:33 INFO SessionDispatcherLeaderProcess:120 - Recover all persisted job graphs.
    2020-10-29 18:18:33 INFO SessionDispatcherLeaderProcess:128 - Successfully recovered 0 persisted job graphs.
    2020-10-29 18:18:33 INFO EmbeddedLeaderService:252 - Received confirmation of leadership for leader akka://flink/user/rpc/resourcemanager_1 , session=0a2df92b-b136-4f97-a8e9-6f157326e3f6
    2020-10-29 18:18:33 INFO TaskExecutor:1128 - Connecting to ResourceManager akka://flink/user/rpc/resourcemanager_1(a8e96f157326e3f60a2df92bb1364f97).
    2020-10-29 18:18:33 INFO AkkaRpcService:225 - Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/rpc/dispatcher_2 .
    2020-10-29 18:18:33 INFO EmbeddedLeaderService:252 - Received confirmation of leadership for leader akka://flink/user/rpc/dispatcher_2 , session=3ce5d1bf-2d6d-49d6-b315-717f66748938
    2020-10-29 18:18:33 INFO TaskExecutor:155 - Resolved ResourceManager address, beginning registration
    2020-10-29 18:18:33 INFO StandaloneResourceManager:821 - Registering TaskManager with ResourceID 2358fbac-908d-4aa2-b643-c32d44b40193 (akka://flink/user/rpc/taskmanager_0) at ResourceManager
    2020-10-29 18:18:33 INFO TaskExecutor:84 - Successful registration at resource manager akka://flink/user/rpc/resourcemanager_1 under registration id 0be76fda7dfc8204153de66b83ba5621.
    2020-10-29 18:18:33 INFO StandaloneDispatcher:295 - Received JobGraph submission 1fd25ab4d3d51009542ebda2bbadb55d (insert-into_default_catalog.default_database.print_table).
    2020-10-29 18:18:33 INFO StandaloneDispatcher:352 - Submitting job 1fd25ab4d3d51009542ebda2bbadb55d (insert-into_default_catalog.default_database.print_table).
    2020-10-29 18:18:33 INFO AkkaRpcService:225 - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/rpc/jobmanager_3 .
    2020-10-29 18:18:33 INFO JobMaster:288 - Initializing job insert-into_default_catalog.default_database.print_table (1fd25ab4d3d51009542ebda2bbadb55d).
    2020-10-29 18:18:33 INFO JobMaster:84 - Using restart back off time strategy NoRestartBackoffTimeStrategy for insert-into_default_catalog.default_database.print_table (1fd25ab4d3d51009542ebda2bbadb55d).
    2020-10-29 18:18:33 INFO JobMaster:211 - Running initialization on master for job insert-into_default_catalog.default_database.print_table (1fd25ab4d3d51009542ebda2bbadb55d).
    2020-10-29 18:18:33 INFO JobMaster:229 - Successfully ran initialization on master in 3 ms.
    2020-10-29 18:18:33 INFO DefaultExecutionTopology:111 - Built 1 pipelined regions in 0 ms
    2020-10-29 18:18:33 INFO JobMaster:231 - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
    2020-10-29 18:18:33 INFO JobMaster:165 - Using failover strategy org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@5da7a3b6 for insert-into_default_catalog.default_database.print_table (1fd25ab4d3d51009542ebda2bbadb55d).
    2020-10-29 18:18:33 INFO EmbeddedLeaderService:302 - Proposing leadership to contender akka://flink/user/rpc/jobmanager_3
    2020-10-29 18:18:33 INFO JobManagerRunnerImpl:305 - JobManager runner for job insert-into_default_catalog.default_database.print_table (1fd25ab4d3d51009542ebda2bbadb55d) was granted leadership with session id 47a72e91-c0bf-4b40-8b9f-639f07344b3f at akka://flink/user/rpc/jobmanager_3.
    2020-10-29 18:18:33 INFO JobMaster:799 - Starting execution of job insert-into_default_catalog.default_database.print_table (1fd25ab4d3d51009542ebda2bbadb55d) under job master id 8b9f639f07344b3f47a72e91c0bf4b40.
    2020-10-29 18:18:33 INFO JobMaster:197 - Starting scheduling with scheduling strategy [org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
    2020-10-29 18:18:33 INFO ExecutionGraph:1253 - Job insert-into_default_catalog.default_database.print_table (1fd25ab4d3d51009542ebda2bbadb55d) switched from state CREATED to RUNNING.
    2020-10-29 18:18:33 INFO ExecutionGraph:1584 - Source: Values(tuples=[[{ 0 }]]) -> (Calc(select=[CAST(1:BIGINT) AS account_id, CAST(188:BIGINT) AS amount, CAST(CAST(2020-01-01 00:12:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(2:BIGINT) AS account_id, CAST(374:BIGINT) AS amount, CAST(CAST(2020-01-01 00:47:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(3:BIGINT) AS account_id, CAST(112:BIGINT) AS amount, CAST(CAST(2020-01-01 00:36:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(4:BIGINT) AS account_id, CAST(478:BIGINT) AS amount, CAST(CAST(2020-01-01 00:03:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(5:BIGINT) AS account_id, CAST(208:BIGINT) AS amount, CAST(CAST(2020-01-01 00:08:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(1:BIGINT) AS account_id, CAST(379:BIGINT) AS amount, CAST(CAST(2020-01-01 00:53:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(2:BIGINT) AS account_id, CAST(351:BIGINT) AS amount, CAST(CAST(2020-01-01 00:32:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(3:BIGINT) AS account_id, CAST(320:BIGINT) AS amount, CAST(CAST(2020-01-01 00:31:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(4:BIGINT) AS account_id, CAST(259:BIGINT) AS amount, CAST(CAST(2020-01-01 00:19:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(5:BIGINT) AS account_id, CAST(273:BIGINT) AS amount, CAST(CAST(2020-01-01 00:42:00:TIMESTAMP(3))) AS transaction_time])) (1/1) (1fd25ab4d3d51009542ebda2bbadb55d_d07bb0f8535d5573ba7aa0f9242a6583_0_0) switched from CREATED to SCHEDULED.
    2020-10-29 18:18:33 INFO ExecutionGraph:1584 - Sink: Sink(table=[default_catalog.default_database.print_table], fields=[account_id, amount, transaction_time]) (1/1) (1fd25ab4d3d51009542ebda2bbadb55d_4b71d4c67c3b183d6f63a06700c86645_0_0) switched from CREATED to SCHEDULED.
    2020-10-29 18:18:33 INFO SlotPoolImpl:385 - Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{3e5c5fdf21b3314a9289b257aeec8c2d}]
    2020-10-29 18:18:33 INFO EmbeddedLeaderService:252 - Received confirmation of leadership for leader akka://flink/user/rpc/jobmanager_3 , session=47a72e91-c0bf-4b40-8b9f-639f07344b3f
    2020-10-29 18:18:33 INFO JobMaster:1031 - Connecting to ResourceManager akka://flink/user/rpc/resourcemanager_1(a8e96f157326e3f60a2df92bb1364f97)
    2020-10-29 18:18:33 INFO JobMaster:155 - Resolved ResourceManager address, beginning registration
    2020-10-29 18:18:33 INFO StandaloneResourceManager:330 - Registering job manager 8b9f639f07344b3f47a72e91c0bf4b40@akka://flink/user/rpc/jobmanager_3 for job 1fd25ab4d3d51009542ebda2bbadb55d.
    2020-10-29 18:18:33 INFO StandaloneResourceManager:765 - Registered job manager 8b9f639f07344b3f47a72e91c0bf4b40@akka://flink/user/rpc/jobmanager_3 for job 1fd25ab4d3d51009542ebda2bbadb55d.
    2020-10-29 18:18:33 INFO JobMaster:1053 - JobManager successfully registered at ResourceManager, leader id: a8e96f157326e3f60a2df92bb1364f97.
    2020-10-29 18:18:33 INFO SlotPoolImpl:347 - Requesting new slot [SlotRequestId{3e5c5fdf21b3314a9289b257aeec8c2d}] and profile ResourceProfile{UNKNOWN} with allocation id 45ea6506d06f73f61a36db764ce07ba7 from resource manager.
    2020-10-29 18:18:33 INFO StandaloneResourceManager:464 - Request slot with profile ResourceProfile{UNKNOWN} for job 1fd25ab4d3d51009542ebda2bbadb55d with allocation id 45ea6506d06f73f61a36db764ce07ba7.
    2020-10-29 18:18:33 INFO TaskExecutor:908 - Receive slot request 45ea6506d06f73f61a36db764ce07ba7 for job 1fd25ab4d3d51009542ebda2bbadb55d from resource manager with leader id a8e96f157326e3f60a2df92bb1364f97.
    2020-10-29 18:18:33 INFO TaskExecutor:976 - Allocated slot for 45ea6506d06f73f61a36db764ce07ba7.
    2020-10-29 18:18:33 INFO DefaultJobLeaderService:172 - Add job 1fd25ab4d3d51009542ebda2bbadb55d for job leader monitoring.
    2020-10-29 18:18:33 INFO DefaultJobLeaderService:314 - Try to register at job manager akka://flink/user/rpc/jobmanager_3 with leader id 47a72e91-c0bf-4b40-8b9f-639f07344b3f.
    2020-10-29 18:18:33 INFO DefaultJobLeaderService:155 - Resolved JobManager address, beginning registration
    2020-10-29 18:18:33 INFO DefaultJobLeaderService:369 - Successful registration at job manager akka://flink/user/rpc/jobmanager_3 for job 1fd25ab4d3d51009542ebda2bbadb55d.
    2020-10-29 18:18:33 INFO TaskExecutor:1379 - Establish JobManager connection for job 1fd25ab4d3d51009542ebda2bbadb55d.
    2020-10-29 18:18:33 INFO TaskExecutor:1278 - Offer reserved slots to the leader of job 1fd25ab4d3d51009542ebda2bbadb55d.
    2020-10-29 18:18:33 INFO PermanentBlobCache:251 - Shutting down BLOB cache
    2020-10-29 18:18:33 INFO TaskExecutorLocalStateStoresManager:213 - Shutting down TaskExecutorLocalStateStoresManager.
    2020-10-29 18:18:33 INFO TransientBlobCache:251 - Shutting down BLOB cache
    2020-10-29 18:18:33 INFO ExecutionGraph:1584 - Source: Values(tuples=[[{ 0 }]]) -> (Calc(select=[CAST(1:BIGINT) AS account_id, CAST(188:BIGINT) AS amount, CAST(CAST(2020-01-01 00:12:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(2:BIGINT) AS account_id, CAST(374:BIGINT) AS amount, CAST(CAST(2020-01-01 00:47:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(3:BIGINT) AS account_id, CAST(112:BIGINT) AS amount, CAST(CAST(2020-01-01 00:36:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(4:BIGINT) AS account_id, CAST(478:BIGINT) AS amount, CAST(CAST(2020-01-01 00:03:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(5:BIGINT) AS account_id, CAST(208:BIGINT) AS amount, CAST(CAST(2020-01-01 00:08:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(1:BIGINT) AS account_id, CAST(379:BIGINT) AS amount, CAST(CAST(2020-01-01 00:53:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(2:BIGINT) AS account_id, CAST(351:BIGINT) AS amount, CAST(CAST(2020-01-01 00:32:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(3:BIGINT) AS account_id, CAST(320:BIGINT) AS amount, CAST(CAST(2020-01-01 00:31:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(4:BIGINT) AS account_id, CAST(259:BIGINT) AS amount, CAST(CAST(2020-01-01 00:19:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(5:BIGINT) AS account_id, CAST(273:BIGINT) AS amount, CAST(CAST(2020-01-01 00:42:00:TIMESTAMP(3))) AS transaction_time])) (1/1) (1fd25ab4d3d51009542ebda2bbadb55d_d07bb0f8535d5573ba7aa0f9242a6583_0_0) switched from SCHEDULED to DEPLOYING.
    2020-10-29 18:18:33 INFO ExecutionGraph:725 - Deploying Source: Values(tuples=[[{ 0 }]]) -> (Calc(select=[CAST(1:BIGINT) AS account_id, CAST(188:BIGINT) AS amount, CAST(CAST(2020-01-01 00:12:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(2:BIGINT) AS account_id, CAST(374:BIGINT) AS amount, CAST(CAST(2020-01-01 00:47:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(3:BIGINT) AS account_id, CAST(112:BIGINT) AS amount, CAST(CAST(2020-01-01 00:36:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(4:BIGINT) AS account_id, CAST(478:BIGINT) AS amount, CAST(CAST(2020-01-01 00:03:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(5:BIGINT) AS account_id, CAST(208:BIGINT) AS amount, CAST(CAST(2020-01-01 00:08:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(1:BIGINT) AS account_id, CAST(379:BIGINT) AS amount, CAST(CAST(2020-01-01 00:53:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(2:BIGINT) AS account_id, CAST(351:BIGINT) AS amount, CAST(CAST(2020-01-01 00:32:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(3:BIGINT) AS account_id, CAST(320:BIGINT) AS amount, CAST(CAST(2020-01-01 00:31:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(4:BIGINT) AS account_id, CAST(259:BIGINT) AS amount, CAST(CAST(2020-01-01 00:19:00:TIMESTAMP(3))) AS transaction_time]), Calc(select=[CAST(5:BIGINT) AS account_id, CAST(273:BIGINT) AS amount, CAST(CAST(2020-01-01 00:42:00:TIMESTAMP(3))) AS transaction_time])) (1/1) (attempt #0) with attempt id 1fd25ab4d3d51009542ebda2bbadb55d_d07bb0f8535d5573ba7aa0f9242a6583_0_0 to 2358fbac-908d-4aa2-b643-c32d44b40193 @ localhost (dataPort=-1) with allocation id 45ea6506d06f73f61a36db764ce07ba7
    2020-10-29 18:18:33 INFO ExecutionGraph:1584 - Sink: Sink(table=[default_catalog.default_database.print_table], fields=[account_id, amount, transaction_time]) (1/1) (1fd25ab4d3d51009542ebda2bbadb55d_4b71d4c67c3b183d6f63a06700c86645_0_0) switched from SCHEDULED to DEPLOYING.
    2020-10-29 18:18:33 INFO ExecutionGraph:725 - Deploying Sink: Sink(table=[default_catalog.default_database.print_table], fields=[account_id, amount, transaction_time]) (1/1) (attempt #0) with attempt id 1fd25ab4d3d51009542ebda2bbadb55d_4b71d4c67c3b183d6f63a06700c86645_0_0 to 2358fbac-908d-4aa2-b643-c32d44b40193 @ localhost (dataPort=-1) with allocation id 45ea6506d06f73f61a36db764ce07ba7
    2020-10-29 18:18:33 INFO TaskSlotTableImpl:361 - Activate slot 45ea6506d06f73f61a36db764ce07ba7.
    2020-10-29 18:18:33 INFO FileChannelManagerImpl:146 - FileChannelManager removed spill file directory /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-io-9aa7adae-549f-461c-95d8-6fe47f31b695
    2020-10-29 18:18:33 INFO BlobServer:348 - Stopped BLOB server at 0.0.0.0:50965
    2020-10-29 18:18:33 INFO FileChannelManagerImpl:146 - FileChannelManager removed spill file directory /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-netty-shuffle-c8e6cc04-6f93-4f55-a2c5-88891c9e9e49
    2020-10-29 18:18:33 INFO FileCache:153 - removed file cache directory /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-dist-cache-4ca7b653-8aa5-4967-b23e-574c43ab7b52

    Process finished with exit code 0

[1]: https://github.com/apache/flink-playgrounds/blob/master/table-walkthrough/src/test/java/org/apache/flink/playgrounds/spendreport/SpendReportTest.java

On Thu, Oct 29, 2020 at 4:53 PM Dawid Wysakowicz <[hidden email]> wrote:
>
> You should be able to use the "print" sink.
> [...]

--
/Rubén
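The debugger observation above (`invoke` never being called) fits a job that is submitted but never waited for: in the log, the BLOB caches start shutting down right after the slots are offered, while the source and sink are still being deployed, and then the process exits. The JDK-only sketch below models that race; it is an analogy, not Flink code. The hypothetical `submitJob` plays the role of `executeInsert`: it hands the rows to a daemon "task" thread and returns a `CompletableFuture` at once, much as `executeInsert` returns a `TableResult` without waiting. Calling `join()` on the future corresponds to calling `await()` on the `TableResult`, which is the fix suggested later in this thread.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

/** Hypothetical model of asynchronous job submission; not Flink code. */
final class AsyncSubmitSketch {
    private AsyncSubmitSketch() {}

    /**
     * Stand-in for executeInsert: hands the rows to a daemon "task" thread that
     * feeds them to the sink, and returns immediately with a future for the job.
     */
    static CompletableFuture<Void> submitJob(List<String> rows, Consumer<String> sink) {
        ExecutorService taskManager = Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, "fake-task-manager");
            // The JVM does not wait for daemon threads, so if the caller returns
            // without waiting, the process can exit before the sink sees any row.
            thread.setDaemon(true);
            return thread;
        });
        CompletableFuture<Void> job =
            CompletableFuture.runAsync(() -> rows.forEach(sink), taskManager);
        // Accept no new work; the already submitted task still runs to completion.
        taskManager.shutdown();
        return job;
    }
}
```

For example, `AsyncSubmitSketch.submitJob(rows, System.out::println).join();` prints every row before returning; in a `main` method, dropping the `join()` can let the JVM exit before anything is printed.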
Hi,
The problem in your case is that you exit before anything is printed. The method executeInsert starts executing the query, but it does not wait for the query to finish. Therefore your main/test method returns, bringing down the local cluster, before anything is printed. You can, e.g., add

    TableResult result = transactions.executeInsert("print_table");
    result.await();

which will wait for the insert to finish.

The print sink writes to stdout/stderr directly, so none of the logging configuration applies in this case.

Best,

Dawid

On 29/10/2020 18:29, Ruben Laguna wrote:
> Hi,
>
> Using `mytable.execute().print()` is exactly what I wanted, thanks.
> [...]
> SLF4J: Found binding in > [jar:file:/Users/ecerulm/.m2/repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/Users/ecerulm/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.12.1/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. > SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] > 2020-10-29 18:18:32 INFO TaskExecutorResourceUtils:188 - The > configuration option taskmanager.cpu.cores required for local > execution is not set, setting it to the maximal possible value. > 2020-10-29 18:18:32 INFO TaskExecutorResourceUtils:188 - The > configuration option taskmanager.memory.task.heap.size required for > local execution is not set, setting it to the maximal possible value. > 2020-10-29 18:18:32 INFO TaskExecutorResourceUtils:188 - The > configuration option taskmanager.memory.task.off-heap.size required > for local execution is not set, setting it to the maximal possible > value. > 2020-10-29 18:18:32 INFO TaskExecutorResourceUtils:188 - The > configuration option taskmanager.memory.network.min required for local > execution is not set, setting it to its default value 64 mb. > 2020-10-29 18:18:32 INFO TaskExecutorResourceUtils:188 - The > configuration option taskmanager.memory.network.max required for local > execution is not set, setting it to its default value 64 mb. > 2020-10-29 18:18:32 INFO TaskExecutorResourceUtils:188 - The > configuration option taskmanager.memory.managed.size required for > local execution is not set, setting it to its default value 128 mb. > 2020-10-29 18:18:32 INFO MiniCluster:253 - Starting Flink Mini Cluster > 2020-10-29 18:18:32 INFO MiniCluster:262 - Starting Metrics Registry > 2020-10-29 18:18:32 INFO MetricRegistryImpl:122 - No metrics reporter > configured, no metrics will be exposed/reported. 
> 2020-10-29 18:18:32 INFO MiniCluster:266 - Starting RPC Service(s)
> 2020-10-29 18:18:32 INFO AkkaRpcServiceUtils:247 - Trying to start local actor system
> 2020-10-29 18:18:32 INFO Slf4jLogger:92 - Slf4jLogger started
> 2020-10-29 18:18:32 INFO AkkaRpcServiceUtils:278 - Actor system started at akka://flink
> 2020-10-29 18:18:32 INFO AkkaRpcServiceUtils:247 - Trying to start local actor system
> 2020-10-29 18:18:32 INFO Slf4jLogger:92 - Slf4jLogger started
> 2020-10-29 18:18:32 INFO AkkaRpcServiceUtils:278 - Actor system started at akka://flink-metrics
> 2020-10-29 18:18:32 INFO AkkaRpcService:225 - Starting RPC endpoint for org.apache.flink.runtime.metrics.dump.MetricQueryService at akka://flink-metrics/user/rpc/MetricQueryService .
> 2020-10-29 18:18:32 INFO MiniCluster:432 - Starting high-availability services
> 2020-10-29 18:18:32 INFO BlobServer:143 - Created BLOB server storage directory /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/blobStore-30bb2435-c664-4d1e-8c74-80cb54157860
> 2020-10-29 18:18:32 INFO BlobServer:207 - Started BLOB server at 0.0.0.0:50965 - max concurrent requests: 50 - max backlog: 1000
> 2020-10-29 18:18:32 INFO PermanentBlobCache:107 - Created BLOB cache storage directory /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/blobStore-7a0bd0fc-2864-4a66-afd6-5f117d515b07
> 2020-10-29 18:18:32 INFO TransientBlobCache:107 - Created BLOB cache storage directory /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/blobStore-fd96dacc-4ce4-447a-ba3f-d2ce453a1d30
> 2020-10-29 18:18:32 INFO MiniCluster:519 - Starting 1 TaskManger(s)
> 2020-10-29 18:18:32 INFO TaskManagerRunner:412 - Starting TaskManager with ResourceID: 2358fbac-908d-4aa2-b643-c32d44b40193
> 2020-10-29 18:18:32 INFO TaskManagerServices:411 - Temporary file directory '/var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T': total 233 GB, usable 25 GB (10.73% usable)
> 2020-10-29 18:18:32 INFO FileChannelManagerImpl:97 - FileChannelManager uses directory /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-io-9aa7adae-549f-461c-95d8-6fe47f31b695 for spill files.
> 2020-10-29 18:18:32 INFO FileChannelManagerImpl:97 - FileChannelManager uses directory /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-netty-shuffle-c8e6cc04-6f93-4f55-a2c5-88891c9e9e49 for spill files.
> 2020-10-29 18:18:32 INFO NetworkBufferPool:139 - Allocated 64 MB for network buffer pool (number of memory segments: 2048, bytes per segment: 32768).
> 2020-10-29 18:18:32 INFO NettyShuffleEnvironment:293 - Starting the network environment and its components.
> 2020-10-29 18:18:32 INFO KvStateService:89 - Starting the kvState service and its components.
> 2020-10-29 18:18:32 INFO AkkaRpcService:225 - Starting RPC endpoint for org.apache.flink.runtime.taskexecutor.TaskExecutor at akka://flink/user/rpc/taskmanager_0 .
> 2020-10-29 18:18:32 INFO DefaultJobLeaderService:116 - Start job leader service.
> 2020-10-29 18:18:32 INFO FileCache:107 - User file cache uses directory /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-dist-cache-4ca7b653-8aa5-4967-b23e-574c43ab7b52
> 2020-10-29 18:18:32 INFO DispatcherRestEndpoint:140 - Starting rest endpoint.
> 2020-10-29 18:18:32 INFO DispatcherRestEndpoint:126 - Failed to load web based job submission extension. Probable reason: flink-runtime-web is not in the classpath.
> 2020-10-29 18:18:32 WARN WebMonitorUtils:85 - Log file environment variable 'log.file' is not set.
> 2020-10-29 18:18:32 WARN WebMonitorUtils:91 - JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'web.log.path'.
> 2020-10-29 18:18:33 INFO DispatcherRestEndpoint:236 - Rest endpoint listening at localhost:50966
> 2020-10-29 18:18:33 INFO EmbeddedLeaderService:302 - Proposing leadership to contender http://localhost:50966
> 2020-10-29 18:18:33 INFO DispatcherRestEndpoint:821 - http://localhost:50966 was granted leadership with leaderSessionID=5d1c56ab-6894-42d9-bc76-716ea59bd473
> 2020-10-29 18:18:33 INFO EmbeddedLeaderService:252 - Received confirmation of leadership for leader http://localhost:50966 , session=5d1c56ab-6894-42d9-bc76-716ea59bd473
> 2020-10-29 18:18:33 INFO AkkaRpcService:225 - Starting RPC endpoint for org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at akka://flink/user/rpc/resourcemanager_1 .
> 2020-10-29 18:18:33 INFO EmbeddedLeaderService:302 - Proposing leadership to contender LeaderContender: DefaultDispatcherRunner
> 2020-10-29 18:18:33 INFO EmbeddedLeaderService:302 - Proposing leadership to contender LeaderContender: StandaloneResourceManager
> 2020-10-29 18:18:33 INFO StandaloneResourceManager:1026 - ResourceManager akka://flink/user/rpc/resourcemanager_1 was granted leadership with fencing token a8e96f157326e3f60a2df92bb1364f97
> 2020-10-29 18:18:33 INFO MiniCluster:372 - Flink Mini Cluster started successfully
> 2020-10-29 18:18:33 INFO SlotManagerImpl:284 - Starting the SlotManager.
> 2020-10-29 18:18:33 INFO SessionDispatcherLeaderProcess:102 - Start SessionDispatcherLeaderProcess.
> 2020-10-29 18:18:33 INFO SessionDispatcherLeaderProcess:120 - Recover all persisted job graphs.
> 2020-10-29 18:18:33 INFO SessionDispatcherLeaderProcess:128 - Successfully recovered 0 persisted job graphs.
> 2020-10-29 18:18:33 INFO EmbeddedLeaderService:252 - Received confirmation of leadership for leader akka://flink/user/rpc/resourcemanager_1 , session=0a2df92b-b136-4f97-a8e9-6f157326e3f6
> 2020-10-29 18:18:33 INFO TaskExecutor:1128 - Connecting to ResourceManager akka://flink/user/rpc/resourcemanager_1(a8e96f157326e3f60a2df92bb1364f97).
> 2020-10-29 18:18:33 INFO AkkaRpcService:225 - Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/rpc/dispatcher_2 .
> 2020-10-29 18:18:33 INFO EmbeddedLeaderService:252 - Received confirmation of leadership for leader akka://flink/user/rpc/dispatcher_2 , session=3ce5d1bf-2d6d-49d6-b315-717f66748938
> 2020-10-29 18:18:33 INFO TaskExecutor:155 - Resolved ResourceManager address, beginning registration
> 2020-10-29 18:18:33 INFO StandaloneResourceManager:821 - Registering TaskManager with ResourceID 2358fbac-908d-4aa2-b643-c32d44b40193 (akka://flink/user/rpc/taskmanager_0) at ResourceManager
> 2020-10-29 18:18:33 INFO TaskExecutor:84 - Successful registration at resource manager akka://flink/user/rpc/resourcemanager_1 under registration id 0be76fda7dfc8204153de66b83ba5621.
> 2020-10-29 18:18:33 INFO StandaloneDispatcher:295 - Received JobGraph submission 1fd25ab4d3d51009542ebda2bbadb55d (insert-into_default_catalog.default_database.print_table).
> 2020-10-29 18:18:33 INFO StandaloneDispatcher:352 - Submitting job 1fd25ab4d3d51009542ebda2bbadb55d (insert-into_default_catalog.default_database.print_table).
> 2020-10-29 18:18:33 INFO AkkaRpcService:225 - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/rpc/jobmanager_3 .
> 2020-10-29 18:18:33 INFO JobMaster:288 - Initializing job insert-into_default_catalog.default_database.print_table (1fd25ab4d3d51009542ebda2bbadb55d).
> 2020-10-29 18:18:33 INFO JobMaster:84 - Using restart back off time strategy NoRestartBackoffTimeStrategy for insert-into_default_catalog.default_database.print_table (1fd25ab4d3d51009542ebda2bbadb55d).
> 2020-10-29 18:18:33 INFO JobMaster:211 - Running initialization on master for job insert-into_default_catalog.default_database.print_table (1fd25ab4d3d51009542ebda2bbadb55d).
> 2020-10-29 18:18:33 INFO JobMaster:229 - Successfully ran initialization on master in 3 ms.
> 2020-10-29 18:18:33 INFO DefaultExecutionTopology:111 - Built 1 pipelined regions in 0 ms
> 2020-10-29 18:18:33 INFO JobMaster:231 - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
> 2020-10-29 18:18:33 INFO JobMaster:165 - Using failover strategy org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@5da7a3b6 for insert-into_default_catalog.default_database.print_table (1fd25ab4d3d51009542ebda2bbadb55d).
> 2020-10-29 18:18:33 INFO EmbeddedLeaderService:302 - Proposing leadership to contender akka://flink/user/rpc/jobmanager_3
> 2020-10-29 18:18:33 INFO JobManagerRunnerImpl:305 - JobManager runner for job insert-into_default_catalog.default_database.print_table (1fd25ab4d3d51009542ebda2bbadb55d) was granted leadership with session id 47a72e91-c0bf-4b40-8b9f-639f07344b3f at akka://flink/user/rpc/jobmanager_3.
> 2020-10-29 18:18:33 INFO JobMaster:799 - Starting execution of job insert-into_default_catalog.default_database.print_table (1fd25ab4d3d51009542ebda2bbadb55d) under job master id 8b9f639f07344b3f47a72e91c0bf4b40.
> 2020-10-29 18:18:33 INFO JobMaster:197 - Starting scheduling with scheduling strategy [org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
> 2020-10-29 18:18:33 INFO ExecutionGraph:1253 - Job insert-into_default_catalog.default_database.print_table (1fd25ab4d3d51009542ebda2bbadb55d) switched from state CREATED to RUNNING.
> 2020-10-29 18:18:33 INFO ExecutionGraph:1584 - Source: Values(tuples=[[{ 0 }]]) -> (Calc(select=[CAST(1:BIGINT) AS account_id, CAST(188:BIGINT) AS amount, CAST(CAST(2020-01-01 00:12:00:TIMESTAMP(3))) AS transaction_time]),
>     Calc(select=[CAST(2:BIGINT) AS account_id, CAST(374:BIGINT) AS amount, CAST(CAST(2020-01-01 00:47:00:TIMESTAMP(3))) AS transaction_time]),
>     Calc(select=[CAST(3:BIGINT) AS account_id, CAST(112:BIGINT) AS amount, CAST(CAST(2020-01-01 00:36:00:TIMESTAMP(3))) AS transaction_time]),
>     Calc(select=[CAST(4:BIGINT) AS account_id, CAST(478:BIGINT) AS amount, CAST(CAST(2020-01-01 00:03:00:TIMESTAMP(3))) AS transaction_time]),
>     Calc(select=[CAST(5:BIGINT) AS account_id, CAST(208:BIGINT) AS amount, CAST(CAST(2020-01-01 00:08:00:TIMESTAMP(3))) AS transaction_time]),
>     Calc(select=[CAST(1:BIGINT) AS account_id, CAST(379:BIGINT) AS amount, CAST(CAST(2020-01-01 00:53:00:TIMESTAMP(3))) AS transaction_time]),
>     Calc(select=[CAST(2:BIGINT) AS account_id, CAST(351:BIGINT) AS amount, CAST(CAST(2020-01-01 00:32:00:TIMESTAMP(3))) AS transaction_time]),
>     Calc(select=[CAST(3:BIGINT) AS account_id, CAST(320:BIGINT) AS amount, CAST(CAST(2020-01-01 00:31:00:TIMESTAMP(3))) AS transaction_time]),
>     Calc(select=[CAST(4:BIGINT) AS account_id, CAST(259:BIGINT) AS amount, CAST(CAST(2020-01-01 00:19:00:TIMESTAMP(3))) AS transaction_time]),
>     Calc(select=[CAST(5:BIGINT) AS account_id, CAST(273:BIGINT) AS amount, CAST(CAST(2020-01-01 00:42:00:TIMESTAMP(3))) AS transaction_time]))
>     (1/1) (1fd25ab4d3d51009542ebda2bbadb55d_d07bb0f8535d5573ba7aa0f9242a6583_0_0) switched from CREATED to SCHEDULED.
> 2020-10-29 18:18:33 INFO ExecutionGraph:1584 - Sink: Sink(table=[default_catalog.default_database.print_table], fields=[account_id, amount, transaction_time]) (1/1) (1fd25ab4d3d51009542ebda2bbadb55d_4b71d4c67c3b183d6f63a06700c86645_0_0) switched from CREATED to SCHEDULED.
> 2020-10-29 18:18:33 INFO SlotPoolImpl:385 - Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{3e5c5fdf21b3314a9289b257aeec8c2d}]
> 2020-10-29 18:18:33 INFO EmbeddedLeaderService:252 - Received confirmation of leadership for leader akka://flink/user/rpc/jobmanager_3 , session=47a72e91-c0bf-4b40-8b9f-639f07344b3f
> 2020-10-29 18:18:33 INFO JobMaster:1031 - Connecting to ResourceManager akka://flink/user/rpc/resourcemanager_1(a8e96f157326e3f60a2df92bb1364f97)
> 2020-10-29 18:18:33 INFO JobMaster:155 - Resolved ResourceManager address, beginning registration
> 2020-10-29 18:18:33 INFO StandaloneResourceManager:330 - Registering job manager 8b9f639f07344b3f47a72e91c0bf4b40@akka://flink/user/rpc/jobmanager_3 for job 1fd25ab4d3d51009542ebda2bbadb55d.
> 2020-10-29 18:18:33 INFO StandaloneResourceManager:765 - Registered job manager 8b9f639f07344b3f47a72e91c0bf4b40@akka://flink/user/rpc/jobmanager_3 for job 1fd25ab4d3d51009542ebda2bbadb55d.
> 2020-10-29 18:18:33 INFO JobMaster:1053 - JobManager successfully registered at ResourceManager, leader id: a8e96f157326e3f60a2df92bb1364f97.
> 2020-10-29 18:18:33 INFO SlotPoolImpl:347 - Requesting new slot [SlotRequestId{3e5c5fdf21b3314a9289b257aeec8c2d}] and profile ResourceProfile{UNKNOWN} with allocation id 45ea6506d06f73f61a36db764ce07ba7 from resource manager.
> 2020-10-29 18:18:33 INFO StandaloneResourceManager:464 - Request slot with profile ResourceProfile{UNKNOWN} for job 1fd25ab4d3d51009542ebda2bbadb55d with allocation id 45ea6506d06f73f61a36db764ce07ba7.
> 2020-10-29 18:18:33 INFO TaskExecutor:908 - Receive slot request 45ea6506d06f73f61a36db764ce07ba7 for job 1fd25ab4d3d51009542ebda2bbadb55d from resource manager with leader id a8e96f157326e3f60a2df92bb1364f97.
> 2020-10-29 18:18:33 INFO TaskExecutor:976 - Allocated slot for 45ea6506d06f73f61a36db764ce07ba7.
> 2020-10-29 18:18:33 INFO DefaultJobLeaderService:172 - Add job 1fd25ab4d3d51009542ebda2bbadb55d for job leader monitoring.
> 2020-10-29 18:18:33 INFO DefaultJobLeaderService:314 - Try to register at job manager akka://flink/user/rpc/jobmanager_3 with leader id 47a72e91-c0bf-4b40-8b9f-639f07344b3f.
> 2020-10-29 18:18:33 INFO DefaultJobLeaderService:155 - Resolved JobManager address, beginning registration
> 2020-10-29 18:18:33 INFO DefaultJobLeaderService:369 - Successful registration at job manager akka://flink/user/rpc/jobmanager_3 for job 1fd25ab4d3d51009542ebda2bbadb55d.
> 2020-10-29 18:18:33 INFO TaskExecutor:1379 - Establish JobManager connection for job 1fd25ab4d3d51009542ebda2bbadb55d.
> 2020-10-29 18:18:33 INFO TaskExecutor:1278 - Offer reserved slots to the leader of job 1fd25ab4d3d51009542ebda2bbadb55d.
> 2020-10-29 18:18:33 INFO PermanentBlobCache:251 - Shutting down BLOB cache
> 2020-10-29 18:18:33 INFO TaskExecutorLocalStateStoresManager:213 - Shutting down TaskExecutorLocalStateStoresManager.
> 2020-10-29 18:18:33 INFO TransientBlobCache:251 - Shutting down BLOB cache
> 2020-10-29 18:18:33 INFO ExecutionGraph:1584 - Source: Values(tuples=[[{ 0 }]]) -> (Calc(select=[CAST(1:BIGINT) AS account_id, CAST(188:BIGINT) AS amount, CAST(CAST(2020-01-01 00:12:00:TIMESTAMP(3))) AS transaction_time]),
>     Calc(select=[CAST(2:BIGINT) AS account_id, CAST(374:BIGINT) AS amount, CAST(CAST(2020-01-01 00:47:00:TIMESTAMP(3))) AS transaction_time]),
>     Calc(select=[CAST(3:BIGINT) AS account_id, CAST(112:BIGINT) AS amount, CAST(CAST(2020-01-01 00:36:00:TIMESTAMP(3))) AS transaction_time]),
>     Calc(select=[CAST(4:BIGINT) AS account_id, CAST(478:BIGINT) AS amount, CAST(CAST(2020-01-01 00:03:00:TIMESTAMP(3))) AS transaction_time]),
>     Calc(select=[CAST(5:BIGINT) AS account_id, CAST(208:BIGINT) AS amount, CAST(CAST(2020-01-01 00:08:00:TIMESTAMP(3))) AS transaction_time]),
>     Calc(select=[CAST(1:BIGINT) AS account_id, CAST(379:BIGINT) AS amount, CAST(CAST(2020-01-01 00:53:00:TIMESTAMP(3))) AS transaction_time]),
>     Calc(select=[CAST(2:BIGINT) AS account_id, CAST(351:BIGINT) AS amount, CAST(CAST(2020-01-01 00:32:00:TIMESTAMP(3))) AS transaction_time]),
>     Calc(select=[CAST(3:BIGINT) AS account_id, CAST(320:BIGINT) AS amount, CAST(CAST(2020-01-01 00:31:00:TIMESTAMP(3))) AS transaction_time]),
>     Calc(select=[CAST(4:BIGINT) AS account_id, CAST(259:BIGINT) AS amount, CAST(CAST(2020-01-01 00:19:00:TIMESTAMP(3))) AS transaction_time]),
>     Calc(select=[CAST(5:BIGINT) AS account_id, CAST(273:BIGINT) AS amount, CAST(CAST(2020-01-01 00:42:00:TIMESTAMP(3))) AS transaction_time]))
>     (1/1) (1fd25ab4d3d51009542ebda2bbadb55d_d07bb0f8535d5573ba7aa0f9242a6583_0_0) switched from SCHEDULED to DEPLOYING.
> 2020-10-29 18:18:33 INFO ExecutionGraph:725 - Deploying Source: Values(tuples=[[{ 0 }]]) -> (Calc(select=[CAST(1:BIGINT) AS account_id, CAST(188:BIGINT) AS amount, CAST(CAST(2020-01-01 00:12:00:TIMESTAMP(3))) AS transaction_time]),
>     Calc(select=[CAST(2:BIGINT) AS account_id, CAST(374:BIGINT) AS amount, CAST(CAST(2020-01-01 00:47:00:TIMESTAMP(3))) AS transaction_time]),
>     Calc(select=[CAST(3:BIGINT) AS account_id, CAST(112:BIGINT) AS amount, CAST(CAST(2020-01-01 00:36:00:TIMESTAMP(3))) AS transaction_time]),
>     Calc(select=[CAST(4:BIGINT) AS account_id, CAST(478:BIGINT) AS amount, CAST(CAST(2020-01-01 00:03:00:TIMESTAMP(3))) AS transaction_time]),
>     Calc(select=[CAST(5:BIGINT) AS account_id, CAST(208:BIGINT) AS amount, CAST(CAST(2020-01-01 00:08:00:TIMESTAMP(3))) AS transaction_time]),
>     Calc(select=[CAST(1:BIGINT) AS account_id, CAST(379:BIGINT) AS amount, CAST(CAST(2020-01-01 00:53:00:TIMESTAMP(3))) AS transaction_time]),
>     Calc(select=[CAST(2:BIGINT) AS account_id, CAST(351:BIGINT) AS amount, CAST(CAST(2020-01-01 00:32:00:TIMESTAMP(3))) AS transaction_time]),
>     Calc(select=[CAST(3:BIGINT) AS account_id, CAST(320:BIGINT) AS amount, CAST(CAST(2020-01-01 00:31:00:TIMESTAMP(3))) AS transaction_time]),
>     Calc(select=[CAST(4:BIGINT) AS account_id, CAST(259:BIGINT) AS amount, CAST(CAST(2020-01-01 00:19:00:TIMESTAMP(3))) AS transaction_time]),
>     Calc(select=[CAST(5:BIGINT) AS account_id, CAST(273:BIGINT) AS amount, CAST(CAST(2020-01-01 00:42:00:TIMESTAMP(3))) AS transaction_time]))
>     (1/1) (attempt #0) with attempt id 1fd25ab4d3d51009542ebda2bbadb55d_d07bb0f8535d5573ba7aa0f9242a6583_0_0 to 2358fbac-908d-4aa2-b643-c32d44b40193 @ localhost (dataPort=-1) with allocation id 45ea6506d06f73f61a36db764ce07ba7
> 2020-10-29 18:18:33 INFO ExecutionGraph:1584 - Sink: Sink(table=[default_catalog.default_database.print_table], fields=[account_id, amount, transaction_time]) (1/1) (1fd25ab4d3d51009542ebda2bbadb55d_4b71d4c67c3b183d6f63a06700c86645_0_0) switched from SCHEDULED to DEPLOYING.
> 2020-10-29 18:18:33 INFO ExecutionGraph:725 - Deploying Sink: Sink(table=[default_catalog.default_database.print_table], fields=[account_id, amount, transaction_time]) (1/1) (attempt #0) with attempt id 1fd25ab4d3d51009542ebda2bbadb55d_4b71d4c67c3b183d6f63a06700c86645_0_0 to 2358fbac-908d-4aa2-b643-c32d44b40193 @ localhost (dataPort=-1) with allocation id 45ea6506d06f73f61a36db764ce07ba7
> 2020-10-29 18:18:33 INFO TaskSlotTableImpl:361 - Activate slot 45ea6506d06f73f61a36db764ce07ba7.
> 2020-10-29 18:18:33 INFO FileChannelManagerImpl:146 - FileChannelManager removed spill file directory /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-io-9aa7adae-549f-461c-95d8-6fe47f31b695
> 2020-10-29 18:18:33 INFO BlobServer:348 - Stopped BLOB server at 0.0.0.0:50965
> 2020-10-29 18:18:33 INFO FileChannelManagerImpl:146 - FileChannelManager removed spill file directory /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-netty-shuffle-c8e6cc04-6f93-4f55-a2c5-88891c9e9e49
> 2020-10-29 18:18:33 INFO FileCache:153 - removed file cache directory /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-dist-cache-4ca7b653-8aa5-4967-b23e-574c43ab7b52
>
> Process finished with exit code 0
>
>
> [1]: https://github.com/apache/flink-playgrounds/blob/master/table-walkthrough/src/test/java/org/apache/flink/playgrounds/spendreport/SpendReportTest.java
>
> On Thu, Oct 29, 2020 at 4:53 PM Dawid Wysakowicz <[hidden email]> wrote:
>> You should be able to use the "print" sink. Remember, though, that the
>> "print" sink prints into the stdout/stderr of the TaskManagers, not of the
>> client where you submit the query. This is different from the
>> TableResult, which collects results in the client. BTW, for printing you
>> can use TableResult#print, which will nicely format your results.
>>
>> Best,
>>
>> Dawid
>>
>> On 29/10/2020 16:13, Ruben Laguna wrote:
>>> How can I use the Table [Print SQL connector][1]? I tried the
>>> following (batch mode), but it does not give any output:
>>>
>>>     EnvironmentSettings settings =
>>>         EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
>>>     TableEnvironment tEnv = TableEnvironment.create(settings);
>>>
>>>     final LocalDateTime DATE_TIME = LocalDateTime.of(2020, 1, 1, 0, 0);
>>>
>>>     Table transactions =
>>>         tEnv.fromValues(
>>>             DataTypes.ROW(
>>>                 DataTypes.FIELD("account_id", DataTypes.BIGINT()),
>>>                 DataTypes.FIELD("amount", DataTypes.BIGINT()),
>>>                 DataTypes.FIELD("transaction_time", DataTypes.TIMESTAMP(3))),
>>>             Row.of(1, 188, DATE_TIME.plusMinutes(12)),
>>>             Row.of(2, 374, DATE_TIME.plusMinutes(47)),
>>>             Row.of(3, 112, DATE_TIME.plusMinutes(36)),
>>>             Row.of(4, 478, DATE_TIME.plusMinutes(3)),
>>>             Row.of(5, 208, DATE_TIME.plusMinutes(8)),
>>>             Row.of(1, 379, DATE_TIME.plusMinutes(53)),
>>>             Row.of(2, 351, DATE_TIME.plusMinutes(32)),
>>>             Row.of(3, 320, DATE_TIME.plusMinutes(31)),
>>>             Row.of(4, 259, DATE_TIME.plusMinutes(19)),
>>>             Row.of(5, 273, DATE_TIME.plusMinutes(42)));
>>>     tEnv.executeSql("CREATE TABLE print_table(account_id BIGINT, amount BIGINT, transaction_time TIMESTAMP) WITH ('connector' = 'print')");
>>>
>>>     transactions.executeInsert("print_table");
>>>
>>> I can "materialize" the result manually and print it out with:
>>>
>>>     for (Row row : materialize(transactions.execute())) {
>>>         System.out.println(row);
>>>     }
>>>
>>>     private static List<Row> materialize(TableResult results) {
>>>         try (CloseableIterator<Row> resultIterator = results.collect()) {
>>>             return StreamSupport
>>>                 .stream(Spliterators.spliteratorUnknownSize(resultIterator, Spliterator.ORDERED), false)
>>>                 .collect(Collectors.toList());
>>>         } catch (Exception e) {
>>>             throw new RuntimeException("Failed to materialize results", e);
>>>         }
>>>     }
>>>
>>> But I would like to know why I can't just use the Print sink.
>>>
>>> I've tried with `.inBatchMode()` and with `.inStreamingMode()`, so I
>>> don't think it's that.
>>>
>>> Does anybody know of any working example involving the print connector?
>>>
>>> [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/print.html
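A stdlib-only analogy of the fix Dawid describes at the top of this message may help; it is a sketch, not Flink code. `JobHandle`, `executeInsert` and `runAndAwait` are hypothetical names, a single-thread `ExecutorService` stands in for the local MiniCluster, and a list stands in for the print sink. Submitting returns a handle immediately, and only `await()` guarantees that the sink has received every row before the "cluster" is shut down.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Stdlib-only analogy of executeInsert(...) followed by await(); none of these names are Flink APIs.
public class AwaitInsertDemo {

    // Hypothetical stand-in for the handle returned by an asynchronous insert.
    static final class JobHandle {
        private final CompletableFuture<Void> completion;

        JobHandle(CompletableFuture<Void> completion) {
            this.completion = completion;
        }

        // Blocks the caller until the job has finished; throws a CompletionException if it failed.
        void await() {
            completion.join();
        }
    }

    // Hypothetical "executeInsert": schedules the job on the "cluster" and returns immediately.
    static JobHandle executeInsert(ExecutorService cluster, List<String> rows, List<String> sink) {
        CompletableFuture<Void> job = CompletableFuture.runAsync(() -> {
            for (String row : rows) {
                sink.add(row); // a real print sink would write the row to stdout instead
            }
        }, cluster);
        return new JobHandle(job);
    }

    // Submits the job, waits for it, and only then tears the "cluster" down.
    static List<String> runAndAwait(List<String> rows) {
        ExecutorService cluster = Executors.newSingleThreadExecutor(); // stands in for the MiniCluster
        List<String> sink = new CopyOnWriteArrayList<>();
        try {
            JobHandle result = executeInsert(cluster, rows, sink);
            result.await(); // without this line, shutdownNow() below can discard the job before it runs
            return sink;
        } finally {
            cluster.shutdownNow();
        }
    }

    public static void main(String[] args) {
        List<String> printed = runAndAwait(List.of("1,188", "2,374", "3,112"));
        printed.forEach(System.out::println);
    }
}
```

If the `result.await()` line is removed, `shutdownNow()` may run before the job has started and drop it. That resembles the quoted log, where the BLOB caches and the TaskExecutor state stores shut down while the source and sink tasks were still being deployed.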
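For the client-side route Dawid mentions (TableResult collects results in the client), the `materialize` helper from the original question can be made generic. This is a sketch under assumptions: the nested `CloseableIterator` is a local stand-in that mirrors the shape of Flink's `org.apache.flink.util.CloseableIterator` (an `Iterator` that is also `AutoCloseable`), and `MaterializeDemo` and `fromList` are hypothetical names chosen so the example runs without Flink on the classpath.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class MaterializeDemo {

    // Local stand-in mirroring org.apache.flink.util.CloseableIterator (assumed to have the same shape).
    interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {}

    // Drains the iterator into a list; try-with-resources closes it even if draining fails.
    static <T> List<T> materialize(CloseableIterator<T> iterator) {
        try (CloseableIterator<T> resultIterator = iterator) {
            return StreamSupport
                    .stream(Spliterators.spliteratorUnknownSize(resultIterator, Spliterator.ORDERED), false)
                    .collect(Collectors.toList());
        } catch (Exception e) {
            throw new RuntimeException("Failed to materialize results", e);
        }
    }

    // Hypothetical helper: wraps a list so it behaves like a closeable result iterator.
    static <T> CloseableIterator<T> fromList(List<T> items, Runnable onClose) {
        Iterator<T> delegate = items.iterator();
        return new CloseableIterator<T>() {
            @Override
            public boolean hasNext() {
                return delegate.hasNext();
            }

            @Override
            public T next() {
                return delegate.next();
            }

            @Override
            public void close() {
                onClose.run();
            }
        };
    }

    public static void main(String[] args) {
        List<String> rows = materialize(fromList(List.of("1,188", "2,374"), () -> System.out.println("closed")));
        rows.forEach(System.out::println);
    }
}
```

With a real `TableResult`, `results.collect()` would supply the iterator. Because the rows are returned to the client, printing them does not depend on where the TaskManagers' stdout ends up.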