Table Print SQL Connector

Ruben Laguna
How can I use the Table [Print SQL connector][1]? I tried the
following (batch mode), but it does not produce any output:


EnvironmentSettings settings =
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);

final LocalDateTime DATE_TIME = LocalDateTime.of(2020, 1, 1, 0, 0);

Table transactions =
        tEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("account_id", DataTypes.BIGINT()),
                        DataTypes.FIELD("amount", DataTypes.BIGINT()),
                        DataTypes.FIELD("transaction_time", DataTypes.TIMESTAMP(3))),
                Row.of(1, 188, DATE_TIME.plusMinutes(12)),
                Row.of(2, 374, DATE_TIME.plusMinutes(47)),
                Row.of(3, 112, DATE_TIME.plusMinutes(36)),
                Row.of(4, 478, DATE_TIME.plusMinutes(3)),
                Row.of(5, 208, DATE_TIME.plusMinutes(8)),
                Row.of(1, 379, DATE_TIME.plusMinutes(53)),
                Row.of(2, 351, DATE_TIME.plusMinutes(32)),
                Row.of(3, 320, DATE_TIME.plusMinutes(31)),
                Row.of(4, 259, DATE_TIME.plusMinutes(19)),
                Row.of(5, 273, DATE_TIME.plusMinutes(42)));
tEnv.executeSql(
        "CREATE TABLE print_table(account_id BIGINT, amount BIGINT, "
                + "transaction_time TIMESTAMP) WITH ('connector' = 'print')");

transactions.executeInsert("print_table");


I can "materialize" the results manually and print them out with:

for (Row row : materialize(transactions.execute())) {
    System.out.println(row);
}

private static List<Row> materialize(TableResult results) {
    try (CloseableIterator<Row> resultIterator = results.collect()) {
        return StreamSupport
                .stream(Spliterators.spliteratorUnknownSize(resultIterator, Spliterator.ORDERED), false)
                .collect(Collectors.toList());
    } catch (Exception e) {
        throw new RuntimeException("Failed to materialize results", e);
    }
}


But I would like to know why I can't just use the Print sink.

I've tried both `.inBatchMode()` and `.inStreamingMode()`, so I
don't think that's it.

Does anybody know of any working example involving the print connector?



[1]: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/print.html
--
/Rubén

Re: Table Print SQL Connector

Dawid Wysakowicz-2
You should be able to use the "print" sink. Remember though that the
"print" sink prints into the stdout/stderr of TaskManagers, not the
Client, where you submit the query. This is different from the
TableResult, which collects results in the client. BTW, for printing you
can use TableResult#print, which will nicely format your results.

Best,

Dawid

Re: Table Print SQL Connector

Ruben Laguna
Hi,

Using `mytable.execute().print()` is exactly what I wanted, thanks.

But I'm still curious. I'm running this locally in a JUnit test
case (not on a Flink cluster), just like
[flink-playgrounds SpendReportTest][1]. In this scenario, where does
the TaskManager output go (if there is a TaskManager at all)?

I just added src/test/resources/log4j.properties with

# Root logger option
log4j.rootLogger=INFO, stdout

# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

and I still don't see anything from the print sink. I even ran it
with the debugger, and I can see that although
PrintSink#getSinkRuntimeProvider is called,
RowDataPrintFunction#invoke is never called.

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/Users/ecerulm/.m2/repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/Users/ecerulm/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.12.1/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2020-10-29 18:18:32 INFO  TaskExecutorResourceUtils:188 - The
configuration option taskmanager.cpu.cores required for local
execution is not set, setting it to the maximal possible value.
2020-10-29 18:18:32 INFO  TaskExecutorResourceUtils:188 - The
configuration option taskmanager.memory.task.heap.size required for
local execution is not set, setting it to the maximal possible value.
2020-10-29 18:18:32 INFO  TaskExecutorResourceUtils:188 - The
configuration option taskmanager.memory.task.off-heap.size required
for local execution is not set, setting it to the maximal possible
value.
2020-10-29 18:18:32 INFO  TaskExecutorResourceUtils:188 - The
configuration option taskmanager.memory.network.min required for local
execution is not set, setting it to its default value 64 mb.
2020-10-29 18:18:32 INFO  TaskExecutorResourceUtils:188 - The
configuration option taskmanager.memory.network.max required for local
execution is not set, setting it to its default value 64 mb.
2020-10-29 18:18:32 INFO  TaskExecutorResourceUtils:188 - The
configuration option taskmanager.memory.managed.size required for
local execution is not set, setting it to its default value 128 mb.
2020-10-29 18:18:32 INFO  MiniCluster:253 - Starting Flink Mini Cluster
2020-10-29 18:18:32 INFO  MiniCluster:262 - Starting Metrics Registry
2020-10-29 18:18:32 INFO  MetricRegistryImpl:122 - No metrics reporter
configured, no metrics will be exposed/reported.
2020-10-29 18:18:32 INFO  MiniCluster:266 - Starting RPC Service(s)
2020-10-29 18:18:32 INFO  AkkaRpcServiceUtils:247 - Trying to start
local actor system
2020-10-29 18:18:32 INFO  Slf4jLogger:92 - Slf4jLogger started
2020-10-29 18:18:32 INFO  AkkaRpcServiceUtils:278 - Actor system
started at akka://flink
2020-10-29 18:18:32 INFO  AkkaRpcServiceUtils:247 - Trying to start
local actor system
2020-10-29 18:18:32 INFO  Slf4jLogger:92 - Slf4jLogger started
2020-10-29 18:18:32 INFO  AkkaRpcServiceUtils:278 - Actor system
started at akka://flink-metrics
2020-10-29 18:18:32 INFO  AkkaRpcService:225 - Starting RPC endpoint
for org.apache.flink.runtime.metrics.dump.MetricQueryService at
akka://flink-metrics/user/rpc/MetricQueryService .
2020-10-29 18:18:32 INFO  MiniCluster:432 - Starting high-availability services
2020-10-29 18:18:32 INFO  BlobServer:143 - Created BLOB server storage
directory /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/blobStore-30bb2435-c664-4d1e-8c74-80cb54157860
2020-10-29 18:18:32 INFO  BlobServer:207 - Started BLOB server at
0.0.0.0:50965 - max concurrent requests: 50 - max backlog: 1000
2020-10-29 18:18:32 INFO  PermanentBlobCache:107 - Created BLOB cache
storage directory
/var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/blobStore-7a0bd0fc-2864-4a66-afd6-5f117d515b07
2020-10-29 18:18:32 INFO  TransientBlobCache:107 - Created BLOB cache
storage directory
/var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/blobStore-fd96dacc-4ce4-447a-ba3f-d2ce453a1d30
2020-10-29 18:18:32 INFO  MiniCluster:519 - Starting 1 TaskManger(s)
2020-10-29 18:18:32 INFO  TaskManagerRunner:412 - Starting TaskManager
with ResourceID: 2358fbac-908d-4aa2-b643-c32d44b40193
2020-10-29 18:18:32 INFO  TaskManagerServices:411 - Temporary file
directory '/var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T': total
233 GB, usable 25 GB (10.73% usable)
2020-10-29 18:18:32 INFO  FileChannelManagerImpl:97 -
FileChannelManager uses directory
/var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-io-9aa7adae-549f-461c-95d8-6fe47f31b695
for spill files.
2020-10-29 18:18:32 INFO  FileChannelManagerImpl:97 -
FileChannelManager uses directory
/var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-netty-shuffle-c8e6cc04-6f93-4f55-a2c5-88891c9e9e49
for spill files.
2020-10-29 18:18:32 INFO  NetworkBufferPool:139 - Allocated 64 MB for
network buffer pool (number of memory segments: 2048, bytes per
segment: 32768).
2020-10-29 18:18:32 INFO  NettyShuffleEnvironment:293 - Starting the
network environment and its components.
2020-10-29 18:18:32 INFO  KvStateService:89 - Starting the kvState
service and its components.
2020-10-29 18:18:32 INFO  AkkaRpcService:225 - Starting RPC endpoint
for org.apache.flink.runtime.taskexecutor.TaskExecutor at
akka://flink/user/rpc/taskmanager_0 .
2020-10-29 18:18:32 INFO  DefaultJobLeaderService:116 - Start job
leader service.
2020-10-29 18:18:32 INFO  FileCache:107 - User file cache uses
directory /var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-dist-cache-4ca7b653-8aa5-4967-b23e-574c43ab7b52
2020-10-29 18:18:32 INFO  DispatcherRestEndpoint:140 - Starting rest endpoint.
2020-10-29 18:18:32 INFO  DispatcherRestEndpoint:126 - Failed to load
web based job submission extension. Probable reason: flink-runtime-web
is not in the classpath.
2020-10-29 18:18:32 WARN  WebMonitorUtils:85 - Log file environment
variable 'log.file' is not set.
2020-10-29 18:18:32 WARN  WebMonitorUtils:91 - JobManager log files
are unavailable in the web dashboard. Log file location not found in
environment variable 'log.file' or configuration key 'web.log.path'.
2020-10-29 18:18:33 INFO  DispatcherRestEndpoint:236 - Rest endpoint
listening at localhost:50966
2020-10-29 18:18:33 INFO  EmbeddedLeaderService:302 - Proposing
leadership to contender http://localhost:50966
2020-10-29 18:18:33 INFO  DispatcherRestEndpoint:821 -
http://localhost:50966 was granted leadership with
leaderSessionID=5d1c56ab-6894-42d9-bc76-716ea59bd473
2020-10-29 18:18:33 INFO  EmbeddedLeaderService:252 - Received
confirmation of leadership for leader http://localhost:50966 ,
session=5d1c56ab-6894-42d9-bc76-716ea59bd473
2020-10-29 18:18:33 INFO  AkkaRpcService:225 - Starting RPC endpoint
for org.apache.flink.runtime.resourcemanager.StandaloneResourceManager
at akka://flink/user/rpc/resourcemanager_1 .
2020-10-29 18:18:33 INFO  EmbeddedLeaderService:302 - Proposing
leadership to contender LeaderContender: DefaultDispatcherRunner
2020-10-29 18:18:33 INFO  EmbeddedLeaderService:302 - Proposing
leadership to contender LeaderContender: StandaloneResourceManager
2020-10-29 18:18:33 INFO  StandaloneResourceManager:1026 -
ResourceManager akka://flink/user/rpc/resourcemanager_1 was granted
leadership with fencing token a8e96f157326e3f60a2df92bb1364f97
2020-10-29 18:18:33 INFO  MiniCluster:372 - Flink Mini Cluster started
successfully
2020-10-29 18:18:33 INFO  SlotManagerImpl:284 - Starting the SlotManager.
2020-10-29 18:18:33 INFO  SessionDispatcherLeaderProcess:102 - Start
SessionDispatcherLeaderProcess.
2020-10-29 18:18:33 INFO  SessionDispatcherLeaderProcess:120 - Recover
all persisted job graphs.
2020-10-29 18:18:33 INFO  SessionDispatcherLeaderProcess:128 -
Successfully recovered 0 persisted job graphs.
2020-10-29 18:18:33 INFO  EmbeddedLeaderService:252 - Received
confirmation of leadership for leader
akka://flink/user/rpc/resourcemanager_1 ,
session=0a2df92b-b136-4f97-a8e9-6f157326e3f6
2020-10-29 18:18:33 INFO  TaskExecutor:1128 - Connecting to
ResourceManager
akka://flink/user/rpc/resourcemanager_1(a8e96f157326e3f60a2df92bb1364f97).
2020-10-29 18:18:33 INFO  AkkaRpcService:225 - Starting RPC endpoint
for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at
akka://flink/user/rpc/dispatcher_2 .
2020-10-29 18:18:33 INFO  EmbeddedLeaderService:252 - Received
confirmation of leadership for leader
akka://flink/user/rpc/dispatcher_2 ,
session=3ce5d1bf-2d6d-49d6-b315-717f66748938
2020-10-29 18:18:33 INFO  TaskExecutor:155 - Resolved ResourceManager
address, beginning registration
2020-10-29 18:18:33 INFO  StandaloneResourceManager:821 - Registering
TaskManager with ResourceID 2358fbac-908d-4aa2-b643-c32d44b40193
(akka://flink/user/rpc/taskmanager_0) at ResourceManager
2020-10-29 18:18:33 INFO  TaskExecutor:84 - Successful registration at
resource manager akka://flink/user/rpc/resourcemanager_1 under
registration id 0be76fda7dfc8204153de66b83ba5621.
2020-10-29 18:18:33 INFO  StandaloneDispatcher:295 - Received JobGraph
submission 1fd25ab4d3d51009542ebda2bbadb55d
(insert-into_default_catalog.default_database.print_table).
2020-10-29 18:18:33 INFO  StandaloneDispatcher:352 - Submitting job
1fd25ab4d3d51009542ebda2bbadb55d
(insert-into_default_catalog.default_database.print_table).
2020-10-29 18:18:33 INFO  AkkaRpcService:225 - Starting RPC endpoint
for org.apache.flink.runtime.jobmaster.JobMaster at
akka://flink/user/rpc/jobmanager_3 .
2020-10-29 18:18:33 INFO  JobMaster:288 - Initializing job
insert-into_default_catalog.default_database.print_table
(1fd25ab4d3d51009542ebda2bbadb55d).
2020-10-29 18:18:33 INFO  JobMaster:84 - Using restart back off time
strategy NoRestartBackoffTimeStrategy for
insert-into_default_catalog.default_database.print_table
(1fd25ab4d3d51009542ebda2bbadb55d).
2020-10-29 18:18:33 INFO  JobMaster:211 - Running initialization on
master for job insert-into_default_catalog.default_database.print_table
(1fd25ab4d3d51009542ebda2bbadb55d).
2020-10-29 18:18:33 INFO  JobMaster:229 - Successfully ran
initialization on master in 3 ms.
2020-10-29 18:18:33 INFO  DefaultExecutionTopology:111 - Built 1
pipelined regions in 0 ms
2020-10-29 18:18:33 INFO  JobMaster:231 - No state backend has been
configured, using default (Memory / JobManager) MemoryStateBackend
(data in heap memory / checkpoints to JobManager) (checkpoints:
'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
2020-10-29 18:18:33 INFO  JobMaster:165 - Using failover strategy
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@5da7a3b6
for insert-into_default_catalog.default_database.print_table
(1fd25ab4d3d51009542ebda2bbadb55d).
2020-10-29 18:18:33 INFO  EmbeddedLeaderService:302 - Proposing
leadership to contender akka://flink/user/rpc/jobmanager_3
2020-10-29 18:18:33 INFO  JobManagerRunnerImpl:305 - JobManager runner
for job insert-into_default_catalog.default_database.print_table
(1fd25ab4d3d51009542ebda2bbadb55d) was granted leadership with session
id 47a72e91-c0bf-4b40-8b9f-639f07344b3f at
akka://flink/user/rpc/jobmanager_3.
2020-10-29 18:18:33 INFO  JobMaster:799 - Starting execution of job
insert-into_default_catalog.default_database.print_table
(1fd25ab4d3d51009542ebda2bbadb55d) under job master id
8b9f639f07344b3f47a72e91c0bf4b40.
2020-10-29 18:18:33 INFO  JobMaster:197 - Starting scheduling with
scheduling strategy
[org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
2020-10-29 18:18:33 INFO  ExecutionGraph:1253 - Job
insert-into_default_catalog.default_database.print_table
(1fd25ab4d3d51009542ebda2bbadb55d) switched from state CREATED to
RUNNING.
2020-10-29 18:18:33 INFO  ExecutionGraph:1584 - Source:
Values(tuples=[[{ 0 }]]) -> (Calc(select=[CAST(1:BIGINT) AS
account_id, CAST(188:BIGINT) AS amount, CAST(CAST(2020-01-01
00:12:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(2:BIGINT) AS account_id, CAST(374:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:47:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(3:BIGINT) AS account_id, CAST(112:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:36:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(4:BIGINT) AS account_id, CAST(478:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:03:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(5:BIGINT) AS account_id, CAST(208:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:08:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(1:BIGINT) AS account_id, CAST(379:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:53:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(2:BIGINT) AS account_id, CAST(351:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:32:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(3:BIGINT) AS account_id, CAST(320:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:31:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(4:BIGINT) AS account_id, CAST(259:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:19:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(5:BIGINT) AS account_id, CAST(273:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:42:00:TIMESTAMP(3))) AS transaction_time]))
(1/1) (1fd25ab4d3d51009542ebda2bbadb55d_d07bb0f8535d5573ba7aa0f9242a6583_0_0)
switched from CREATED to SCHEDULED.
2020-10-29 18:18:33 INFO  ExecutionGraph:1584 - Sink:
Sink(table=[default_catalog.default_database.print_table],
fields=[account_id, amount, transaction_time]) (1/1)
(1fd25ab4d3d51009542ebda2bbadb55d_4b71d4c67c3b183d6f63a06700c86645_0_0)
switched from CREATED to SCHEDULED.
2020-10-29 18:18:33 INFO  SlotPoolImpl:385 - Cannot serve slot
request, no ResourceManager connected. Adding as pending request
[SlotRequestId{3e5c5fdf21b3314a9289b257aeec8c2d}]
2020-10-29 18:18:33 INFO  EmbeddedLeaderService:252 - Received
confirmation of leadership for leader
akka://flink/user/rpc/jobmanager_3 ,
session=47a72e91-c0bf-4b40-8b9f-639f07344b3f
2020-10-29 18:18:33 INFO  JobMaster:1031 - Connecting to
ResourceManager
akka://flink/user/rpc/resourcemanager_1(a8e96f157326e3f60a2df92bb1364f97)
2020-10-29 18:18:33 INFO  JobMaster:155 - Resolved ResourceManager
address, beginning registration
2020-10-29 18:18:33 INFO  StandaloneResourceManager:330 - Registering
job manager 8b9f639f07344b3f47a72e91c0bf4b40@akka://flink/user/rpc/jobmanager_3
for job 1fd25ab4d3d51009542ebda2bbadb55d.
2020-10-29 18:18:33 INFO  StandaloneResourceManager:765 - Registered
job manager 8b9f639f07344b3f47a72e91c0bf4b40@akka://flink/user/rpc/jobmanager_3
for job 1fd25ab4d3d51009542ebda2bbadb55d.
2020-10-29 18:18:33 INFO  JobMaster:1053 - JobManager successfully
registered at ResourceManager, leader id:
a8e96f157326e3f60a2df92bb1364f97.
2020-10-29 18:18:33 INFO  SlotPoolImpl:347 - Requesting new slot
[SlotRequestId{3e5c5fdf21b3314a9289b257aeec8c2d}] and profile
ResourceProfile{UNKNOWN} with allocation id
45ea6506d06f73f61a36db764ce07ba7 from resource manager.
2020-10-29 18:18:33 INFO  StandaloneResourceManager:464 - Request slot
with profile ResourceProfile{UNKNOWN} for job
1fd25ab4d3d51009542ebda2bbadb55d with allocation id
45ea6506d06f73f61a36db764ce07ba7.
2020-10-29 18:18:33 INFO  TaskExecutor:908 - Receive slot request
45ea6506d06f73f61a36db764ce07ba7 for job
1fd25ab4d3d51009542ebda2bbadb55d from resource manager with leader id
a8e96f157326e3f60a2df92bb1364f97.
2020-10-29 18:18:33 INFO  TaskExecutor:976 - Allocated slot for
45ea6506d06f73f61a36db764ce07ba7.
2020-10-29 18:18:33 INFO  DefaultJobLeaderService:172 - Add job
1fd25ab4d3d51009542ebda2bbadb55d for job leader monitoring.
2020-10-29 18:18:33 INFO  DefaultJobLeaderService:314 - Try to
register at job manager akka://flink/user/rpc/jobmanager_3 with leader
id 47a72e91-c0bf-4b40-8b9f-639f07344b3f.
2020-10-29 18:18:33 INFO  DefaultJobLeaderService:155 - Resolved
JobManager address, beginning registration
2020-10-29 18:18:33 INFO  DefaultJobLeaderService:369 - Successful
registration at job manager akka://flink/user/rpc/jobmanager_3 for job
1fd25ab4d3d51009542ebda2bbadb55d.
2020-10-29 18:18:33 INFO  TaskExecutor:1379 - Establish JobManager
connection for job 1fd25ab4d3d51009542ebda2bbadb55d.
2020-10-29 18:18:33 INFO  TaskExecutor:1278 - Offer reserved slots to
the leader of job 1fd25ab4d3d51009542ebda2bbadb55d.
2020-10-29 18:18:33 INFO  PermanentBlobCache:251 - Shutting down BLOB cache
2020-10-29 18:18:33 INFO  TaskExecutorLocalStateStoresManager:213 -
Shutting down TaskExecutorLocalStateStoresManager.
2020-10-29 18:18:33 INFO  TransientBlobCache:251 - Shutting down BLOB cache
2020-10-29 18:18:33 INFO  ExecutionGraph:1584 - Source:
Values(tuples=[[{ 0 }]]) -> (Calc(select=[CAST(1:BIGINT) AS
account_id, CAST(188:BIGINT) AS amount, CAST(CAST(2020-01-01
00:12:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(2:BIGINT) AS account_id, CAST(374:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:47:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(3:BIGINT) AS account_id, CAST(112:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:36:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(4:BIGINT) AS account_id, CAST(478:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:03:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(5:BIGINT) AS account_id, CAST(208:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:08:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(1:BIGINT) AS account_id, CAST(379:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:53:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(2:BIGINT) AS account_id, CAST(351:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:32:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(3:BIGINT) AS account_id, CAST(320:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:31:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(4:BIGINT) AS account_id, CAST(259:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:19:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(5:BIGINT) AS account_id, CAST(273:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:42:00:TIMESTAMP(3))) AS transaction_time]))
(1/1) (1fd25ab4d3d51009542ebda2bbadb55d_d07bb0f8535d5573ba7aa0f9242a6583_0_0)
switched from SCHEDULED to DEPLOYING.
2020-10-29 18:18:33 INFO  ExecutionGraph:725 - Deploying Source:
Values(tuples=[[{ 0 }]]) -> (Calc(select=[CAST(1:BIGINT) AS
account_id, CAST(188:BIGINT) AS amount, CAST(CAST(2020-01-01
00:12:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(2:BIGINT) AS account_id, CAST(374:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:47:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(3:BIGINT) AS account_id, CAST(112:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:36:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(4:BIGINT) AS account_id, CAST(478:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:03:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(5:BIGINT) AS account_id, CAST(208:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:08:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(1:BIGINT) AS account_id, CAST(379:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:53:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(2:BIGINT) AS account_id, CAST(351:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:32:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(3:BIGINT) AS account_id, CAST(320:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:31:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(4:BIGINT) AS account_id, CAST(259:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:19:00:TIMESTAMP(3))) AS transaction_time]),
Calc(select=[CAST(5:BIGINT) AS account_id, CAST(273:BIGINT) AS amount,
CAST(CAST(2020-01-01 00:42:00:TIMESTAMP(3))) AS transaction_time]))
(1/1) (attempt #0) with attempt id
1fd25ab4d3d51009542ebda2bbadb55d_d07bb0f8535d5573ba7aa0f9242a6583_0_0
to 2358fbac-908d-4aa2-b643-c32d44b40193 @ localhost (dataPort=-1) with
allocation id 45ea6506d06f73f61a36db764ce07ba7
2020-10-29 18:18:33 INFO  ExecutionGraph:1584 - Sink:
Sink(table=[default_catalog.default_database.print_table],
fields=[account_id, amount, transaction_time]) (1/1)
(1fd25ab4d3d51009542ebda2bbadb55d_4b71d4c67c3b183d6f63a06700c86645_0_0)
switched from SCHEDULED to DEPLOYING.
2020-10-29 18:18:33 INFO  ExecutionGraph:725 - Deploying Sink:
Sink(table=[default_catalog.default_database.print_table],
fields=[account_id, amount, transaction_time]) (1/1) (attempt #0) with
attempt id 1fd25ab4d3d51009542ebda2bbadb55d_4b71d4c67c3b183d6f63a06700c86645_0_0
to 2358fbac-908d-4aa2-b643-c32d44b40193 @ localhost (dataPort=-1) with
allocation id 45ea6506d06f73f61a36db764ce07ba7
2020-10-29 18:18:33 INFO  TaskSlotTableImpl:361 - Activate slot
45ea6506d06f73f61a36db764ce07ba7.
2020-10-29 18:18:33 INFO  FileChannelManagerImpl:146 -
FileChannelManager removed spill file directory
/var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-io-9aa7adae-549f-461c-95d8-6fe47f31b695
2020-10-29 18:18:33 INFO  BlobServer:348 - Stopped BLOB server at 0.0.0.0:50965
2020-10-29 18:18:33 INFO  FileChannelManagerImpl:146 -
FileChannelManager removed spill file directory
/var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-netty-shuffle-c8e6cc04-6f93-4f55-a2c5-88891c9e9e49
2020-10-29 18:18:33 INFO  FileCache:153 - removed file cache directory
/var/folders/1t/dzy0w8kx6kjf8np8z37tpjd00000gp/T/flink-dist-cache-4ca7b653-8aa5-4967-b23e-574c43ab7b52

Process finished with exit code 0


[1]: https://github.com/apache/flink-playgrounds/blob/master/table-walkthrough/src/test/java/org/apache/flink/playgrounds/spendreport/SpendReportTest.java

Re: Table Print SQL Connector

Dawid Wysakowicz-2
Hi,

The problem in your case is that you exit before anything is printed
out. The method executeInsert executes the query, but it does not wait
for the query to finish. Therefore your main/test method returns,
bringing down the local cluster, before anything is printed out. You can
e.g. add

        TableResult result = transactions.executeInsert("print_table");
        result.await();

which will wait for the insert to finish.
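
As a rough analogy only (plain JDK classes, not Flink's API), the effect of returning before an asynchronous job has finished can be sketched as below. The names AwaitSketch, submit, and runAndAwait are hypothetical; submit stands in for executeInsert, the single-thread executor stands in for the local cluster, and join() plays the role of result.await().

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class AwaitSketch {

    // Hypothetical stand-in for executeInsert: starts the job on the
    // "cluster" and returns immediately with a handle to the running job.
    static CompletableFuture<Void> submit(
            ExecutorService cluster, List<String> rows, List<String> sink) {
        return CompletableFuture.runAsync(() -> {
            for (String row : rows) {
                sink.add(row); // side effect of the "print sink"
            }
        }, cluster);
    }

    // Submits the job and blocks until it has finished, then shuts the
    // "cluster" down and returns everything the sink received.
    static List<String> runAndAwait(List<String> rows) {
        ExecutorService cluster = Executors.newSingleThreadExecutor();
        List<String> sink = new CopyOnWriteArrayList<>();
        try {
            submit(cluster, rows, sink).join(); // analogous to result.await()
        } finally {
            cluster.shutdown();
        }
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(runAndAwait(Arrays.asList("1,188", "2,374")));
    }
}
```

Without the join() call, runAndAwait could return while the sink is still empty, which mirrors a test method returning before the print sink has written anything.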

The print sink prints into the stdout/stderr directly, therefore none of
the logging configurations apply in this case.

Best,

Dawid


